Description
Replace _handle_request elif chains with a handler registry
Follow-up to #65570.
Once the shared handler extraction in #65570 is complete, every supervisor's _handle_request will still contain a cascading if isinstance chain that calls the helper functions:
if isinstance(msg, GetXCom):
resp, dump_opts = handle_get_xcom(self.client, msg)
elif isinstance(msg, GetTICount):
resp, dump_opts = handle_get_ti_count(self.client, msg)
elif isinstance(msg, GetTaskStates):
resp, dump_opts = handle_get_task_states(self.client, msg)
# ... 15 more identical patterns
This is all boilerplate and every new message type requires touching that chain in every supervisor. We can eliminate this by adding a small decorator-based registry to request_handlers.py.
Proposed Design
Registry (added to request_handlers.py)
_HANDLER_REGISTRY: dict[type, Callable] = {}
def handles(msg_type):
"""Register a function as the handler for a message type."""
def decorator(fn):
_HANDLER_REGISTRY[msg_type] = fn
return fn
return decorator
def get_handler(msg_type: type) -> Callable:
handler = _HANDLER_REGISTRY.get(msg_type)
if handler is None:
raise TypeError(f"No handler registered for {msg_type.__name__}")
return handler
Handler registration
Each existing shared handler gets a decorator. No signature changes required:
@handles(GetConnection)
def handle_get_connection(client: Client, msg: GetConnection) -> tuple[BaseModel | None, dict[str, bool]]:
# ... existing code unchanged ...
Move client to WatchedSubprocess
Currently each subclass defines its own client: Client attribute and all of the shared handlers need it, so moving that up to WatchedSubprocess:
@attrs.define(kw_only=True)
class WatchedSubprocess:
client: Client
# ... existing attributes ...
Default dispatch in WatchedSubprocess
Replace the current raise NotImplementedError() with the registry dispatch as the default implementation:
# WatchedSubprocess
def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) -> None:
resp, dump_opts = get_handler(type(msg))(self.client, msg)
self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
Subclass overrides via super()
Subclasses handle their supervisor-specific messages inline, then call super() for everything else:
For CallbackSubprocess, every message that it supports is already in the registry, so we can drop that entire 28-line method and inherit from the parent straight up.
For DagFileProcessorProcess, we only have one that is not in the registry, so it would be something like these 6 lines replacing the existing 118:
def _handle_request(self, msg, log, req_id):
if isinstance(msg, DagFileParsingResult):
self.parsing_result = msg
self.send_msg(None, request_id=req_id, error=None)
return
super()._handle_request(msg, log, req_id)
TriggerRunnerSupervisor ends up something like:
def _handle_request(self, msg, log, req_id):
if isinstance(msg, messages.TriggerStateChanges):
# ... complex state management ...
self.send_msg(response, request_id=req_id, error=None)
return
if isinstance(msg, UpdateHITLDetail):
# ... inline ...
self.send_msg(resp, request_id=req_id, error=None)
return
if isinstance(msg, GetHITLDetailResponse):
# ... inline ...
self.send_msg(resp, request_id=req_id, error=None)
return
super()._handle_request(msg, log, req_id)
and the beast, ActivitySubprocess looks something like:
def _handle_request(self, msg, log, req_id):
if isinstance(msg, TaskState):
self._terminal_state = msg.state
self._task_end_time_monotonic = time.monotonic()
self._rendered_map_index = msg.rendered_map_index
self.send_msg(None, request_id=req_id, error=None)
return
if isinstance(msg, DeferTask):
# ...
return
# ... other supervisor-specific handlers ...
# Shared handlers via registry
super()._handle_request(msg, log, req_id)
Error handling
get_handler raises TypeError for unregistered message types. This is caught by the existing exception handling in WatchedSubprocess.handle_requests, which sends an error response back to the subprocess. Since the type unions (ToSupervisor, ToTriggerSupervisor, etc.) control what the decoder accepts, an unregistered type reaching _handle_request is a developer error (type added to union, handler not registered), and failing loudly is the correct behavior.
Normalizations required
Two existing handlers need minor adjustments to fit the uniform (client, msg) -> (resp, dump_opts) contract:
handle_mask_secret currently returns None. Change to return (None, {}).
handle_get_prev_successful_dag_run currently takes (client, subprocess_id) instead of (client, msg). Either add a field to GetPrevSuccessfulDagRun so the handler can read it from msg, or keep this as one of the inline special cases in the supervisors that use it.
Scope
This should only cover the shared stateless handlers that live in request_handlers.py. Supervisor-specific handlers that touch internal state (TaskState, DeferTask, TriggerStateChanges, DagFileParsingResult, etc.) stay inline per #65570 rule 7.
Impact by supervisor
- CallbackSubprocess: All handlers are shared. No override needed; inherits the base class implementation directly.
- DagFileProcessorProcess: 2 inline special cases (
DagFileParsingResult, GetPrevSuccessfulDagRun), then super().
- TriggerRunnerSupervisor: 3 inline special cases (
TriggerStateChanges, UpdateHITLDetail, GetHITLDetailResponse), then super().
- ActivitySubprocess: ~20 inline special cases remain, then
super() for ~18 shared handlers. Cuts the method roughly in half.
Benefits
- Adding a new shared message type becomes: define the type in comms, add it to the union, write a decorated handler function. No touching dispatch logic in any supervisor.
- Eliminates duplicated
elif isinstance boilerplate across all four supervisors.
- The registry provides a single source of truth for which message types have shared handlers.
- The
super() pattern means each supervisor only declares what is different about it, which is the whole point of inheritance.
Future Work
Out of scope for this issue, but we can later extend the pattern. Once the shared registry is done, we could add per-supervisor registries for the supervisor-specific handlers too. Each subclass would declare its own registry, and the base class would check that first before falling back to the shared one. Something like:
class WatchedSubprocess:
_handler_registry: ClassVar[dict[type, Callable]] = {}
def _handle_request(self, msg, log, req_id):
handler = self._handler_registry.get(type(msg)) or get_handler(type(msg))
resp, dump_opts = handler(self.client, msg)
self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
class ActivitySubprocess(WatchedSubprocess):
_handler_registry = {
TaskState: handle_task_state,
SucceedTask: handle_succeed_task,
DeferTask: handle_defer_task,
# ...
}
That would eliminate the remaining inline if isinstance chains entirely, making _handle_request a zero-line override in every supervisor. The supervisor-specific handlers would need to take (manager, msg) instead of (client, msg) to access self state, but that's a straightforward signature change. This issue gets us the shared registry and the super() pattern; per-supervisor registries can follow once the approach has proven itself.
Use case/motivation
Clean up internal repeated code
Related issues
No response
Are you willing to submit a PR?
Code of Conduct
Description
Replace
_handle_requestelif chains with a handler registryFollow-up to #65570.
Once the shared handler extraction in #65570 is complete, every supervisor's
_handle_requestwill still contain a cascadingif isinstancechain that calls the helper functions:This is all boilerplate and every new message type requires touching that chain in every supervisor. We can eliminate this by adding a small decorator-based registry to
request_handlers.py.Proposed Design
Registry (added to
request_handlers.py)Handler registration
Each existing shared handler gets a decorator. No signature changes required:
Move
clienttoWatchedSubprocessCurrently each subclass defines its own
client: Clientattribute and all of the shared handlers need it, so moving that up toWatchedSubprocess:Default dispatch in
WatchedSubprocessReplace the current
raise NotImplementedError()with the registry dispatch as the default implementation:Subclass overrides via
super()Subclasses handle their supervisor-specific messages inline, then call
super()for everything else:For
CallbackSubprocess, every message that it supports is already in the registry, so we can drop that entire 28-line method and inherit from the parent straight up.For
DagFileProcessorProcess, we only have one that is not in the registry, so it would be something like these 6 lines replacing the existing 118:TriggerRunnerSupervisorends up something like:and the beast,
ActivitySubprocesslooks something like:Error handling
get_handlerraisesTypeErrorfor unregistered message types. This is caught by the existing exception handling inWatchedSubprocess.handle_requests, which sends an error response back to the subprocess. Since the type unions (ToSupervisor,ToTriggerSupervisor, etc.) control what the decoder accepts, an unregistered type reaching_handle_requestis a developer error (type added to union, handler not registered), and failing loudly is the correct behavior.Normalizations required
Two existing handlers need minor adjustments to fit the uniform
(client, msg) -> (resp, dump_opts)contract:handle_mask_secretcurrently returnsNone. Change to return(None, {}).handle_get_prev_successful_dag_runcurrently takes(client, subprocess_id)instead of(client, msg). Either add a field toGetPrevSuccessfulDagRunso the handler can read it frommsg, or keep this as one of the inline special cases in the supervisors that use it.Scope
This should only cover the shared stateless handlers that live in
request_handlers.py. Supervisor-specific handlers that touch internal state (TaskState,DeferTask,TriggerStateChanges,DagFileParsingResult, etc.) stay inline per #65570 rule 7.Impact by supervisor
DagFileParsingResult,GetPrevSuccessfulDagRun), thensuper().TriggerStateChanges,UpdateHITLDetail,GetHITLDetailResponse), thensuper().super()for ~18 shared handlers. Cuts the method roughly in half.Benefits
elif isinstanceboilerplate across all four supervisors.super()pattern means each supervisor only declares what is different about it, which is the whole point of inheritance.Future Work
Out of scope for this issue, but we can later extend the pattern. Once the shared registry is done, we could add per-supervisor registries for the supervisor-specific handlers too. Each subclass would declare its own registry, and the base class would check that first before falling back to the shared one. Something like:
That would eliminate the remaining inline
if isinstancechains entirely, making_handle_requesta zero-line override in every supervisor. The supervisor-specific handlers would need to take(manager, msg)instead of(client, msg)to accessselfstate, but that's a straightforward signature change. This issue gets us the shared registry and thesuper()pattern; per-supervisor registries can follow once the approach has proven itself.Use case/motivation
Clean up internal repeated code
Related issues
No response
Are you willing to submit a PR?
Code of Conduct