Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 52 additions & 54 deletions common/recipes-rest/rest-api/files/common_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@
import rest_server
from aiohttp import web
from common_utils import (
common_force_async,
RequestContext,
async_web_handler_in_common_executor,
dumps_bytestr,
get_data_from_generator,
get_endpoints,
)


class commonApp_Handler:

# Handler for root resource endpoint
def helper_rest_api(self, request):
def helper_rest_api(self, ctx: RequestContext):
result = {
"Information": {
"Description": "Wedge RESTful API Entry",
Expand All @@ -53,127 +53,125 @@ def helper_rest_api(self, request):
}
return web.json_response(result, dumps=dumps_bytestr)

@common_force_async
def rest_api(self, request):
return self.helper_rest_api(request)
@async_web_handler_in_common_executor
def rest_api(self, ctx: RequestContext):
return self.helper_rest_api(ctx)

# Handler for root resource endpoint
def helper_rest_sys(self, request):
def helper_rest_sys(self, ctx: RequestContext):
result = {
"Information": {"Description": "Wedge System"},
"Actions": [],
"Resources": get_endpoints("/api/sys"),
}
return web.json_response(result, dumps=dumps_bytestr)

@common_force_async
def rest_sys(self, request):
return self.helper_rest_sys(request)
@async_web_handler_in_common_executor
def rest_sys(self, ctx: RequestContext):
return self.helper_rest_sys(ctx)

# Handler for sys/mb resource endpoint
def helper_rest_mb_sys(self, request):
def helper_rest_mb_sys(self, ctx: RequestContext):
result = {
"Information": {"Description": "System Motherboard"},
"Actions": [],
"Resources": get_endpoints("/api/sys/mb"),
}
return web.json_response(result, dumps=dumps_bytestr)

@common_force_async
def rest_mb_sys(self, request):
return self.helper_rest_mb_sys(request)
@async_web_handler_in_common_executor
def rest_mb_sys(self, ctx: RequestContext):
return self.helper_rest_mb_sys(ctx)

# Handler for sys/mb/fruid resource endpoint
def helper_rest_fruid_hdl(self, request):
def helper_rest_fruid_hdl(self, ctx: RequestContext):
return web.json_response(rest_fruid.get_fruid(), dumps=dumps_bytestr)

@common_force_async
def rest_fruid_hdl(self, request):
return self.helper_rest_fruid_hdl(request)
@async_web_handler_in_common_executor
def rest_fruid_hdl(self, ctx: RequestContext):
return self.helper_rest_fruid_hdl(ctx)

# Handler for sys/mb/fruid resource endpoint
def helper_rest_fruid_pim_hdl(self, request):
def helper_rest_fruid_pim_hdl(self, ctx: RequestContext):
return web.json_response(
rest_fruid_pim.get_fruid(), dumps=dumps_bytestr, status=200
)

@common_force_async
def rest_fruid_pim_hdl(self, request):
return self.helper_rest_fruid_pim_hdl(request)
@async_web_handler_in_common_executor
def rest_fruid_pim_hdl(self, ctx: RequestContext):
return self.helper_rest_fruid_pim_hdl(ctx)

async def rest_bmc_hdl(self, request):
result = await rest_bmc.get_bmc()
return web.json_response(result, dumps=dumps_bytestr)

# Handler for sys/server resource endpoint
def helper_rest_server_hdl(self, request):
def helper_rest_server_hdl(self, ctx: RequestContext):
return web.json_response(rest_server.get_server(), dumps=dumps_bytestr)

@common_force_async
def rest_server_hdl(self, request):
return self.helper_rest_server_hdl(request)
@async_web_handler_in_common_executor
def rest_server_hdl(self, ctx: RequestContext):
return self.helper_rest_server_hdl(ctx)

# Handler for uServer resource endpoint
def helper_rest_server_act_hdl(self, request):
data = get_data_from_generator(request.json())
def helper_rest_server_act_hdl(self, ctx: RequestContext):
return web.json_response(
rest_server.server_action(data), dumps=dumps_bytestr, status=200
rest_server.server_action(ctx.json_data), dumps=dumps_bytestr, status=200
)

@common_force_async
def rest_server_act_hdl(self, request):
return self.helper_rest_server_act_hdl(request)
@async_web_handler_in_common_executor
def rest_server_act_hdl(self, ctx: RequestContext):
return self.helper_rest_server_act_hdl(ctx)

# Handler for sensors resource endpoint
def helper_rest_sensors_hdl(self, request):
def helper_rest_sensors_hdl(self, ctx: RequestContext):
return web.json_response(
rest_sensors.get_sensors(), dumps=dumps_bytestr, status=200
)

@common_force_async
def rest_sensors_hdl(self, request):
return self.helper_rest_sensors_hdl(request)
@async_web_handler_in_common_executor
def rest_sensors_hdl(self, ctx: RequestContext):
return self.helper_rest_sensors_hdl(ctx)

# Handler for gpios resource endpoint
def helper_rest_gpios_hdl(self, request):
def helper_rest_gpios_hdl(self, ctx: RequestContext):
return web.json_response(
rest_gpios.get_gpios(), dumps=dumps_bytestr, status=200
)

@common_force_async
def rest_gpios_hdl(self, request):
return self.helper_rest_gpios_hdl(request)
@async_web_handler_in_common_executor
def rest_gpios_hdl(self, ctx: RequestContext):
return self.helper_rest_gpios_hdl(ctx)

# Handler for peer FC presence resource endpoint
def helper_rest_fcpresent_hdl(self, request):
def helper_rest_fcpresent_hdl(self, ctx: RequestContext):
return web.json_response(
rest_fcpresent.get_fcpresent(), dumps=dumps_bytestr, status=200
)

@common_force_async
def rest_fcpresent_hdl(self, request):
return self.helper_rest_fcpresent_hdl(request)
@async_web_handler_in_common_executor
def rest_fcpresent_hdl(self, ctx: RequestContext):
return self.helper_rest_fcpresent_hdl(ctx)

# Handler for psu_update resource endpoint
def helper_psu_update_hdl(self, request):
def helper_psu_update_hdl(self, ctx: RequestContext):
return web.json_response(
rest_psu_update.get_jobs(), dumps=dumps_bytestr, status=200
)

@common_force_async
def psu_update_hdl(self, request):
return self.helper_psu_update_hdl(request)
@async_web_handler_in_common_executor
def psu_update_hdl(self, ctx: RequestContext):
return self.helper_psu_update_hdl(ctx)

# Handler for psu_update resource action
def helper_psu_update_hdl_post(self, request):
data = get_data_from_generator(request.json())
def helper_psu_update_hdl_post(self, ctx: RequestContext):
return web.json_response(
rest_psu_update.begin_job(data), dumps=dumps_bytestr, status=200
rest_psu_update.begin_job(ctx.json_data), dumps=dumps_bytestr, status=200
)

@common_force_async
def psu_update_hdl_post(self, request):
return self.helper_psu_update_hdl_post(request)
@async_web_handler_in_common_executor
def psu_update_hdl_post(self, ctx: RequestContext):
return self.helper_psu_update_hdl_post(ctx)

# Handler for additional fscd sensor data
@staticmethod
Expand Down
83 changes: 61 additions & 22 deletions common/recipes-rest/rest-api/files/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Set, Tuple, Union # noqa: F401
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: F401

from aiohttp import web
from common_webapp import WebApp
Expand All @@ -14,38 +15,76 @@
ENDPOINT_CHILDREN = {} # type: Dict[str, Set[str]]


def common_force_async(func):
@dataclass(frozen=True)
class RequestContext:
"""Thread-safe, immutable context extracted from an aiohttp request.

This dataclass captures request data before entering the executor thread,
since aiohttp's request object is not thread-safe.
"""

method: str
path: str
json_data: Dict[str, Any]


async def _run_in_common_executor(func, *args, **kwargs):
# common handler will use its own executor (thread based),
# we initentionally separated this from the executor of
# board-specific REST handler, so that any problem in
# common REST handlers will not interfere with board-specific
# REST handler, and vice versa
loop = asyncio.get_running_loop()
return await loop.run_in_executor(common_executor, func, *args, **kwargs)


def async_in_common_executor(func):
"""Decorator to run a blocking function in the common executor thread pool.

Use this for standalone functions that do NOT take an aiohttp request as a
parameter. The decorated function's arguments are forwarded unchanged.

For request handlers that need access to the aiohttp request, use
`async_web_handler_in_common_executor` instead.
"""

async def func_wrapper(*args, **kwargs):
# Convert the possibly blocking helper function into async
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(common_executor, func, *args, **kwargs)
return result
return await _run_in_common_executor(func, *args, **kwargs)

return func_wrapper


# When we call request.json() in asynchronous function, a generator
# will be returned. Upon calling next(), the generator will either :
#
# 1) return the next data as usual,
# - OR -
# 2) throw StopIteration, with its first argument as the data
# (this is for indicating that no more data is available)
#
# Not sure why aiohttp's request generator is implemented this way, but
# the following function will handle both of the cases mentioned above.
def get_data_from_generator(data_generator):
data = None
try:
data = next(data_generator)
except StopIteration as e:
data = e.args[0]
return data
def async_web_handler_in_common_executor(func):
"""Decorator for aiohttp endpoint handlers, run in the common executor thread pool.

Use this for handlers wired up as aiohttp routes (the ones aiohttp invokes
with a `web.Request`). Note that the decorated handler does NOT receive the
raw request: the decorator reads the request on the event loop and passes a
`RequestContext` in its place, so the handler's signature is
`(self, ctx: RequestContext)`, not `(self, request)`. This conversion is
required because the aiohttp request object is not thread-safe and must not
be touched from the executor thread.

For standalone functions that do not handle a web request, use `async_in_common_executor`
instead.
"""

async def func_wrapper(self, request: web.Request, *args, **kwargs):
# aiohttp >= 2.3 exposes can_read_body; aiohttp 2.1.0 used on
# older distros (rocko/dunfell) only has the equivalent payload check
# via `request.content.at_eof()``.
can_read_body = getattr(request, "can_read_body", None)
if can_read_body is None:
can_read_body = not request.content.at_eof()
ctx = RequestContext(
method=request.method,
path=request.path,
json_data=await request.json() if can_read_body else {},
)
return await _run_in_common_executor(func, self, ctx, *args, **kwargs)

return func_wrapper


def get_endpoints(path: str):
Expand Down
4 changes: 2 additions & 2 deletions common/recipes-rest/rest-api/files/redfish_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import redfish_chassis_helper
import rest_pal_legacy
from aiohttp import web
from common_utils import common_force_async, dumps_bytestr, parse_expand_level
from common_utils import async_in_common_executor, dumps_bytestr, parse_expand_level
from redfish_base import validate_keys

try:
Expand Down Expand Up @@ -99,7 +99,7 @@ async def get_redfish_sensor_handler(request: web.Request) -> web.Response:
return web.json_response(body, dumps=dumps_bytestr)


@common_force_async
@async_in_common_executor
def _get_sensor_members(parent_resource: str, fru_names: t.List[str], expand: bool):
assert _SENSOR_LOCK.locked()
members_json = []
Expand Down