Skip to content

Replace _handle_request elif chains with a handler registry #65827

@ferruzzi

Description

@ferruzzi

Description

Replace _handle_request elif chains with a handler registry

Follow-up to #65570.

Once the shared handler extraction in #65570 is complete, every supervisor's _handle_request will still contain a cascading if isinstance chain that calls the helper functions:

if isinstance(msg, GetXCom):
    resp, dump_opts = handle_get_xcom(self.client, msg)
elif isinstance(msg, GetTICount):
    resp, dump_opts = handle_get_ti_count(self.client, msg)
elif isinstance(msg, GetTaskStates):
    resp, dump_opts = handle_get_task_states(self.client, msg)
# ... 15 more identical patterns

This is all boilerplate and every new message type requires touching that chain in every supervisor. We can eliminate this by adding a small decorator-based registry to request_handlers.py.

Proposed Design

Registry (added to request_handlers.py)

_HANDLER_REGISTRY: dict[type, Callable] = {}


def handles(msg_type):
    """Register a function as the handler for a message type."""
    def decorator(fn):
        _HANDLER_REGISTRY[msg_type] = fn
        return fn
    return decorator


def get_handler(msg_type: type) -> Callable:
    handler = _HANDLER_REGISTRY.get(msg_type)
    if handler is None:
        raise TypeError(f"No handler registered for {msg_type.__name__}")
    return handler

Handler registration

Each existing shared handler gets a decorator. No signature changes required:

@handles(GetConnection)
def handle_get_connection(client: Client, msg: GetConnection) -> tuple[BaseModel | None, dict[str, bool]]:
    # ... existing code unchanged ...

Move client to WatchedSubprocess

Currently each subclass defines its own client: Client attribute and all of the shared handlers need it, so moving that up to WatchedSubprocess:

@attrs.define(kw_only=True)
class WatchedSubprocess:
    client: Client
    # ... existing attributes ...

Default dispatch in WatchedSubprocess

Replace the current raise NotImplementedError() with the registry dispatch as the default implementation:

# WatchedSubprocess
def _handle_request(self, msg, log: FilteringBoundLogger, req_id: int) -> None:
    resp, dump_opts = get_handler(type(msg))(self.client, msg)
    self.send_msg(resp, request_id=req_id, error=None, **dump_opts)

Subclass overrides via super()

Subclasses handle their supervisor-specific messages inline, then call super() for everything else:

For CallbackSubprocess, every message that it supports is already in the registry, so we can drop that entire 28-line method and inherit from the parent straight up.

For DagFileProcessorProcess, we only have one that is not in the registry, so it would be something like these 6 lines replacing the existing 118:

def _handle_request(self, msg, log, req_id):
    if isinstance(msg, DagFileParsingResult):
        self.parsing_result = msg
        self.send_msg(None, request_id=req_id, error=None)
        return
    super()._handle_request(msg, log, req_id)

TriggerRunnerSupervisor ends up something like:

def _handle_request(self, msg, log, req_id):
    if isinstance(msg, messages.TriggerStateChanges):
        # ... complex state management ...
        self.send_msg(response, request_id=req_id, error=None)
        return
    if isinstance(msg, UpdateHITLDetail):
        # ... inline ...
        self.send_msg(resp, request_id=req_id, error=None)
        return
    if isinstance(msg, GetHITLDetailResponse):
        # ... inline ...
        self.send_msg(resp, request_id=req_id, error=None)
        return
    super()._handle_request(msg, log, req_id)

and the beast, ActivitySubprocess looks something like:

def _handle_request(self, msg, log, req_id):
    if isinstance(msg, TaskState):
        self._terminal_state = msg.state
        self._task_end_time_monotonic = time.monotonic()
        self._rendered_map_index = msg.rendered_map_index
        self.send_msg(None, request_id=req_id, error=None)
        return
    if isinstance(msg, DeferTask):
        # ...
        return
    # ... other supervisor-specific handlers ...

    # Shared handlers via registry
    super()._handle_request(msg, log, req_id)

Error handling

get_handler raises TypeError for unregistered message types. This is caught by the existing exception handling in WatchedSubprocess.handle_requests, which sends an error response back to the subprocess. Since the type unions (ToSupervisor, ToTriggerSupervisor, etc.) control what the decoder accepts, an unregistered type reaching _handle_request is a developer error (type added to union, handler not registered), and failing loudly is the correct behavior.

Normalizations required

Two existing handlers need minor adjustments to fit the uniform (client, msg) -> (resp, dump_opts) contract:

  • handle_mask_secret currently returns None. Change to return (None, {}).
  • handle_get_prev_successful_dag_run currently takes (client, subprocess_id) instead of (client, msg). Either add a field to GetPrevSuccessfulDagRun so the handler can read it from msg, or keep this as one of the inline special cases in the supervisors that use it.

Scope

This should only cover the shared stateless handlers that live in request_handlers.py. Supervisor-specific handlers that touch internal state (TaskState, DeferTask, TriggerStateChanges, DagFileParsingResult, etc.) stay inline per #65570 rule 7.

Impact by supervisor

  • CallbackSubprocess: All handlers are shared. No override needed; inherits the base class implementation directly.
  • DagFileProcessorProcess: 2 inline special cases (DagFileParsingResult, GetPrevSuccessfulDagRun), then super().
  • TriggerRunnerSupervisor: 3 inline special cases (TriggerStateChanges, UpdateHITLDetail, GetHITLDetailResponse), then super().
  • ActivitySubprocess: ~20 inline special cases remain, then super() for ~18 shared handlers. Cuts the method roughly in half.

Benefits

  • Adding a new shared message type becomes: define the type in comms, add it to the union, write a decorated handler function. No touching dispatch logic in any supervisor.
  • Eliminates duplicated elif isinstance boilerplate across all four supervisors.
  • The registry provides a single source of truth for which message types have shared handlers.
  • The super() pattern means each supervisor only declares what is different about it, which is the whole point of inheritance.

Future Work

Out of scope for this issue, but we can later extend the pattern. Once the shared registry is done, we could add per-supervisor registries for the supervisor-specific handlers too. Each subclass would declare its own registry, and the base class would check that first before falling back to the shared one. Something like:

class WatchedSubprocess:
    _handler_registry: ClassVar[dict[type, Callable]] = {}

    def _handle_request(self, msg, log, req_id):
        handler = self._handler_registry.get(type(msg)) or get_handler(type(msg))
        resp, dump_opts = handler(self.client, msg)
        self.send_msg(resp, request_id=req_id, error=None, **dump_opts)
class ActivitySubprocess(WatchedSubprocess):
    _handler_registry = {
        TaskState: handle_task_state,
        SucceedTask: handle_succeed_task,
        DeferTask: handle_defer_task,
        # ...
    }

That would eliminate the remaining inline if isinstance chains entirely, making _handle_request a zero-line override in every supervisor. The supervisor-specific handlers would need to take (manager, msg) instead of (client, msg) to access self state, but that's a straightforward signature change. This issue gets us the shared registry and the super() pattern; per-supervisor registries can follow once the approach has proven itself.

Use case/motivation

Clean up internal repeated code

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    kind:featureFeature Requestsneeds-triagelabel for new issues that we didn't triage yet

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions