
refactor: introduce typed executor result schema#52

Open
kaiitunnz wants to merge 19 commits into main from kaiitunnz/refactor/result-schema

Collaborator

@kaiitunnz kaiitunnz commented May 18, 2026

Purpose

This PR partially addresses a TODO item in #48 (Pydantic schema simplification).

Replaces the untyped dict[str, Any] that executor results have always flowed through with a typed BaseExecutorResult schema, so the wire format between worker and server is no longer a string-keyed dict that every consumer has to .get() defensively. Also removes the jsonl_export echo and its server-side path rewriter; both are redundant with the existing _artifacts resolution mechanism.

Changes

  • Schemas. New BaseExecutorResult, ArtifactRef, and ArtifactContext in src/shared/schemas/. The base class carries the cross-cutting fields (children, artifacts aliased to _artifacts); extra="allow" lets the server round-trip subclass payloads, and SerializeAsAny[...] on BaseExecutorResult.children / ResultEnvelope.result keeps subclass-specific fields through the declared-base-type serialization.
  • Executors. Every executor now declares a typed *Result(BaseExecutorResult) subclass and constructs it directly in run(). Artifact-bearing fields are typed as ArtifactRef | None rather than raw {path: ...} dicts.
  • Runner / dispatcher. Runner._write_results, GovernanceMixin._dump_to_governance, write_executor_result, and Dispatcher._load_stage_result are strictly typed on BaseExecutorResult / ResultEnvelope. _dig_path walks BaseModel attributes (extras still resolve via __pydantic_extra__); _render_artifact_ref reads ctx.base_url / ctx.base_dir directly.
  • Server. GET /results/{task_id} returns a typed BaseExecutorResult (FastAPI serializes via Pydantic). _rewrite_jsonl_export_paths is gone, along with the inference-side jsonl_export echo.
  • SDK. Mirrored BaseExecutorResult / ArtifactRef / ArtifactContext so materialize() walks typed attributes instead of poking raw dicts. New tests/sdk/test_schema_compat rows guard SDK/server drift.
  • Tests. New tests/shared/test_executor_result.py covers subclass round-trip through the base class, both directions of the _artifacts alias, recursive children, and the envelope write/read path. Existing worker fixtures (mp lifecycle, connector logging, agent connector) switch to typed result subclasses so attribute access works naturally.
  • Docs. docs/ARCHITECTURE.md and docs/EXECUTORS.md describe the typed schema and the contract executor authors actually need.

Design

The base class is the only thing the wire actually needs to know about. The server deserializes any incoming ResultEnvelope as BaseExecutorResult; subclass-specific fields ride through as __pydantic_extra__ and re-serialize cleanly thanks to SerializeAsAny. That keeps the server decoupled from the executor registry — no discriminator, no union codec, no need to know whether a stage produced a VLLMResult or a DataRetrievalResult before reading the cross-cutting fields it cares about (children, _artifacts).
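The round trip described above can be sketched as follows. This is a minimal illustration assuming Pydantic v2, not the PR's actual code: the class names come from the PR, but the field set is trimmed to a single hypothetical `message` field and the cross-cutting fields are omitted.

```python
from pydantic import BaseModel, ConfigDict, SerializeAsAny


class BaseExecutorResult(BaseModel):
    # extra="allow" keeps unknown keys in __pydantic_extra__ instead of dropping them.
    model_config = ConfigDict(extra="allow")


class EchoResult(BaseExecutorResult):
    # Hypothetical executor-specific field, used only for this illustration.
    message: str


class ResultEnvelope(BaseModel):
    task_id: str
    # SerializeAsAny serializes the runtime type rather than the declared base type.
    result: SerializeAsAny[BaseExecutorResult]


# Worker side: build the typed subclass and serialize the envelope.
worker_envelope = ResultEnvelope(task_id="t1", result=EchoResult(message="hi"))
wire = worker_envelope.model_dump_json()

# Server side: parse as the base class only; no executor registry is consulted.
server_envelope = ResultEnvelope.model_validate_json(wire)
server_payload = server_envelope.model_dump()
```

In this sketch the server never imports `EchoResult`, yet `message` survives in `server_envelope.result.model_extra` and reappears when the envelope is dumped again.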

Removing jsonl_export falls out of that framing. jsonl_export was the only executor-specific field the server ever read (through _rewrite_jsonl_export_paths), and it was redundant with _artifacts plus relative paths: artifact_to_source / _render_artifact_ref already turn any {path: rel} into a URL or local path on demand. The asymmetry was visible in the tree: LoRA's final_lora / checkpoints_dir shipped as ArtifactRef and were never rewritten by the server, yet downstream stages still resolved them correctly through the generic mechanism.
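To make the generic mechanism concrete, here is a rough standard-library sketch of resolving a relative artifact ref against a context. The `render_artifact_ref` function, its `prefer_url` flag, and the URL-before-directory precedence are assumptions for illustration; the PR only states that `_render_artifact_ref` reads `ctx.base_url` / `ctx.base_dir`.

```python
import posixpath
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ArtifactRef:
    path: str  # always relative to the task's artifact root


@dataclass(frozen=True)
class ArtifactContext:
    base_url: Optional[str] = None
    base_dir: Optional[str] = None


def render_artifact_ref(ref: ArtifactRef, ctx: ArtifactContext, prefer_url: bool = True) -> str:
    """Turn a relative ref into a URL or a local path (hypothetical precedence)."""
    if prefer_url and ctx.base_url:
        return ctx.base_url.rstrip("/") + "/" + ref.path.lstrip("/")
    if ctx.base_dir:
        return posixpath.join(ctx.base_dir, ref.path)
    raise ValueError("artifact context has neither base_url nor base_dir")


ref = ArtifactRef(path="outputs/preds.jsonl")
remote = render_artifact_ref(ref, ArtifactContext(base_url="http://host/files/t1/", base_dir="/data/t1"))
local = render_artifact_ref(ref, ArtifactContext(base_dir="/data/t1"))
```

Because the ref carries only the relative path, the same result payload can be resolved differently by each consumer, which is why no server-side rewrite of executor fields is needed.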

Test Plan

uv run pre-commit run --all-files
uv run pytest tests/ --ignore=tests/worker/test_mp_executor_cleanup_gpu.py

Separately ran a local stack with 2 GPU workers and submitted a representative workflow per executor sequentially to exercise the dispatcher's typed-envelope walk and the placeholder + artifact-ref resolution paths the refactor touched most.

Test Result

$ uv run pre-commit run --all-files
# All hooks passed (gitleaks, isort, black, ruff, codespell, mypy, sync-requirements)

$ uv run pytest tests/ --ignore=tests/worker/test_mp_executor_cleanup_gpu.py
All tests passed

Local stack e2e (2× GPU): 13/13 reached DONE — echo_local, echo_three_node_graph, conditional_echo, inference_vllm_tiny, inference_hf_tiny, omni_text2image, omni_text2speech, sft_llama_1b, lora_sft_llama, dpo_training_llama_1b, ppo_training_llama_1b, dag_inference_example, lora_then_inference.


Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker.
  • If I changed the SDK or CLI, I have verified the affected packages work (uv sync --all-packages --group ci --frozen).
  • If this is a breaking change, I have prefixed the PR title with [BREAKING] and described migration steps above.
  • I have updated documentation or config examples if user-facing behavior changed.

kaiitunnz added 19 commits May 18, 2026 14:38
The inference mixin stamped a `jsonl_export` dict into the executor
result and the server's results router rewrote its `path` / `url` /
`relative_path` on subsequent file uploads. Both are redundant: the
JSONL file lives at `artifacts/{spec.postprocess.jsonl_export.path}`,
the spec already declares that relative path, and the existing
`_artifacts` context plus the generic ref resolvers already turn any
`{path: rel}` into a URL or local path on demand. Removing the echo
also drops the only executor-specific field the server was reading
out of an opaque result payload.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Introduce ``BaseExecutorResult`` as the shared shape every executor's
result conforms to, plus ``ArtifactRef`` and ``ArtifactContext`` for the
existing relative-path resolution model. ``extra="allow"`` on the base
class lets the server deserialize executor-specific subclass payloads
without losing fields, so the wire format stays decoupled from the
executor registry. No callers yet — wiring lands in follow-up commits.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
``ResultEnvelope.result`` is now ``BaseExecutorResult`` instead of
``dict[str, Any]``. ``artifact_ref`` returns ``ArtifactRef`` and
``build_artifact_context`` returns ``ArtifactContext``.
``write_executor_result`` accepts either form and validates dict input
into the base model before stamping the artifact context. The wire
format is preserved by serializing under the ``_artifacts`` alias.
Executors continue to return ``dict[str, Any]`` and are migrated to the
typed subclasses in follow-up commits; the server flattens the typed
``result`` back to a dict at ``_load_stage_result`` so the existing
ref-resolution path is unaffected.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Each migrated executor now declares its own ``BaseExecutorResult``
subclass (``EchoResult``, ``APIResult``, ``SSHResult``,
``DataProfilingResult``, ``DataRetrievalResult``) and ``run()`` returns
the typed model in place of an opaque ``dict[str, Any]``. The runner
and the governance mixin accept either form during the transition
(``BaseExecutorResult | dict[str, Any]``); inference and training
executors migrate in follow-up commits. The base ``Executor.run`` return
type is widened correspondingly. ``APIResult.response_json`` carries the
``json`` wire-format alias to avoid shadowing ``BaseModel.json``.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
VLLM, transformers, diffusers, omni, RAG, agent, and the MP wrapper now
return typed ``BaseExecutorResult`` subclasses (``VLLMResult``,
``TransformersResult``, ``DiffusersResult``, ``OmniResult``,
``RAGResult``, ``AgentResult``). Internals keep building the result as a
dict and validate into the model at the return boundary, preserving the
wire shape. ``MPExecutor`` widens its return signature to pass through
either form from the inner executor. The transformers inference test
switches to attribute access on the typed result.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
SFT, LoRA-SFT, DPO, and PPO executors return typed
``BaseExecutorResult`` subclasses (``SFTResult``, ``LoRAResult``,
``DPOResult``, ``PPOResult``). The non-LoRA training shapes are kept
permissive (most fields optional) because the distributed-spawn paths
return a partial dict, either loaded from the subprocess IPC file or
synthesized by the parent. Artifact-bearing fields are typed as
``ArtifactRef`` so the on-disk shape is validated end-to-end.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…alization

Add round-trip tests for the executor result schema: subclass payload
preservation through the base class, ``_artifacts`` alias in both
directions, recursive ``children``, and the full envelope write→read
path.

The recursive-children test exposed a wire-format bug — Pydantic
defaults to serializing fields using their declared type, so a
subclass instance assigned to a ``BaseExecutorResult`` field (or
nested inside ``children``) lost its executor-specific fields on
``model_dump_json``. Pass ``serialize_as_any=True`` at every
worker-side serialization seam (``write_result_in_envelope``,
``_maybe_emit_http``, ``_dump_to_governance``) and on the server's
stage-result load so the actual subclass shape reaches disk and
downstream consumers.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
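The field loss described in this commit can be reproduced in a few lines. The sketch below assumes Pydantic 2.7 or later (where the `serialize_as_any` argument is available) and uses trimmed stand-ins for the real models; `message` is a hypothetical field.

```python
from pydantic import BaseModel


class BaseExecutorResult(BaseModel):
    pass


class EchoResult(BaseExecutorResult):
    message: str  # hypothetical executor-specific field


class ResultEnvelope(BaseModel):
    # Declared as the base type, with no SerializeAsAny annotation.
    result: BaseExecutorResult


envelope = ResultEnvelope(result=EchoResult(message="hi"))

# Default behavior: serialize by the declared type, so subclass fields are dropped.
lossy = envelope.model_dump()

# Per-call opt-in: serialize by the runtime type instead.
full = envelope.model_dump(serialize_as_any=True)
```

Here `lossy` contains an empty `result` while `full` keeps `message`, which is the difference the flag makes at each serialization seam.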
Architecture's task-merging bullet now points at the typed
``BaseExecutorResult``, and EXECUTORS.md gains a Result schema section
covering the cross-cutting base fields, per-executor subclass
placement, artifact resolution, and the ``serialize_as_any`` seam.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Apply isort / black formatting and resolve mypy errors surfaced by
``pre-commit run --all-files``:

- ``BaseExecutorResult.artifacts`` switches to
  ``validation_alias=AliasChoices("artifacts", "_artifacts")`` +
  ``serialization_alias="_artifacts"`` so ``artifacts=...`` is the
  canonical constructor name while still accepting both keys on
  the wire.
- ``write_result_in_envelope`` validates a dict input into
  ``BaseExecutorResult`` before constructing the envelope.
- Dispatcher's skip-envelope path constructs ``BaseExecutorResult()``
  instead of an empty dict.
- ``SSHResult.command`` / ``entrypoint`` are typed ``list[str] | None``
  to match ``SSHConfig``.
- ``DiffusersExecutor`` types ``generated_images`` as
  ``list[ArtifactRef]``.
- ``DPOExecutor`` narrows the ``model_name`` resolution to ``str |
  None`` via a conditional expression.
- ``get_result`` returns ``result.model_dump(serialize_as_any=True)``
  to match the router's ``dict[str, Any]`` signature.
- Two MP-executor tests assert ``isinstance(result, dict)`` so mypy
  can subscript through the union.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
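The alias arrangement from the first bullet can be sketched like this, assuming Pydantic v2. The `ArtifactContext` here is reduced to one field for brevity. An alias is needed in the first place because Pydantic treats attribute names with a leading underscore as private attributes rather than fields.

```python
from typing import Optional

from pydantic import AliasChoices, BaseModel, Field


class ArtifactContext(BaseModel):
    base_dir: Optional[str] = None


class BaseExecutorResult(BaseModel):
    artifacts: Optional[ArtifactContext] = Field(
        default=None,
        # Accept either key on input, so constructors can use the plain name.
        validation_alias=AliasChoices("artifacts", "_artifacts"),
        # Emit the wire key when dumping with by_alias=True.
        serialization_alias="_artifacts",
    )


from_ctor = BaseExecutorResult(artifacts=ArtifactContext(base_dir="/data/t1"))
from_wire = BaseExecutorResult.model_validate({"_artifacts": {"base_dir": "/data/t1"}})
on_wire = from_ctor.model_dump(by_alias=True)
```

Both construction routes yield the same model, and dumping with `by_alias=True` restores the `_artifacts` key.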
``Executor.run`` is now strictly typed as ``-> BaseExecutorResult``
(no dict union), and every executor constructs its typed subclass
directly instead of validating a dict at the return boundary.

Schema: ``BaseExecutorResult`` is consolidated into
``shared/schemas/result.py``; ``children`` and ``ResultEnvelope.result``
use ``SerializeAsAny[...]`` so subclass-specific fields survive the
declared-type annotation without per-call ``serialize_as_any=True``.
``upstreamResults`` on ``TaskSpecStrictBase`` /
``TaskSpecTemplateBase`` is typed as ``dict[str, BaseExecutorResult]``.

Plumbing: ``write_result_in_envelope`` is folded into
``write_executor_result`` since the dict-input path is no longer
needed. ``_dump_to_governance``, ``_populate_table``, and
``_maybe_export_jsonl`` take the values they actually use (typed
result, items list) instead of mutating an opaque payload. The
dispatcher's ``_load_stage_result`` returns ``ResultEnvelope``;
``_dig_path`` walks ``BaseModel`` attributes; ``_render_artifact_ref``
reads ``ctx.base_url`` / ``ctx.base_dir`` directly.

Omni: ``OmniResult`` becomes strict (``executor``, ``mode``, ``items``,
``model``) and each concrete omni executor declares its own
``OmniText2*Result`` subclass with default ``executor`` / ``mode``
literals.

Test fixtures (``test_mp_executor_lifecycle``, ``test_connector_logging``)
define typed result subclasses so assertions use natural attribute
access; the multiprocessing boundary pickles the subclass through to
the parent.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Tightens the worker-side mirror of the server's ``_render_artifact_ref``
work: ``artifact_to_source`` and ``maybe_resolve_artifact_ref`` take
``context: dict[str, BaseExecutorResult] | None`` and read
``node_result.artifacts`` directly instead of walking dict shapes
defensively. The legacy "result key" fallback (where some executors
nested their payload under a ``"result"`` key) is removed — no
executor produces that shape after the result-schema refactor.
``GovernanceMixin._spec_upstream_results`` and the data-mixin
``context`` local are retyped accordingly, and ``_evaluate_expr``
grows a ``BaseModel`` branch (with sentinel-default ``getattr``) so
expressions can walk into typed upstream results.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The ``artifact_ref(rel_path)`` helper was a one-liner around
``ArtifactRef(path=rel_path)``; now that ``ArtifactRef`` is the typed
schema callers reach for directly, the wrapper is dead weight. Drop
it and the corresponding test, and inline the constructor at every
call site across the nine executors that used it.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
``GET /results/{task_id}`` now returns the validated
``BaseExecutorResult`` directly instead of immediately flattening it
back to ``dict[str, Any]`` via ``model_dump``. FastAPI serializes the
model via Pydantic (alias and ``extra="allow"`` preserved), and the
endpoint's signature documents the actual shape the client sees.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
…walker

Replaces ``_dig_path``'s ``hasattr`` + ``getattr`` double-lookup with
the same sentinel-default pattern ``_evaluate_expr`` already uses, and
unifies the constant name (``_MISSING`` → ``_SENTINEL``) across both
files so the two dynamic-attribute walkers stay aligned.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
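For readers unfamiliar with the sentinel-default pattern, a small standard-library sketch follows. The `dig_path` function and its dotted-path syntax are hypothetical and are not the PR's `_dig_path`; only the `_SENTINEL` name is taken from the commit message.

```python
from types import SimpleNamespace
from typing import Any

# A unique object that no real attribute value can be identical to.
_SENTINEL = object()


def dig_path(root: Any, path: str, default: Any = None) -> Any:
    """Walk a dotted path through attributes (or dict keys) with one lookup per step."""
    current = root
    for part in path.split("."):
        if isinstance(current, dict):
            current = current.get(part, _SENTINEL)
        else:
            # Single getattr with a sentinel replaces the hasattr + getattr pair.
            current = getattr(current, part, _SENTINEL)
        if current is _SENTINEL:
            return default
    return current


result = SimpleNamespace(
    artifacts=SimpleNamespace(base_dir="/data/t1"),
    metrics={"loss": None},
)
found = dig_path(result, "artifacts.base_dir")
explicit_none = dig_path(result, "metrics.loss", default="missing")
absent = dig_path(result, "artifacts.base_url", default="missing")
```

The sentinel lets the walker tell a value that is present but `None` apart from one that is missing, which a plain `getattr(obj, name, None)` cannot do.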
Mirrors the server's ``BaseExecutorResult``, ``ArtifactContext``, and
``ArtifactRef`` Pydantic models on the SDK side so ``materialize()``
walks typed attributes instead of poking raw dicts under
``envelope.result["_artifacts"]``. ``ResultEnvelope.result`` becomes
``SerializeAsAny[BaseExecutorResult]`` to preserve subclass payloads
through the declared base type, matching the server's wire shape.

Shared ``ArtifactContext.base_url`` gains ``exclude_if=lambda v: v is
None`` so setting the field to ``None`` drops the key on serialization,
replacing the previous ``ctx.pop("base_url", None)`` dict mutation.

``tests/sdk/test_schema_compat.py`` parametrizes the three new model
pairs so server / SDK drift is caught at CI time.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Each typed executor result (``AgentResult``, ``DataRetrievalResult``,
``DPOResult``, ``EchoResult``, ``LoRAResult``, ``PPOResult``,
``RAGResult``, ``SFTResult``) is now built with every field passed at
construction time instead of being mutated afterwards
(``result.final_model = ArtifactRef(...)``). Conditional ``ArtifactRef``
fields are folded into the constructor call as inline ternaries; the
SFT/LoRA pair stages locals first because the truthy branch also
runs ``archive_model_dir`` / sets ``self._final_model_dir``. Executors
whose result schema already pins a default (``ok=True``,
``executor=EXECUTOR_NAME``) drop the redundant kwarg at the call site.

``Runner._write_results`` now reads ``result.children`` directly: the
caller already guarantees a ``BaseExecutorResult``, so the
``isinstance`` + ``result.get("children")`` dict fallback is dead.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
Both ARCHITECTURE.md and EXECUTORS.md pointed at
``src/shared/schemas/executor_result.py``, which was consolidated into
``src/shared/schemas/result.py`` later in the refactor. Update the
paths.

While here, drop the ``extra="allow"`` round-trip explanation and the
``SerializeAsAny`` paragraph from the EXECUTORS.md result schema
section — both are implementation justifications that already live in
``BaseExecutorResult``'s docstring / field annotations, and the
executor-author guide only needs the contract: return type, base
fields, and how artifact refs resolve.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
The four Omni result subclasses declared their singular summary field
(``image`` / ``audio``) as ``dict[str, Any]``, but each constructor
passed ``items[0]["image" | "audio"]`` — an ``ArtifactRef`` instance —
which Pydantic v2 rejects with ``Input should be a valid dictionary``.
The validation fired on the success path (whenever ``items`` was
non-empty), so every successful Omni run would have raised
``ValidationError`` at result construction.

Retype each field as ``ArtifactRef | None`` and replace the empty-dict
fallback with ``None`` so both the populated and empty-items paths
validate cleanly.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
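The failure mode and the fix can be reproduced with two toy models, assuming Pydantic v2. `LooseOmniResult` and `TypedOmniResult` are hypothetical names standing in for the before and after shapes of the Omni result subclasses.

```python
from typing import Any, Optional

from pydantic import BaseModel, ValidationError


class ArtifactRef(BaseModel):
    path: str


class LooseOmniResult(BaseModel):
    # Before: the summary field was declared as a plain dict.
    image: dict[str, Any] = {}


class TypedOmniResult(BaseModel):
    # After: the field matches what the constructor actually receives.
    image: Optional[ArtifactRef] = None


ref = ArtifactRef(path="artifacts/out.png")

try:
    LooseOmniResult(image=ref)
    rejected, message = False, ""
except ValidationError as exc:
    # A model instance is not a mapping, so dict validation fails.
    rejected, message = True, exc.errors()[0]["msg"]

populated = TypedOmniResult(image=ref)
empty = TypedOmniResult()
```

With the retyped field, both the populated case and the empty-items case construct without error.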
…ecutors

Drops the inner ``task_id`` field from ``SFTResult`` and ``LoRAResult``
— ``ResultEnvelope`` already owns ``task_id`` at the envelope level, so
duplicating it inside the result is redundant — and loosens
``LoRAResult``'s formerly-required fields (``training_successful``,
``training_time_seconds``, ``output_dir``, ``checkpoints_dir``) to the
default-bearing shape ``DPOResult`` / ``PPOResult`` / ``SFTResult``
already use. The lax defaults are live code: ``SFTExecutor``'s
torchrun-spawn early-return path constructs a partial ``SFTResult``
that omits ``checkpoints_dir`` and the error / timing fields.

Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
@kaiitunnz kaiitunnz marked this pull request as ready for review May 19, 2026 23:26
@kaiitunnz kaiitunnz requested a review from timzsu as a code owner May 19, 2026 23:26
@timzsu timzsu mentioned this pull request May 20, 2026
11 tasks