diff --git a/common/recipes-rest/rest-api/files/common_endpoint.py b/common/recipes-rest/rest-api/files/common_endpoint.py index 01bcc2da342..2c25c156cb9 100755 --- a/common/recipes-rest/rest-api/files/common_endpoint.py +++ b/common/recipes-rest/rest-api/files/common_endpoint.py @@ -32,9 +32,9 @@ import rest_server from aiohttp import web from common_utils import ( - common_force_async, + RequestContext, + async_web_handler_in_common_executor, dumps_bytestr, - get_data_from_generator, get_endpoints, ) @@ -42,7 +42,7 @@ class commonApp_Handler: # Handler for root resource endpoint - def helper_rest_api(self, request): + def helper_rest_api(self, ctx: RequestContext): result = { "Information": { "Description": "Wedge RESTful API Entry", @@ -53,12 +53,12 @@ def helper_rest_api(self, request): } return web.json_response(result, dumps=dumps_bytestr) - @common_force_async - def rest_api(self, request): - return self.helper_rest_api(request) + @async_web_handler_in_common_executor + def rest_api(self, ctx: RequestContext): + return self.helper_rest_api(ctx) # Handler for root resource endpoint - def helper_rest_sys(self, request): + def helper_rest_sys(self, ctx: RequestContext): result = { "Information": {"Description": "Wedge System"}, "Actions": [], @@ -66,12 +66,12 @@ def helper_rest_sys(self, request): } return web.json_response(result, dumps=dumps_bytestr) - @common_force_async - def rest_sys(self, request): - return self.helper_rest_sys(request) + @async_web_handler_in_common_executor + def rest_sys(self, ctx: RequestContext): + return self.helper_rest_sys(ctx) # Handler for sys/mb resource endpoint - def helper_rest_mb_sys(self, request): + def helper_rest_mb_sys(self, ctx: RequestContext): result = { "Information": {"Description": "System Motherboard"}, "Actions": [], @@ -79,101 +79,99 @@ def helper_rest_mb_sys(self, request): } return web.json_response(result, dumps=dumps_bytestr) - @common_force_async - def rest_mb_sys(self, request): - return self.helper_rest_mb_sys(request) + @async_web_handler_in_common_executor + def rest_mb_sys(self, ctx: RequestContext): + return self.helper_rest_mb_sys(ctx) # Handler for sys/mb/fruid resource endpoint - def helper_rest_fruid_hdl(self, request): + def helper_rest_fruid_hdl(self, ctx: RequestContext): return web.json_response(rest_fruid.get_fruid(), dumps=dumps_bytestr) - @common_force_async - def rest_fruid_hdl(self, request): - return self.helper_rest_fruid_hdl(request) + @async_web_handler_in_common_executor + def rest_fruid_hdl(self, ctx: RequestContext): + return self.helper_rest_fruid_hdl(ctx) # Handler for sys/mb/fruid resource endpoint - def helper_rest_fruid_pim_hdl(self, request): + def helper_rest_fruid_pim_hdl(self, ctx: RequestContext): return web.json_response( rest_fruid_pim.get_fruid(), dumps=dumps_bytestr, status=200 ) - @common_force_async - def rest_fruid_pim_hdl(self, request): - return self.helper_rest_fruid_pim_hdl(request) + @async_web_handler_in_common_executor + def rest_fruid_pim_hdl(self, ctx: RequestContext): + return self.helper_rest_fruid_pim_hdl(ctx) async def rest_bmc_hdl(self, request): result = await rest_bmc.get_bmc() return web.json_response(result, dumps=dumps_bytestr) # Handler for sys/server resource endpoint - def helper_rest_server_hdl(self, request): + def helper_rest_server_hdl(self, ctx: RequestContext): return web.json_response(rest_server.get_server(), dumps=dumps_bytestr) - @common_force_async - def rest_server_hdl(self, request): - return self.helper_rest_server_hdl(request) + @async_web_handler_in_common_executor + def rest_server_hdl(self, ctx: RequestContext): + return self.helper_rest_server_hdl(ctx) # Handler for uServer resource endpoint - def helper_rest_server_act_hdl(self, request): - data = get_data_from_generator(request.json()) + def helper_rest_server_act_hdl(self, ctx: RequestContext): return web.json_response( - rest_server.server_action(data), dumps=dumps_bytestr, status=200 + rest_server.server_action(ctx.json_data), dumps=dumps_bytestr, status=200 ) - @common_force_async - def rest_server_act_hdl(self, request): - return self.helper_rest_server_act_hdl(request) + @async_web_handler_in_common_executor + def rest_server_act_hdl(self, ctx: RequestContext): + return self.helper_rest_server_act_hdl(ctx) # Handler for sensors resource endpoint - def helper_rest_sensors_hdl(self, request): + def helper_rest_sensors_hdl(self, ctx: RequestContext): return web.json_response( rest_sensors.get_sensors(), dumps=dumps_bytestr, status=200 ) - @common_force_async - def rest_sensors_hdl(self, request): - return self.helper_rest_sensors_hdl(request) + @async_web_handler_in_common_executor + def rest_sensors_hdl(self, ctx: RequestContext): + return self.helper_rest_sensors_hdl(ctx) # Handler for gpios resource endpoint - def helper_rest_gpios_hdl(self, request): + def helper_rest_gpios_hdl(self, ctx: RequestContext): return web.json_response( rest_gpios.get_gpios(), dumps=dumps_bytestr, status=200 ) - @common_force_async - def rest_gpios_hdl(self, request): - return self.helper_rest_gpios_hdl(request) + @async_web_handler_in_common_executor + def rest_gpios_hdl(self, ctx: RequestContext): + return self.helper_rest_gpios_hdl(ctx) # Handler for peer FC presence resource endpoint - def helper_rest_fcpresent_hdl(self, request): + def helper_rest_fcpresent_hdl(self, ctx: RequestContext): return web.json_response( rest_fcpresent.get_fcpresent(), dumps=dumps_bytestr, status=200 ) - @common_force_async - def rest_fcpresent_hdl(self, request): - return self.helper_rest_fcpresent_hdl(request) + @async_web_handler_in_common_executor + def rest_fcpresent_hdl(self, ctx: RequestContext): + return self.helper_rest_fcpresent_hdl(ctx) # Handler for psu_update resource endpoint - def helper_psu_update_hdl(self, request): + def helper_psu_update_hdl(self, ctx: RequestContext): return web.json_response( rest_psu_update.get_jobs(), dumps=dumps_bytestr, status=200 ) - @common_force_async - def psu_update_hdl(self, request): - return self.helper_psu_update_hdl(request) + @async_web_handler_in_common_executor + def psu_update_hdl(self, ctx: RequestContext): + return self.helper_psu_update_hdl(ctx) # Handler for psu_update resource action - def helper_psu_update_hdl_post(self, request): - data = get_data_from_generator(request.json()) + def helper_psu_update_hdl_post(self, ctx: RequestContext): return web.json_response( - rest_psu_update.begin_job(data), dumps=dumps_bytestr, status=200 + rest_psu_update.begin_job(ctx.json_data), dumps=dumps_bytestr, status=200 ) - @common_force_async - def psu_update_hdl_post(self, request): - return self.helper_psu_update_hdl_post(request) + @async_web_handler_in_common_executor + def psu_update_hdl_post(self, ctx: RequestContext): + return self.helper_psu_update_hdl_post(ctx) # Handler for additional fscd sensor data @staticmethod diff --git a/common/recipes-rest/rest-api/files/common_utils.py b/common/recipes-rest/rest-api/files/common_utils.py index c527d2af2be..482535217ea 100644 --- a/common/recipes-rest/rest-api/files/common_utils.py +++ b/common/recipes-rest/rest-api/files/common_utils.py @@ -3,7 +3,8 @@ import os import re from concurrent.futures import ThreadPoolExecutor -from typing import Dict, List, Optional, Set, Tuple, Union # noqa: F401 +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: F401 from aiohttp import web from common_webapp import WebApp @@ -14,38 +15,76 @@ ENDPOINT_CHILDREN = {} # type: Dict[str, Set[str]] -def common_force_async(func): +@dataclass(frozen=True) +class RequestContext: + """Thread-safe, immutable context extracted from an aiohttp request. + + This dataclass captures request data before entering the executor thread, + since aiohttp's request object is not thread-safe. + """ + + method: str + path: str + json_data: Dict[str, Any] + + +async def _run_in_common_executor(func, *args, **kwargs): # common handler will use its own executor (thread based), # we initentionally separated this from the executor of # board-specific REST handler, so that any problem in # common REST handlers will not interfere with board-specific # REST handler, and vice versa + loop = asyncio.get_running_loop() + return await loop.run_in_executor(common_executor, func, *args, **kwargs) + + +def async_in_common_executor(func): + """Decorator to run a blocking function in the common executor thread pool. + + Use this for standalone functions that do NOT take an aiohttp request as a + parameter. The decorated function's arguments are forwarded unchanged. + + For request handlers that need access to the aiohttp request, use + `async_web_handler_in_common_executor` instead. + """ + async def func_wrapper(*args, **kwargs): # Convert the possibly blocking helper function into async - loop = asyncio.get_running_loop() - result = await loop.run_in_executor(common_executor, func, *args, **kwargs) - return result + return await _run_in_common_executor(func, *args, **kwargs) return func_wrapper -# When we call request.json() in asynchronous function, a generator -# will be returned. Upon calling next(), the generator will either : -# -# 1) return the next data as usual, -# - OR - -# 2) throw StopIteration, with its first argument as the data -# (this is for indicating that no more data is available) -# -# Not sure why aiohttp's request generator is implemented this way, but -# the following function will handle both of the cases mentioned above. -def get_data_from_generator(data_generator): - data = None - try: - data = next(data_generator) - except StopIteration as e: - data = e.args[0] - return data +def async_web_handler_in_common_executor(func): + """Decorator for aiohttp endpoint handlers, run in the common executor thread pool. + + Use this for handlers wired up as aiohttp routes (the ones aiohttp invokes + with a `web.Request`). Note that the decorated handler does NOT receive the + raw request: the decorator reads the request on the event loop and passes a + `RequestContext` in its place, so the handler's signature is + `(self, ctx: RequestContext)`, not `(self, request)`. This conversion is + required because the aiohttp request object is not thread-safe and must not + be touched from the executor thread. + + For standalone functions that do not handle a web request, use `async_in_common_executor` + instead. + """ + + async def func_wrapper(self, request: web.Request, *args, **kwargs): + # aiohttp >= 2.3 exposes can_read_body; aiohttp 2.1.0 used on + # older distros (rocko/dunfell) only has the equivalent payload check + # via `request.content.at_eof()``. + can_read_body = getattr(request, "can_read_body", None) + if can_read_body is None: + can_read_body = not request.content.at_eof() + ctx = RequestContext( + method=request.method, + path=request.path, + json_data=await request.json() if can_read_body else {}, + ) + return await _run_in_common_executor(func, self, ctx, *args, **kwargs) + + return func_wrapper def get_endpoints(path: str): diff --git a/common/recipes-rest/rest-api/files/redfish_sensors.py b/common/recipes-rest/rest-api/files/redfish_sensors.py index ddce836f247..b2e97f372d3 100644 --- a/common/recipes-rest/rest-api/files/redfish_sensors.py +++ b/common/recipes-rest/rest-api/files/redfish_sensors.py @@ -7,7 +7,7 @@ import redfish_chassis_helper import rest_pal_legacy from aiohttp import web -from common_utils import common_force_async, dumps_bytestr, parse_expand_level +from common_utils import async_in_common_executor, dumps_bytestr, parse_expand_level from redfish_base import validate_keys try: @@ -99,7 +99,7 @@ async def get_redfish_sensor_handler(request: web.Request) -> web.Response: return web.json_response(body, dumps=dumps_bytestr) -@common_force_async +@async_in_common_executor def _get_sensor_members(parent_resource: str, fru_names: t.List[str], expand: bool): assert _SENSOR_LOCK.locked() members_json = []