From 3486da97f69d1bb0b7f37e27e123510e4674a44a Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:34:35 +0500 Subject: [PATCH 1/6] feat(tasks): run description tasks through contained actors Route task execution through the active actor context instead of silently falling back to a simulated actor, and add workflow-specific post-run review plumbing for recurring and triggerable description-driven tasks. --- unity/actor/code_act_actor.py | 120 ++++++++++++++++++++-- unity/common/task_execution_context.py | 23 ++++- unity/task_scheduler/active_task.py | 99 +++++++++++------- unity/task_scheduler/prompt_builders.py | 85 +++++++++++++++- unity/task_scheduler/task_scheduler.py | 129 +++++++++++++++++++++--- 5 files changed, 389 insertions(+), 67 deletions(-) diff --git a/unity/actor/code_act_actor.py b/unity/actor/code_act_actor.py index b4a32b778..f1d18de25 100644 --- a/unity/actor/code_act_actor.py +++ b/unity/actor/code_act_actor.py @@ -41,6 +41,8 @@ start_async_tool_loop, ) from unity.common.task_execution_context import ( + PostRunReviewContext, + current_post_run_review_context, TaskExecutionDelegate, current_task_execution_delegate, ) @@ -365,14 +367,16 @@ async def start_task_run( """Start one task run using this actor's CodeAct execution machinery.""" _ = images + task_guidelines = kwargs.pop("guidelines", None) return await self._actor.act( task_description, + guidelines=task_guidelines, _parent_chat_context=parent_chat_context, _clarification_up_q=clarification_up_q, _clarification_down_q=clarification_down_q, entrypoint=entrypoint, persist=False, - _reuse_actor_slot=True, + _reuse_actor_slot=entrypoint is not None, **kwargs, ) @@ -381,6 +385,11 @@ async def start_task_run( # Shared storage-review prompt sections # --------------------------------------------------------------------------- +_DEFAULT_STORAGE_REVIEW_LABEL = "Storing reusable skills" +_DEFAULT_STORAGE_REVIEW_INSTRUCTIONS = ( + "Review the trajectory and store any reusable functions and compositional guidance." +) + _STORAGE_WHAT_CAN_BE_STORED = ( "## What Can Be Stored\n\n" "Any code that executed successfully in `execute_code` during " @@ -623,6 +632,7 @@ def _build_storage_tools( actor: "CodeActActor", ask_tools: dict, completed_tool_metadata: dict | None = None, + task_entrypoint_review: dict[str, Any] | None = None, ) -> tuple[Dict[str, Callable], list[str]]: """Build the tool dict shared by both post-processing and proactive storage loops. @@ -801,6 +811,45 @@ async def resume_inner_storage(tool_name: str) -> str: tools["pause_inner_storage"] = pause_inner_storage tools["resume_inner_storage"] = resume_inner_storage + if task_entrypoint_review: + attach_entrypoint = task_entrypoint_review.get("attach_entrypoint") + metadata = dict(task_entrypoint_review.get("metadata") or {}) + task_id = metadata.get("task_id") + instance_id = metadata.get("instance_id") + task_name = metadata.get("task_name") or metadata.get("name") or "the task" + + async def attach_entrypoint_to_recurring_task( + function_id: int, + rationale: str, + ) -> str: + """Attach a stored FunctionManager entrypoint to future runs of this task. + + Use this only after you have reviewed the completed trajectory and + decided that the stored function captures a stable reusable workflow + for future scheduled or triggered instances. Leaving the task + description-driven is valid when future runs still need broad + planning or tool discovery. + """ + + if not callable(attach_entrypoint): + return "No task entrypoint attachment hook is available." + return str( + attach_entrypoint( + function_id=int(function_id), + rationale=str(rationale), + ), + ) + + attach_entrypoint_to_recurring_task.__doc__ += ( + f"\n\nCurrent task: {task_name} " + f"(task_id={task_id}, completed instance_id={instance_id}). " + "The tool only patches future non-terminal instances; it never " + "rewrites the completed run." + ) + tools["attach_entrypoint_to_recurring_task"] = ( + attach_entrypoint_to_recurring_task + ) + return tools, storage_active_lines @@ -819,6 +868,7 @@ def _start_storage_check_loop( parent_lineage: list[str] | None = None, stop_reason: str | None = None, proactive_summaries: list[str] | None = None, + post_run_review_context: PostRunReviewContext | None = None, ) -> "AsyncToolLoopHandle | None": """Start a loop that reviews a completed trajectory for reusable knowledge. @@ -837,11 +887,17 @@ def _start_storage_check_loop( gm = actor.guidance_manager if fm is None or gm is None: return None + task_entrypoint_review = ( + post_run_review_context.extensions.get("task_entrypoint_review") + if post_run_review_context is not None + else None + ) tools, storage_active_lines = _build_storage_tools( actor=actor, ask_tools=ask_tools, completed_tool_metadata=completed_tool_metadata, + task_entrypoint_review=task_entrypoint_review, ) # ── Build prompt ────────────────────────────────────────────────── @@ -926,6 +982,31 @@ def _start_storage_check_loop( "be worth storing.\n\n" ) + task_entrypoint_section = "" + if task_entrypoint_review: + metadata = dict(task_entrypoint_review.get("metadata") or {}) + metadata_json = json.dumps(metadata, indent=2, default=str) + task_entrypoint_section = ( + "## Recurring Task Entrypoint Review\n\n" + "This trajectory completed a scheduled or triggered task that had " + "no stored entrypoint when it ran. You must explicitly consider " + "whether the successful run revealed a stable reusable workflow " + "worth attaching to future task instances.\n\n" + "No-op is valid: keep the task description-driven if future runs " + "need broad planning, changing tool discovery, or open-ended " + "judgment. If the workflow can be stabilized as code, it may still " + "use focused `reason(...)` calls for bounded semantic substeps " + "such as summarization, classification, ranking, drafting, or " + "source selection.\n\n" + "If you store a FunctionManager function and decide it is stable " + "enough for future runs, call " + "`attach_entrypoint_to_recurring_task(function_id=..., " + "rationale=...)`. Do not call that tool unless the function has " + "already been persisted and you have the numeric function_id.\n\n" + "Task metadata:\n" + f"```json\n{metadata_json}\n```\n\n" + ) + system_prompt = ( "You are a skill librarian. A CodeActActor has just completed a task. " "Your job is to review the execution trajectory and decide whether " @@ -936,6 +1017,7 @@ def _start_storage_check_loop( "## Final Result\n\n" f"{original_result}\n\n" f"{stop_context_section}" + f"{task_entrypoint_section}" f"{inner_storage_section}" f"{proactive_storage_section}" f"{_STORAGE_WHAT_CAN_BE_STORED}" @@ -1095,9 +1177,11 @@ def __init__( *, inner: "AsyncToolLoopHandle", actor: "CodeActActor", + post_run_review_context: PostRunReviewContext | None = None, ) -> None: self._inner = inner self._actor = actor + self._post_run_review_context = post_run_review_context self._notification_q: asyncio.Queue[dict] = asyncio.Queue() self._task_done_event = asyncio.Event() self._completion_event = asyncio.Event() @@ -1167,9 +1251,11 @@ async def _run_lifecycle(self) -> None: try: self._original_result = await self._inner.result() + task_succeeded = not self._stopped except asyncio.CancelledError: raise except Exception as exc: + task_succeeded = False self._original_result = ( f"Error: inner task failed: {type(exc).__name__}: {exc}" ) @@ -1230,14 +1316,27 @@ async def _run_lifecycle(self) -> None: _sc_suffix_token = _PENDING_LOOP_SUFFIX.set(_sc_suffix) try: + active_review_context = ( + self._post_run_review_context if task_succeeded else None + ) + review_display_label = ( + active_review_context.display_label + if active_review_context is not None + else _DEFAULT_STORAGE_REVIEW_LABEL + ) + review_instructions = ( + active_review_context.instructions + if active_review_context is not None + else _DEFAULT_STORAGE_REVIEW_INSTRUCTIONS + ) await publish_manager_method_event( _sc_call_id, "CodeActActor", "StorageCheck", phase="incoming", - display_label="Storing reusable skills", + display_label=review_display_label, hierarchy=_sc_hierarchy, - instructions="Review the trajectory and store any reusable functions and compositional guidance.", + instructions=review_instructions, ) proactive_summaries: list[str] = [] @@ -1259,6 +1358,7 @@ async def _run_lifecycle(self) -> None: parent_lineage=_sc_parent_lineage, stop_reason=self._stop_reason, proactive_summaries=proactive_summaries or None, + post_run_review_context=active_review_context, ) if storage_handle is None: @@ -1267,7 +1367,7 @@ async def _run_lifecycle(self) -> None: "CodeActActor", "StorageCheck", phase="outgoing", - display_label="Storing reusable skills", + display_label=review_display_label, hierarchy=_sc_hierarchy, ) else: @@ -1284,7 +1384,7 @@ async def _run_lifecycle(self) -> None: "CodeActActor", "StorageCheck", phase="outgoing", - display_label="Storing reusable skills", + display_label=review_display_label, hierarchy=_sc_hierarchy, ) finally: @@ -4063,9 +4163,15 @@ async def _resume_with_propagation(**kwargs: Any) -> None: # Update agent context with handle reference new_ctx.handle = handle + post_run_review_context = current_post_run_review_context.get() + # Wrap in StorageCheckHandle for post-completion function review. - if effective_can_store: - handle = _StorageCheckHandle(inner=handle, actor=self) + if effective_can_store or post_run_review_context is not None: + handle = _StorageCheckHandle( + inner=handle, + actor=self, + post_run_review_context=post_run_review_context, + ) return handle diff --git a/unity/common/task_execution_context.py b/unity/common/task_execution_context.py index 78ae1bfd9..4c75f51ab 100644 --- a/unity/common/task_execution_context.py +++ b/unity/common/task_execution_context.py @@ -23,7 +23,8 @@ Here, it enables run-scoped delegation: - **Run-scoped**: a delegate is set at the top of an async execution context and - reset in a `finally` block. + reset in a `finally` block. The delegate owns how the task run is contained + inside that environment, such as starting a child actor run for one task. - **Async-safe**: `ContextVar` propagation ensures each async task tree sees the correct delegate under concurrency. - **No leakage**: callers must reset to prevent delegates persisting across runs. @@ -50,6 +51,7 @@ from __future__ import annotations import asyncio +from dataclasses import dataclass, field from contextvars import ContextVar from typing import Any, Optional, Protocol, TYPE_CHECKING, runtime_checkable @@ -74,8 +76,8 @@ class TaskExecutionDelegate(Protocol): Usage ----- This protocol is used by task execution routing to run tasks through the - same execution environment that initiated the task, rather than spawning a - new one. + execution environment that initiated the task while preserving one task run + per returned handle. """ async def start_task_run( @@ -125,3 +127,18 @@ async def start_task_run( "current_task_execution_delegate", default=None, ) + + +@dataclass(frozen=True) +class PostRunReviewContext: + """Run-scoped metadata for an optional post-completion storage review.""" + + display_label: str + instructions: str + extensions: dict[str, Any] = field(default_factory=dict) + + +current_post_run_review_context: ContextVar[PostRunReviewContext | None] = ContextVar( + "current_post_run_review_context", + default=None, +) diff --git a/unity/task_scheduler/active_task.py b/unity/task_scheduler/active_task.py index 1349b673b..16e4276c4 100644 --- a/unity/task_scheduler/active_task.py +++ b/unity/task_scheduler/active_task.py @@ -18,7 +18,11 @@ from .base import BaseActiveTask from ..actor.base import BaseActor from unity.common.async_tool_loop import SteerableToolHandle -from unity.common.task_execution_context import current_task_execution_delegate +from unity.common.task_execution_context import ( + PostRunReviewContext, + current_post_run_review_context, + current_task_execution_delegate, +) from unity.common._async_tool.messages import forward_handle_call from .machine_state import ( TaskRunProvenance, @@ -167,6 +171,8 @@ async def create( entrypoint: Optional[int] = None, task_run_reference: Optional[TaskRunReference] = None, task_run_provenance: Optional[TaskRunProvenance] = None, + task_entrypoint_review: Optional[dict[str, Any]] = None, + task_guidelines: Optional[str] = None, ) -> "ActiveTask": """ Create an ActiveTask by starting work through a delegate or fallback actor. @@ -177,44 +183,63 @@ async def create( because execution is routed through the delegate instead. """ delegate = current_task_execution_delegate.get() + review_token = None + if task_entrypoint_review is not None: + review_token = current_post_run_review_context.set( + PostRunReviewContext( + display_label="Storing reusable workflow", + instructions=( + "Review the successful task trajectory and decide whether " + "a stable reusable workflow should be stored and attached " + "to future scheduled or triggered task instances." + ), + extensions={"task_entrypoint_review": task_entrypoint_review}, + ), + ) try: - if delegate is not None: - actor_steerable_handle = await delegate.start_task_run( - task_description=task_description, - entrypoint=entrypoint, - parent_chat_context=_parent_chat_context, - clarification_up_q=_clarification_up_q, - clarification_down_q=_clarification_down_q, - ) - else: - if fallback_actor is None: - raise RuntimeError( - "Task execution requires an actor when no run-scoped delegate is active.", + try: + if delegate is not None: + actor_steerable_handle = await delegate.start_task_run( + task_description=task_description, + entrypoint=entrypoint, + parent_chat_context=_parent_chat_context, + clarification_up_q=_clarification_up_q, + clarification_down_q=_clarification_down_q, + guidelines=task_guidelines, ) - actor_steerable_handle = await fallback_actor.act( - task_description, - _parent_chat_context=_parent_chat_context, - _clarification_up_q=_clarification_up_q, - _clarification_down_q=_clarification_down_q, - # Always pass entrypoint to the actor so it can immediately run the function - entrypoint=entrypoint, - persist=False, # Scheduler-run plans should complete instead of pausing for interjection - ) - except Exception as exc: - if task_run_reference is not None: - await asyncio.to_thread( - update_task_run_record, - task_run_reference, - { - "state": "failed", - "completed_at": _now_iso(), - "error": _truncate_task_run_text(str(exc)), - "result_summary": _truncate_task_run_text( - f"Task failed before execution fully started: {type(exc).__name__}({exc})", - ), - }, - ) - raise + else: + if fallback_actor is None: + raise RuntimeError( + "Task execution requires an actor when no run-scoped delegate is active.", + ) + actor_steerable_handle = await fallback_actor.act( + task_description, + guidelines=task_guidelines, + _parent_chat_context=_parent_chat_context, + _clarification_up_q=_clarification_up_q, + _clarification_down_q=_clarification_down_q, + # Always pass entrypoint to the actor so it can immediately run the function + entrypoint=entrypoint, + persist=False, # Scheduler-run plans should complete instead of pausing for interjection + ) + except Exception as exc: + if task_run_reference is not None: + await asyncio.to_thread( + update_task_run_record, + task_run_reference, + { + "state": "failed", + "completed_at": _now_iso(), + "error": _truncate_task_run_text(str(exc)), + "result_summary": _truncate_task_run_text( + f"Task failed before execution fully started: {type(exc).__name__}({exc})", + ), + }, + ) + raise + finally: + if review_token is not None: + current_post_run_review_context.reset(review_token) materialized_task_run_reference = task_run_reference if materialized_task_run_reference is None and task_run_provenance is not None: try: diff --git a/unity/task_scheduler/prompt_builders.py b/unity/task_scheduler/prompt_builders.py index 780d70341..6c8fd6430 100644 --- a/unity/task_scheduler/prompt_builders.py +++ b/unity/task_scheduler/prompt_builders.py @@ -8,17 +8,15 @@ from __future__ import annotations +import json from typing import Dict, Callable, Union, List from .types.task import Task +from .types.activated_by import ActivatedBy from ..common.prompt_helpers import ( - clarification_guidance, - sig_dict, - now, tool_name, require_tools, get_custom_columns, - # New standardized composer utilities PromptSpec, PromptParts, compose_system_prompt, @@ -33,6 +31,69 @@ # ───────────────────────────────────────────────────────────────────────────── +def build_task_execution_request(task: Task) -> str: + """Build the actor-facing request for one task instance.""" + + lines = [ + "Execute this TaskScheduler task as a contained task run.", + "", + f"Task id: {task.task_id}", + f"Instance id: {task.instance_id}", + f"Task name: {task.name}", + "", + "Task description:", + task.description or task.name, + ] + if task.response_policy: + lines.extend(["", "Task response policy:", task.response_policy]) + if task.schedule is not None: + lines.extend( + [ + "", + "Schedule metadata:", + json.dumps(task.schedule.model_dump(mode="json"), default=str), + ], + ) + if task.trigger is not None: + lines.extend( + [ + "", + "Trigger metadata:", + json.dumps(task.trigger.model_dump(mode="json"), default=str), + ], + ) + if task.repeat is not None: + lines.extend( + [ + "", + "Repeat metadata:", + json.dumps( + [r.model_dump(mode="json") for r in task.repeat], + default=str, + ), + ], + ) + return "\n".join(lines) + + +def build_task_run_guidelines(task: Task, reason: ActivatedBy) -> str: + """Build execution guidelines for a contained actor task run.""" + + return ( + "You are executing exactly one TaskScheduler task. Treat the task " + "name, description, schedule, trigger, repeat, and response policy " + "as the authoritative instruction for this run. Complete the task " + "itself; do not create another task unless the task description " + "explicitly asks you to create or modify tasks. If this task has no " + "stored entrypoint, interpret the natural-language description " + "directly using the available primitives and functions. Keep any " + "progress notifications focused on this task run.\n\n" + f"Activation reason: {reason.value}\n" + f"Task id: {task.task_id}\n" + f"Instance id: {task.instance_id}" + ) + + def build_ask_prompt( tools: Dict[str, Callable], num_tasks: int, @@ -360,6 +421,21 @@ def build_update_prompt( usage_examples_lines.extend( [ + "", + "Recurring and triggered workflows", + "---------------------------------", + '• For requests like "do this every Monday" or "send this report daily", create a live scheduled task with `schedule.start_at` for the first run and `repeat` for the cadence.', + "• For requests like \"whenever Alice emails about invoices\", create a live triggerable task with `trigger` and status 'triggerable'. Use contact lookup first when the trigger references a person.", + "• A scheduled/triggered live task may have `entrypoint=None`. This is the normal default for newly described natural-language workflows: execution will wake a contained actor run that interprets the description.", + "• Do not create an entrypoint function merely because a recurring task is being created. Entrypoint creation should follow an explicit user request or a successful run that has been reviewed as stable enough to store.", + "• If the user asks to repeat a workflow that just succeeded interactively and also wants hidden/offline execution, the workflow must first be stored as a function-backed skill; offline tasks require a numeric `entrypoint`.", + "• A stored entrypoint can still call `reason(...)` for bounded semantic judgment such as summarization, classification, ranking, drafting, or source selection. Keep broad planning and changing tool discovery actor-driven.", + "", + "Repeat field examples", + "---------------------", + "• Daily at a fixed time: set `schedule.start_at` to the first due datetime and `repeat=[{'frequency':'daily','interval':1}]`.", + "• Weekly on Monday at 12:00 UTC: set first `schedule.start_at` to the next Monday 12:00 UTC and `repeat=[{'frequency':'weekly','interval':1,'weekdays':['MO'],'time_of_day':'12:00'}]`.", + "• End after N runs: include `count`. End after a date: include `until`.", "", "Schedule/Queue invariants (must-follow)", "---------------------------------------", @@ -385,6 +461,7 @@ def build_update_prompt( "Triggers vs Schedules", "----------------------", f"• A task with a `trigger` must be in state 'triggerable'. Use `{update_task_fname}(task_id=, trigger=...)` to add/remove triggers. Do not set `start_at` on trigger‑based tasks.", + "• `schedule` and `trigger` are mutually exclusive. Use `repeat` with `schedule` for cadence-based tasks; use `trigger` for inbound-event tasks.", ], ) diff --git a/unity/task_scheduler/task_scheduler.py b/unity/task_scheduler/task_scheduler.py index caf816d03..0c9c7b9ec 100644 --- a/unity/task_scheduler/task_scheduler.py +++ b/unity/task_scheduler/task_scheduler.py @@ -19,7 +19,6 @@ from typing import Literal, overload from pydantic import BaseModel from dataclasses import dataclass -from functools import cached_property from ..settings import SETTINGS from ..common.embed_utils import ensure_vector_column @@ -64,11 +63,12 @@ from ..common.model_to_fields import model_to_fields from .prompt_builders import ( build_ask_prompt, + build_task_execution_request, + build_task_run_guidelines, build_update_prompt, ) from .base import BaseTaskScheduler from ..actor.base import BaseActor -from ..actor.simulated import SimulatedActor from .active_task import ActiveTask from .active_queue import ActiveQueue @@ -155,8 +155,9 @@ def __init__( Parameters ---------- actor : BaseActor | None, default ``None`` - Actor used to execute the steps of an active task. When ``None``, a - ``SimulatedActor(duration=20)`` is used. + Explicit fallback actor used for direct scheduler execution when no + run-scoped execution delegate is active. When ``None``, direct + execution fails loudly instead of creating an implicit actor. rolling_summary_in_prompts : bool, default ``True`` Whether to inject the rolling activity summary into system prompts sent to the LLM. @@ -296,18 +297,105 @@ def __init__( # this cache remains coherent without extra backend reads between tool calls. self._num_tasks_cached: Optional[int] = None - @cached_property - def _actor(self) -> BaseActor: - if self.__actor is None: - self.__actor = SimulatedActor(duration=SETTINGS.task.SIM_ACTOR_DURATION) - return self.__actor - def _actor_for_task_run(self) -> BaseActor | None: """Return the fallback actor only when task execution is not delegated.""" if current_task_execution_delegate.get() is not None: return None - return self._actor + return self.__actor + + def _build_task_entrypoint_review( + self, + *, + task: Task, + reason: ActivatedBy, + ) -> dict[str, Any] | None: + """Return post-run entrypoint review context for description-driven tasks.""" + + if task.entrypoint is not None: + return None + if task.repeat is None and task.trigger is None: + return None + + metadata: dict[str, Any] = { + "task_id": task.task_id, + "instance_id": task.instance_id, + "task_name": task.name, + "task_description": task.description, + "activation_reason": reason.value, + "response_policy": task.response_policy, + "schedule": ( + task.schedule.model_dump(mode="json") + if task.schedule is not None + else None + ), + "trigger": ( + task.trigger.model_dump(mode="json") + if task.trigger is not None + else None + ), + "repeat": ( + [pattern.model_dump(mode="json") for pattern in task.repeat] + if task.repeat is not None + else None + ), + } + + def _attach_entrypoint(*, function_id: int, rationale: str) -> dict[str, Any]: + return self._attach_entrypoint_to_future_instances( + task_id=task.task_id, + completed_instance_id=task.instance_id, + function_id=function_id, + rationale=rationale, + ) + + return { + "metadata": metadata, + "attach_entrypoint": _attach_entrypoint, + } + + def _attach_entrypoint_to_future_instances( + self, + *, + task_id: int, + completed_instance_id: int, + function_id: int, + rationale: str, + ) -> dict[str, Any]: + """Attach an entrypoint to future non-terminal instances of a logical task.""" + + if function_id < 0: + raise ValueError("function_id must be a non-negative integer.") + future_logs = self._view.get_rows( + filter=( + f"task_id == {task_id} and instance_id > {completed_instance_id} " + "and entrypoint is None and status not in ('completed','cancelled','failed','active')" + ), + return_ids_only=False, + ) + if not future_logs: + return { + "outcome": "no_future_instances", + "task_id": task_id, + "completed_instance_id": completed_instance_id, + "function_id": function_id, + "rationale": rationale, + } + + log_ids = [log.id for log in future_logs] + self._write_log_entries( + logs=log_ids, + entries={"entrypoint": int(function_id)}, + ) + return { + "outcome": "attached", + "task_id": task_id, + "patched_instance_ids": [ + log.entries.get("instance_id") for log in future_logs + ], + "function_id": int(function_id), + "rationale": rationale, + } # ------------------------------ Provisioning ----------------------------- # def warm_embeddings(self) -> None: @@ -746,6 +834,13 @@ async def _execute_internal( else: reason = ActivatedBy.explicit + fallback_actor = self._actor_for_task_run() + if fallback_actor is None and current_task_execution_delegate.get() is None: + raise RuntimeError( + "TaskScheduler.execute requires a run-scoped actor delegate or an explicit actor. " + "Description-driven tasks should be executed from Actor.act via primitives.tasks.execute(...).", + ) + task_run_source_type = ( "triggered" if trigger_attempt_token @@ -770,12 +865,9 @@ async def _execute_internal( unlink_from_prev=unlink_from_prev, ) - # Start task execution (delegated to the current execution environment when available) - # and wrap the resulting handle for Tasks-table synchronization. - handle = await ActiveTask.create( - self._actor_for_task_run(), - task_description=task.description or task.name, + fallback_actor, + task_description=build_task_execution_request(task), _parent_chat_context=parent_chat_context, _clarification_up_q=clarification_up_q, _clarification_down_q=clarification_down_q, @@ -784,6 +876,11 @@ async def _execute_internal( scheduler=self, entrypoint=task.entrypoint, task_run_provenance=task_run_provenance, + task_entrypoint_review=self._build_task_entrypoint_review( + task=task, + reason=reason, + ), + task_guidelines=build_task_run_guidelines(task, reason), ) self._active_task = TaskScheduler.ActivePointer( From 079d7c23aa31c9277b515d9c6407a5f95f61b668 Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:34:39 +0500 Subject: [PATCH 2/6] fix(tasks): preserve schedule as generic dict storage Mark schedule payloads with explicit dict typing so queue linkage and datetime schedules can coexist without backend type inference conflicts. --- unity/task_scheduler/storage.py | 27 ++++++++++++++++++++++++--- unity/task_scheduler/types/task.py | 12 ++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/unity/task_scheduler/storage.py b/unity/task_scheduler/storage.py index c708b5ce1..06a7b8b5b 100644 --- a/unity/task_scheduler/storage.py +++ b/unity/task_scheduler/storage.py @@ -357,6 +357,22 @@ def _norm(v: Any) -> Any: return [TasksStore._norm(x) for x in v] return v + @staticmethod + def _with_explicit_task_types(entries: Any) -> Any: + if isinstance(entries, list): + return [TasksStore._with_explicit_task_types(entry) for entry in entries] + if not isinstance(entries, dict): + return entries + if entries.get("schedule") is None: + return entries + out = dict(entries) + explicit_types = dict(out.get("explicit_types") or {}) + schedule_types = dict(explicit_types.get("schedule") or {}) + schedule_types["type"] = "dict" + explicit_types["schedule"] = schedule_types + out["explicit_types"] = explicit_types + return out + # ------------------------------- Writes -------------------------------- def update( self, @@ -392,7 +408,9 @@ def _strip_nones(value: Any, *, top_level: bool) -> Any: ] return value - norm_entries = _strip_nones(TasksStore._norm(entries), top_level=True) + norm_entries = TasksStore._with_explicit_task_types( + _strip_nones(TasksStore._norm(entries), top_level=True), + ) return unify.update_logs( logs=logs, context=self._ctx, @@ -401,7 +419,7 @@ def _strip_nones(value: Any, *, top_level: bool) -> Any: ) def log(self, *, entries: Dict[str, Any], new: bool = True) -> unify.Log: - norm_entries = TasksStore._norm(entries) + norm_entries = TasksStore._with_explicit_task_types(TasksStore._norm(entries)) # Create with expanded fields so auto-counting applies when ids are omitted return unity_log( project=self._project, @@ -420,7 +438,10 @@ def create_many(self, *, entries_list: List[Dict[str, Any]]) -> Dict[str, Any]: with auto-incremented row identifiers. """ - normalised = [{**TasksStore._norm(e)} for e in entries_list] + normalised = [ + TasksStore._with_explicit_task_types({**TasksStore._norm(e)}) + for e in entries_list + ] try: return unity_create_logs( project=self._project, diff --git a/unity/task_scheduler/types/task.py b/unity/task_scheduler/types/task.py index 2f3ae42ba..a8708d633 100644 --- a/unity/task_scheduler/types/task.py +++ b/unity/task_scheduler/types/task.py @@ -30,6 +30,7 @@ class TaskBase(BaseModel): schedule: Optional[Schedule] = Field( default=None, description="Information about task scheduling, including adjacent tasks in the queue and ideal start time", + json_schema_extra={"unify_type": "dict"}, ) trigger: Optional[Trigger] = Field( default=None, @@ -41,7 +42,11 @@ class TaskBase(BaseModel): ) repeat: Optional[List[RepeatPattern]] = Field( default=None, - description="Pattern defining how the task recurs over time", + description=( + "Pattern defining how the task recurs over time. Recurring live tasks " + "may begin with entrypoint=null and execute from the natural-language " + "description until a post-run review stores a stable function." + ), ) priority: Priority = Field( description="Importance level of the task (low, normal, high, urgent)", @@ -58,7 +63,10 @@ class TaskBase(BaseModel): default=None, description=( "Optional function_id from the Functions table that should be invoked to perform this task. " - "When null, the task is executed by an Actor interpreting the free-form description on the fly." + "When null, a live task is executed by a contained Actor run interpreting the free-form " + "description on the fly. Do not set this for a newly described workflow unless the user " + "explicitly asks for a stored function-backed workflow or a successful execution has been " + "reviewed and distilled into a stable function." ), ) offline: bool = Field( From a5de8004c2a2bbec20ae16366eca33f3299d15d3 Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:34:44 +0500 Subject: [PATCH 3/6] docs(tasks): clarify live recurring workflow semantics Teach actor and scheduler prompts that new scheduled or triggered workflows should usually remain live and description-driven unless a stored function is explicitly requested or later distilled. --- unity/actor/prompt_builders.py | 31 +++++++++++++++++++++++++++++++ unity/actor/prompt_examples.py | 33 +++++++++++++++++++++++++++++++++ unity/task_scheduler/README.md | 24 ++++++++++++++++++++++++ unity/task_scheduler/base.py | 29 ++++++++++++++++++++++++----- 4 files changed, 112 insertions(+), 5 deletions(-) diff --git a/unity/actor/prompt_builders.py b/unity/actor/prompt_builders.py index a6d59f121..226ac6b3c 100644 --- a/unity/actor/prompt_builders.py +++ b/unity/actor/prompt_builders.py @@ -464,6 +464,34 @@ straight to `compress_context`. """).strip() +_TASK_SCHEDULING_WORKFLOWS = textwrap.dedent(""" + ### Durable Scheduled And Triggered Workflows + + When the user asks for work to happen later, repeatedly, or in response to + future inbound events, represent that durable intent with the task + primitives rather than only doing the work once. + + Use `primitives.tasks.update(...)` for requests like: + - "Repeat this every Monday at 12:00 UTC" + - "Send me this report every day" + - "Whenever Alice emails about invoices, summarize it and draft a reply" + - "Turn what we just did into a recurring workflow" + + Natural-language recurring tasks should normally start as live + description-driven tasks with `entrypoint=None`. The future due wake will + call `primitives.tasks.execute(task_id=...)`; execution then runs a + contained child actor dedicated to that task. Do not write and attach an + untested entrypoint function at task creation unless the user explicitly + requested a stored function-backed workflow. + + If a workflow has just been completed interactively and the user wants it + repeated, include the relevant context in the task description. Use + `store_skills` or direct FunctionManager writes only when the user asks to + store the workflow, or when the completed trajectory clearly reveals a + reusable function worth saving. Offline tasks require a stored entrypoint; + description-only recurring work should remain live. +""").strip() + _EXTERNAL_APP_INTEGRATION = textwrap.dedent(""" ### External App Integration @@ -745,12 +773,15 @@ def _build_code_act_rules_and_examples( _has_computer = any( k.startswith("primitives.computer.") for k in env.get_tools() ) + _has_tasks = any(k.startswith("primitives.tasks.") for k in env.get_tools()) _has_state = any( k.startswith("primitives.") and not k.startswith("primitives.computer.") and not k.startswith("primitives.actor.") for k in env.get_tools() ) + if _has_tasks: + parts.append(_TASK_SCHEDULING_WORKFLOWS) if _has_computer and _has_state: from unity.actor.prompt_examples import get_mixed_examples diff --git a/unity/actor/prompt_examples.py b/unity/actor/prompt_examples.py index b8782531f..c8475baf6 100644 --- a/unity/actor/prompt_examples.py +++ b/unity/actor/prompt_examples.py @@ -901,6 +901,38 @@ class TaskIdResult(BaseModel): ''' +def get_primitives_task_recurring_creation_example() -> str: + """Example: creating durable scheduled and triggered tasks.""" + + return """ +# Example: durable recurring and triggered workflow creation +async def create_description_driven_recurring_tasks() -> str: + # User: "Every Monday at 12:00 UTC, research AI/agentic AI work from + # the last week and email me a summary document." + scheduled = await primitives.tasks.update( + "Create a live scheduled recurring task. Name: Weekly AI research report. " + "Description: Every Monday at 12:00 UTC, research important AI and agentic AI " + "work from the previous week, summarize the most important developments, " + "create a concise document, and email it to me. Set the first start_at to " + "the next Monday 12:00 UTC and repeat weekly on Monday at 12:00 UTC. " + "Leave entrypoint as null unless there is already a proven stored function. " + "Do not mark it offline." + ) + scheduled_result = await scheduled.result() + + # User: "Whenever Alice emails about invoices, summarize it and draft a reply." + triggered = await primitives.tasks.update( + "Create a live triggerable task. Name: Alice invoice email follow-up. " + "Description: When Alice emails about invoices, summarize the inbound email, " + "identify what action is needed, and draft a reply for review. Resolve Alice " + "to the right contact id before setting trigger filters. Leave entrypoint as " + "null; this should wake a live actor to interpret the description." + ) + triggered_result = await triggered.result() + return f"{scheduled_result}\\n{triggered_result}" +""" + + def get_primitives_dynamic_methods_example() -> str: """Example: using dynamic handle methods.""" @@ -1929,6 +1961,7 @@ def get_example_function_map() -> dict[str, callable]: "get_primitives_contact_update_example": get_primitives_contact_update_example, # Tasks "get_primitives_task_execute_example": get_primitives_task_execute_example, + "get_primitives_task_recurring_creation_example": get_primitives_task_recurring_creation_example, "get_primitives_dynamic_methods_example": get_primitives_dynamic_methods_example, # Knowledge "get_primitives_knowledge_ask_example": get_primitives_cross_manager_example, diff --git a/unity/task_scheduler/README.md b/unity/task_scheduler/README.md index 4a93d3a24..5cb72489e 100644 --- a/unity/task_scheduler/README.md +++ b/unity/task_scheduler/README.md @@ -72,6 +72,23 @@ This package manages the creation, scheduling, execution, and re‑ordering of t 3) Execute (run now) - Guards single‑active. If given a numeric id, can run in isolation (detach, followers keep schedule) or as a chain (preserve links). This path does not use an async LLM tool loop or an execute system prompt; it returns an `ActiveQueue` handle (direct delegation for isolated/single‑task). +4) Scheduled activation + - User-authored scheduled task rows are projected by Orchestra into machine-facing activation rows. + - Communication materializes scheduled live activations as Cloud Tasks targeting the adapters `/scheduled/tasks/due` endpoint. + - The live wake reason is delivered to ConversationManager, which asks the slow brain to start with `primitives.tasks.execute(task_id=...)`. + - Cloud Scheduler is used for platform maintenance jobs; per-task cadence is delivered by dynamic Cloud Tasks. + +5) Trigger activation + - Trigger definitions are projected into activation rows and mechanically matched by medium/contact filters when inbound communication events arrive. + - Live trigger candidates are surfaced to the slow brain, which performs semantic acceptance and calls `primitives.tasks.execute(task_id=..., trigger_attempt_token=...)` so the run adopts the exact inbound provenance. + - Recurring triggerable tasks clone a future triggerable instance before the current instance is marked active. + +6) Offline activation + - Offline means the hidden headless lane: the live ConversationManager and main actor are not woken. + - Offline scheduled activations use Cloud Tasks targeting Communication's offline-dispatch endpoint, which creates a short-lived Unity Kubernetes job. + - The job runs `offline_runner.py`, which executes exactly one stored FunctionManager entrypoint through `SingleFunctionActor(headless=True)`. + - Offline tasks require an entrypoint. Description-only tasks should remain live unless a later successful run is distilled into a stored function. + ### Queue/schedule invariants (enforced centrally) @@ -100,6 +117,13 @@ This package manages the creation, scheduling, execution, and re‑ordering of t - `ActiveTask`: internal steerable handle for a single running task; mirrors status and clears the scheduler’s active pointer when done. - `ActiveQueue`: public execution handle that sequences tasks using persisted `next_task` links, supports interjection routing across the queue, and provides a completion summary. Uses direct delegation when the queue is a singleton/isolated. +### Entrypoints and description-driven execution + +- `entrypoint` is optional for live tasks. When it is null, execution is actor-driven: a contained child actor run interprets the task name, description, schedule/trigger metadata, repeat pattern, and response policy. +- `entrypoint` is required for offline tasks because the headless lane executes one stored function without booting the live assistant runtime. +- Direct `TaskScheduler.execute(...)` needs either a run-scoped actor delegate or an explicitly configured actor. A production live wake normally reaches execution through `Actor.act` and `primitives.tasks.execute(...)`; tests can still inject a simulated actor explicitly. +- After a successful recurring or triggerable description-driven run, the actor always runs a storage review that considers whether the observed trajectory is stable enough to store as a function. The write is conditional: if future runs still need broad planning or tool discovery, the task remains description-driven. Stored functions may still use focused `reason(...)` calls for bounded judgment. + ### Clarification and contacts diff --git a/unity/task_scheduler/base.py b/unity/task_scheduler/base.py index 9ef4a76ba..cbb6de648 100644 --- a/unity/task_scheduler/base.py +++ b/unity/task_scheduler/base.py @@ -246,6 +246,24 @@ async def update( If the task is to be started *immediately*, then just put the current datetime as the `start_at`, and omit the deadline if one is not specified. + Entrypoints, live tasks, and offline tasks + ----------------------------------------- + A live scheduled or triggered task may start with ``entrypoint=None``. + In that case, execution wakes a contained actor run that interprets the + task's natural-language name/description and metadata. This is the + normal default for newly described recurring workflows. + + Offline tasks run in the hidden headless lane and must have a numeric + entrypoint before ``offline=True`` is set. Do not create a description-only + offline task. + + Do not create an entrypoint function merely because a new recurring task + was described. Entrypoint persistence should follow an explicit user + request or a successful execution reviewed as stable enough to store. + Stored functions may still use focused ``reason(...)`` calls for bounded + semantic judgment, but open-ended planning/tool discovery should remain + actor-driven. + All parameters mirror :pymeth:`ask`; refer there for detailed semantics. """ @@ -290,11 +308,12 @@ async def execute( Execution delegation -------------------- - When a run-scoped execution environment is available, task execution may be - delegated to that environment to maintain context continuity. Otherwise, - execution proceeds through the scheduler's configured execution strategy. - In both cases, a live steerable handle is returned that supports the full - steering interface. + When a run-scoped execution environment is available, task execution is + delegated through that environment while keeping one returned handle per + task run. A task without an entrypoint is executed by a contained actor + run dedicated to that task. Without a run-scoped delegate, direct + execution requires an explicitly configured actor; otherwise execution + fails loudly instead of silently using a simulated fallback. Returns ------- From 89abdc38200a59bd8b88b35e771a070f327d69f9 Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:34:48 +0500 Subject: [PATCH 4/6] test(actor): cover task workflow execution guidance Verify child actor slot selection, reusable workflow review labeling, and real actor creation of live recurring and triggerable tasks with null entrypoints. --- tests/actor/code_act/test_prompt_builders.py | 38 ++++++++++ tests/actor/code_act/test_storage_on_stop.py | 71 +++++++++++++++++ .../code_act/test_task_execution_delegate.py | 57 +++++++++++++- .../tasks/test_recurring_creation_code_act.py | 76 +++++++++++++++++++ 4 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 tests/actor/state_managers/real/tasks/test_recurring_creation_code_act.py diff --git a/tests/actor/code_act/test_prompt_builders.py b/tests/actor/code_act/test_prompt_builders.py index 13401f9bb..1552b785f 100644 --- a/tests/actor/code_act/test_prompt_builders.py +++ b/tests/actor/code_act/test_prompt_builders.py @@ -32,6 +32,17 @@ def get_tools(self) -> dict: return {} +class _DummyToolEnv(_DummyEnv): + """Minimal environment stub with configurable tool names.""" + + def __init__(self, prompt_context: str, tools: dict[str, Any]): + super().__init__(prompt_context) + self._tools = tools + + def get_tools(self) -> dict: + return self._tools + + def _real_envs_mixed() -> Mapping[str, Any]: """Real environments that produce self-contained prompt context.""" from unity.function_manager.primitives import ComputerPrimitives @@ -111,6 +122,33 @@ def test_code_act_prompt_includes_diverse_examples_sessions_computer_primitives_ assert "execute_function vs execute_code decision" in prompt +@pytest.mark.timeout(30) +def test_code_act_prompt_includes_task_workflow_guidance_only_with_task_primitives(): + prompt_with_tasks = build_code_act_prompt( + environments={ + "primitives": _DummyToolEnv( + "Task primitives are available.", + {"primitives.tasks.update": object()}, + ), + }, + tools={}, + ) + prompt_without_tasks = build_code_act_prompt( + environments={ + "primitives": _DummyToolEnv( + "Only contact primitives are available.", + {"primitives.contacts.ask": object()}, + ), + }, + tools={}, + ) + + assert "Durable Scheduled And Triggered Workflows" in prompt_with_tasks + assert "`entrypoint=None`" in prompt_with_tasks + assert "primitives.tasks.execute(task_id=...)" in prompt_with_tasks + assert "Durable Scheduled And Triggered Workflows" not in prompt_without_tasks + + @pytest.mark.timeout(30) def test_code_act_prompt_teaches_refresh_token_oauth_helper(): actor = CodeActActor() diff --git a/tests/actor/code_act/test_storage_on_stop.py b/tests/actor/code_act/test_storage_on_stop.py index 725bc06fd..7121d05d5 100644 --- a/tests/actor/code_act/test_storage_on_stop.py +++ b/tests/actor/code_act/test_storage_on_stop.py @@ -19,6 +19,7 @@ import pytest from unity.actor.code_act_actor import CodeActActor, _StorageCheckHandle +from unity.common.task_execution_context import PostRunReviewContext # --------------------------------------------------------------------------- # Symbolic: _StorageCheckHandle runs Phase 2 after stop @@ -230,6 +231,76 @@ async def _stop(**kwargs): ), f"StorageCheck incoming event must have a non-null instructions kwarg, got {instructions!r}" +@pytest.mark.asyncio +@pytest.mark.timeout(30) +async def test_task_entrypoint_review_uses_reusable_workflow_event_label(): + result_future: asyncio.Future[str] = asyncio.get_event_loop().create_future() + + inner = MagicMock() + + async def _await_result(): + return await result_future + + inner.result = _await_result + inner.next_notification = AsyncMock(side_effect=lambda: asyncio.Event().wait()) + + async def _stop(**kwargs): + if not result_future.done(): + result_future.set_result("done") + + inner.stop = AsyncMock(side_effect=_stop) + inner._client = MagicMock(messages=[{"role": "user", "content": "do task"}]) + + mock_task = MagicMock() + mock_task.get_ask_tools = MagicMock(return_value={}) + mock_task.get_completed_tool_metadata = MagicMock(return_value={}) + inner._task = mock_task + + actor = MagicMock() + actor.function_manager = None + actor.guidance_manager = None + + review_context = PostRunReviewContext( + display_label="Storing reusable workflow", + instructions="Review the completed recurring workflow.", + extensions={"task_entrypoint_review": {"metadata": {"task_id": 1}}}, + ) + + with ( + patch("unity.actor.code_act_actor._start_storage_check_loop") as mock_loop, + patch( + "unity.actor.code_act_actor.publish_manager_method_event", + new_callable=AsyncMock, + ) as mock_publish, + ): + mock_loop.return_value = None + handle = _StorageCheckHandle( + inner=inner, + actor=actor, + post_run_review_context=review_context, + ) + + result_future.set_result("done") + + deadline = asyncio.get_event_loop().time() + 10 + while not handle.done(): + if asyncio.get_event_loop().time() > deadline: + raise TimeoutError("Handle did not complete") + await asyncio.sleep(0.1) + + incoming_calls = [ + call + for call in mock_publish.call_args_list + if call.kwargs.get("phase") == "incoming" and call.args[2] == "StorageCheck" + ] + assert incoming_calls[0].kwargs["display_label"] == "Storing reusable workflow" + assert ( + incoming_calls[0].kwargs["instructions"] + == "Review the completed recurring workflow." + ) + assert mock_loop.call_args.kwargs["post_run_review_context"] is review_context + + # --------------------------------------------------------------------------- # Eval: persist=True + stop with memoize intent stores a function # --------------------------------------------------------------------------- diff --git a/tests/actor/code_act/test_task_execution_delegate.py b/tests/actor/code_act/test_task_execution_delegate.py index 6fc925a00..17b205904 100644 --- a/tests/actor/code_act/test_task_execution_delegate.py +++ b/tests/actor/code_act/test_task_execution_delegate.py @@ -5,8 +5,9 @@ import pytest from tests.helpers import _handle_project -from unity.actor.code_act_actor import CodeActActor +from unity.actor.code_act_actor import CodeActActor, _CodeActTaskExecutionDelegate from unity.actor.environments.state_managers import StateManagerEnvironment +from unity.actor.simulated import SimulatedActor from unity.common.task_execution_context import current_task_execution_delegate from unity.function_manager.function_manager import FunctionManager from unity.function_manager.primitives import PrimitiveScope, Primitives @@ -15,6 +16,60 @@ from unity.task_scheduler.types.status import Status +@pytest.mark.asyncio +async def test_codeact_task_delegate_runs_description_tasks_in_child_actor_slot(): + calls = [] + actor = SimulatedActor(steps=0) + + original_act = actor.act + + async def _spy_act(*args, **kwargs): + calls.append(kwargs) + return await original_act(*args, **kwargs) + + actor.act = _spy_act # type: ignore[method-assign] + delegate = _CodeActTaskExecutionDelegate(actor) # type: ignore[arg-type] + + handle = await delegate.start_task_run( + task_description="Run the description-driven task.", + entrypoint=None, + parent_chat_context=None, + clarification_up_q=None, + clarification_down_q=None, + ) + await handle.result() + + assert calls[0]["_reuse_actor_slot"] is False + assert calls[0]["persist"] is False + + +@pytest.mark.asyncio +async def test_codeact_task_delegate_reuses_actor_slot_for_entrypoint_tasks(): + calls = [] + actor = SimulatedActor(steps=0) + + original_act = actor.act + + async def _spy_act(*args, **kwargs): + calls.append(kwargs) + return await original_act(*args, **kwargs) + + actor.act = _spy_act # type: ignore[method-assign] + delegate = _CodeActTaskExecutionDelegate(actor) # type: ignore[arg-type] + + handle = await delegate.start_task_run( + task_description="Run the function-backed task.", + entrypoint=123, + parent_chat_context=None, + clarification_up_q=None, + clarification_down_q=None, + ) + await handle.result() + + assert calls[0]["_reuse_actor_slot"] is True + assert calls[0]["entrypoint"] == 123 + + @pytest.mark.asyncio @pytest.mark.llm_call @pytest.mark.timeout(120) diff --git a/tests/actor/state_managers/real/tasks/test_recurring_creation_code_act.py b/tests/actor/state_managers/real/tasks/test_recurring_creation_code_act.py new file mode 100644 index 000000000..943a4408b --- /dev/null +++ b/tests/actor/state_managers/real/tasks/test_recurring_creation_code_act.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import pytest + +from tests.actor.state_managers.utils import make_code_act_actor +from unity.task_scheduler.types.status import Status + +pytestmark = [pytest.mark.eval, pytest.mark.llm_call] + + +@pytest.mark.asyncio +@pytest.mark.timeout(300) +async def test_code_act_creates_live_recurring_task_with_null_entrypoint(): + async with make_code_act_actor(impl="real", exposed_managers={"tasks"}) as ( + actor, + primitives, + calls, + ): + handle = await actor.act( + ( + "Create exactly one live scheduled recurring task using " + "primitives.tasks.update. Name it exactly 'Controlled weekly AI report'. " + "Description: Every Monday at 12:00 UTC, research important AI and " + "agentic AI work from the previous week, summarize the key developments, " + "and email me a concise report. Set the first run for the next Monday " + "at 12:00 UTC and repeat weekly. Do not create or attach any entrypoint " + "function, do not mark it offline, and do not execute it now." + ), + clarification_enabled=False, + ) + result = await handle.result() + + assert result is not None + assert "primitives.tasks.update" in set(calls) + + rows = primitives.tasks._filter_tasks(filter="task_id >= 0") + task = [row for row in rows if row.name == "Controlled weekly AI report"][0] + assert task.offline is False + assert task.entrypoint is None + assert task.schedule is not None + assert task.repeat is not None + assert task.status == Status.scheduled + + +@pytest.mark.asyncio +@pytest.mark.timeout(300) +async def test_code_act_creates_live_triggerable_task_with_null_entrypoint(): + async with make_code_act_actor(impl="real", exposed_managers={"tasks"}) as ( + actor, + primitives, + calls, + ): + handle = await actor.act( + ( + "Create exactly one live triggerable task using primitives.tasks.update. " + "Name it exactly 'Controlled invoice email follow-up'. Description: " + "Whenever an inbound email about invoices arrives, summarize the email, " + "identify the needed action, and draft a reply for review. Use an email " + "trigger, leave entrypoint null, do not mark it offline, and do not " + "execute it now." + ), + clarification_enabled=False, + ) + result = await handle.result() + + assert result is not None + assert "primitives.tasks.update" in set(calls) + + rows = primitives.tasks._filter_tasks(filter="task_id >= 0") + task = [ + row for row in rows if row.name == "Controlled invoice email follow-up" + ][0] + assert task.offline is False + assert task.entrypoint is None + assert task.trigger is not None + assert task.status == Status.triggerable From a21612fd527de8259a8fb775ddc363365af8177c Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:35:05 +0500 Subject: [PATCH 5/6] test(tasks): cover description-driven recurring execution Add coverage for explicit actor requirements, entrypoint review context propagation, recurring clone timing, future instance patching, and task execution prompt builders. --- tests/task_scheduler/test_execute.py | 62 +++++++++++++- tests/task_scheduler/test_prompt_builders.py | 57 +++++++++++++ tests/task_scheduler/test_repetition.py | 86 ++++++++++++++++++++ tests/task_scheduler/test_trigger.py | 17 +++- 4 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 tests/task_scheduler/test_prompt_builders.py diff --git a/tests/task_scheduler/test_execute.py b/tests/task_scheduler/test_execute.py index 26cf5232d..49482c15d 100644 --- a/tests/task_scheduler/test_execute.py +++ b/tests/task_scheduler/test_execute.py @@ -23,7 +23,9 @@ from unity.actor.simulated import SimulatedActorHandle from unity.task_scheduler.types.schedule import Schedule from unity.task_scheduler.types.activated_by import ActivatedBy +from unity.task_scheduler.types.repetition import Frequency, RepeatPattern from unity.task_scheduler.types.status import Status +from unity.common.task_execution_context import current_post_run_review_context # The helper used in the existing test‑suite – applies project‑level monkey‐ # patches (e.g. env vars, tracers) so we keep behaviour consistent. @@ -231,7 +233,7 @@ async def test_execute_interject(monkeypatch): @functools.wraps(original_interject) async def spy_interject(self, instruction: str, *, images=None) -> None: # type: ignore[override] calls["interject"] += 1 - await original_interject(self, instruction, images=images) + await original_interject(self, instruction) monkeypatch.setattr(SimulatedActorHandle, "interject", spy_interject, raising=True) @@ -344,6 +346,7 @@ async def test_execute_result_and_done(): # Perform an interjection for activity, then stop explicitly await task.interject("Provide initial outline first.") await task.stop(cancel=False) + await task.result() assert task.done(), "`done()` must return True after explicit stop" @@ -546,6 +549,63 @@ async def test_execute_sets_activated_by_explicit(): assert any(r.activated_by == ActivatedBy.explicit for r in rows) +@pytest.mark.asyncio +@_handle_project +async def test_execute_without_delegate_or_actor_fails_before_mutation(): + ts = TaskScheduler() + task_id = ts._create_task(name="Needs actor", description="Needs actor")["details"][ + "task_id" + ] + initial_status = ts._get_task_or_raise(task_id).status + + with pytest.raises(RuntimeError, match="run-scoped actor delegate"): + await ts.execute(task_id=task_id) + + row = ts._get_task_or_raise(task_id) + assert row.status == initial_status + assert ts._active_task is None + + +@pytest.mark.asyncio +@_handle_project +async def test_direct_description_driven_recurring_execution_passes_entrypoint_review(): + calls = [] + actor = SimulatedActor(steps=0) + original_act = actor.act + + async def _spy_act(*args, **kwargs): + calls.append( + { + "kwargs": kwargs, + "post_run_review_context": current_post_run_review_context.get(), + }, + ) + return await original_act(*args, **kwargs) + + actor.act = _spy_act # type: ignore[method-assign] + ts = TaskScheduler(actor=actor) + task_id = ts._create_task( + name="Recurring no-entrypoint task", + description="Run from the natural-language description every day.", + status=Status.scheduled, + schedule=Schedule(start_at=datetime.now(timezone.utc)), + repeat=[RepeatPattern(frequency=Frequency.DAILY)], + )["details"]["task_id"] + + handle = await ts.execute(task_id=task_id) + await handle.result() + + assert "task_entrypoint_review" not in calls[0]["kwargs"] + post_run_review_context = calls[0]["post_run_review_context"] + assert post_run_review_context is not None + assert post_run_review_context.display_label == "Storing reusable workflow" + review = post_run_review_context.extensions.get("task_entrypoint_review") + assert review is not None + assert review["metadata"]["task_id"] == task_id + assert review["metadata"]["task_name"] == "Recurring no-entrypoint task" + assert callable(review["attach_entrypoint"]) + + @pytest.mark.asyncio @_handle_project async def test_update_status_cannot_force_active(): diff --git a/tests/task_scheduler/test_prompt_builders.py b/tests/task_scheduler/test_prompt_builders.py new file mode 100644 index 000000000..27669f1e3 --- /dev/null +++ b/tests/task_scheduler/test_prompt_builders.py @@ -0,0 +1,57 @@ +from datetime import datetime, timezone + +from unity.task_scheduler.prompt_builders import ( + build_task_execution_request, + build_task_run_guidelines, +) +from unity.task_scheduler.types.activated_by import ActivatedBy +from unity.task_scheduler.types.priority import Priority +from unity.task_scheduler.types.repetition import Frequency, RepeatPattern +from unity.task_scheduler.types.schedule import Schedule +from unity.task_scheduler.types.status import Status +from unity.task_scheduler.types.task import Task + + +def test_build_task_execution_request_includes_run_metadata(): + task = Task( + task_id=7, + instance_id=2, + name="Weekly AI report", + description="Summarize the previous week's AI research.", + status=Status.scheduled, + priority=Priority.normal, + response_policy="Email the user a concise document.", + schedule=Schedule(start_at=datetime(2026, 5, 18, 12, 0, tzinfo=timezone.utc)), + repeat=[RepeatPattern(frequency=Frequency.WEEKLY)], + ) + + request = build_task_execution_request(task) + + assert "Execute this TaskScheduler task as a contained task run." in request + assert "Task id: 7" in request + assert "Instance id: 2" in request + assert "Weekly AI report" in request + assert "Summarize the previous week's AI research." in request + assert "Task response policy:" in request + assert "Schedule metadata:" in request + assert "Repeat metadata:" in request + + +def test_build_task_run_guidelines_keep_child_actor_focused_on_one_task(): + task = Task( + task_id=3, + instance_id=1, + name="Invoice follow-up", + description="Draft an invoice reply.", + status=Status.triggerable, + priority=Priority.normal, + ) + + guidelines = build_task_run_guidelines(task, ActivatedBy.trigger) + + assert "executing exactly one TaskScheduler task" in guidelines + assert "do not create another task" in guidelines + assert "interpret the natural-language description" in guidelines + assert "Activation reason: trigger" in guidelines + assert "Task id: 3" in guidelines + assert "Instance id: 1" in guidelines diff --git a/tests/task_scheduler/test_repetition.py b/tests/task_scheduler/test_repetition.py index ac289978e..233258187 100644 --- a/tests/task_scheduler/test_repetition.py +++ b/tests/task_scheduler/test_repetition.py @@ -1,6 +1,9 @@ from datetime import datetime, timedelta, timezone +import pytest + from tests.helpers import _handle_project +from unity.actor.simulated import SimulatedActor from unity.task_scheduler.task_scheduler import TaskScheduler from unity.task_scheduler.types.repetition import ( Frequency, @@ -76,6 +79,89 @@ def test_clone_task_instance_rearms_recurring_scheduled_task(): assert latest.instance_id == 1 assert latest.status == Status.scheduled assert latest.schedule_start_at == initial_start + timedelta(days=1) + assert latest.entrypoint is None + + +@_handle_project +def test_entrypoint_review_patches_future_description_driven_instances(): + scheduler = TaskScheduler() + initial_start = datetime.now(timezone.utc).replace(microsecond=0) - timedelta( + hours=1, + ) + scheduler._create_task( + name="Daily description-driven summary", + description="Summarize updates every day.", + status=Status.scheduled, + schedule=Schedule(start_at=initial_start.isoformat()), + repeat=[RepeatPattern(frequency=Frequency.DAILY)], + ) + + current = scheduler._get_task_or_raise(0) + scheduler._clone_task_instance(current) + result = scheduler._attach_entrypoint_to_future_instances( + task_id=0, + completed_instance_id=0, + function_id=321, + rationale="The successful run revealed a stable workflow.", + ) + + rows = scheduler._filter_tasks(filter="task_id == 0") + current_row = min(rows, key=lambda task: task.instance_id) + future_row = max(rows, key=lambda task: task.instance_id) + + assert result["outcome"] == "attached" + assert current_row.entrypoint is None + assert future_row.entrypoint == 321 + + +@pytest.mark.asyncio +@_handle_project +async def test_recurring_execution_clones_before_entrypoint_review_patch(): + scheduler = TaskScheduler(actor=SimulatedActor(steps=0)) + initial_start = datetime.now(timezone.utc).replace(microsecond=0) - timedelta( + hours=1, + ) + scheduler._create_task( + name="Daily report", + description="Run the daily report from the task description.", + status=Status.scheduled, + schedule=Schedule(start_at=initial_start.isoformat()), + repeat=[RepeatPattern(frequency=Frequency.DAILY)], + ) + + handle = await scheduler.execute(task_id=0) + await handle.result() + + rows_after_run = sorted( + scheduler._filter_tasks(filter="task_id == 0"), + key=lambda task: task.instance_id, + ) + assert [row.instance_id for row in rows_after_run] == [0, 1] + assert rows_after_run[0].entrypoint is None + assert rows_after_run[1].entrypoint is None + + result = scheduler._attach_entrypoint_to_future_instances( + task_id=0, + completed_instance_id=0, + function_id=321, + rationale="The completed run was stable enough to reuse.", + ) + assert result["outcome"] == "attached" + + patched_next = [ + row + for row in scheduler._filter_tasks(filter="task_id == 0") + if row.instance_id == 1 + ][0] + assert patched_next.entrypoint == 321 + + scheduler._clone_task_instance(patched_next) + cloned_from_patched = [ + row + for row in scheduler._filter_tasks(filter="task_id == 0") + if row.instance_id == 2 + ][0] + assert cloned_from_patched.entrypoint == 321 @_handle_project diff --git a/tests/task_scheduler/test_trigger.py b/tests/task_scheduler/test_trigger.py index 8cb910c96..0126f9d99 100644 --- a/tests/task_scheduler/test_trigger.py +++ b/tests/task_scheduler/test_trigger.py @@ -8,6 +8,7 @@ import pytest from tests.helpers import _handle_project +from unity.actor.simulated import SimulatedActor from unity.task_scheduler.task_scheduler import TaskScheduler from unity.task_scheduler.types.status import Status from unity.task_scheduler.types.schedule import Schedule @@ -178,7 +179,7 @@ async def test_triggerable_start_clones_instance(): • create a **new** row with the same `task_id` but `instance_id` 1 that remains in the *triggerable* state """ - ts = TaskScheduler() + ts = TaskScheduler(actor=SimulatedActor(steps=None, duration=None)) trig = Trigger(medium=Medium.EMAIL, recurring=False) tid = ts._create_task( @@ -202,6 +203,20 @@ async def test_triggerable_start_clones_instance(): assert status_by_inst[0] == Status.active assert status_by_inst[1] == Status.triggerable + result = ts._attach_entrypoint_to_future_instances( + task_id=tid, + completed_instance_id=0, + function_id=654, + rationale="The triggered run revealed a stable reusable workflow.", + ) + assert result["outcome"] == "attached" + future_row = [ + row + for row in ts._filter_tasks(filter=f"task_id == {tid}") + if row.instance_id == 1 + ][0] + assert future_row.entrypoint == 654 + # Clean-up (avoid background thread leaks) await handle.stop(cancel=True) await handle.result() From c06f487ca84f6c662c9b4fd89df89fb2a218553e Mon Sep 17 00:00:00 2001 From: Haris Mahmood Date: Tue, 12 May 2026 15:35:11 +0500 Subject: [PATCH 6/6] test(tasks): inject simulated actors in execution tests Update scheduler tests to provide explicit simulated actors now that direct execution no longer creates an implicit fallback actor. --- tests/task_scheduler/test_active_queue.py | 23 +++++++++++++++++++--- tests/task_scheduler/test_active_task.py | 2 +- tests/task_scheduler/test_event_logging.py | 5 +++-- tests/task_scheduler/test_reintegration.py | 19 ++++++++++++++++-- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/tests/task_scheduler/test_active_queue.py b/tests/task_scheduler/test_active_queue.py index c2d878795..1652627b2 100644 --- a/tests/task_scheduler/test_active_queue.py +++ b/tests/task_scheduler/test_active_queue.py @@ -8,12 +8,26 @@ import pytest from tests.helpers import _handle_project -from unity.task_scheduler.task_scheduler import TaskScheduler +from unity.task_scheduler import task_scheduler as task_scheduler_module +from unity.task_scheduler.task_scheduler import TaskScheduler as _TaskScheduler from unity.task_scheduler.types.task import Task -from unity.actor.simulated import SimulatedActor, SimulatedActorHandle +from unity.actor.simulated import ( + SimulatedActor, + SimulatedActorHandle, + _StaticAnswerHandle, +) from unity.task_scheduler.types.status import Status import inspect +task_scheduler_module.SimulatedActor = SimulatedActor + + +def TaskScheduler(*args, **kwargs): + if "actor" not in kwargs: + actor_cls = getattr(task_scheduler_module, "SimulatedActor", SimulatedActor) + kwargs["actor"] = actor_cls() + return _TaskScheduler(*args, **kwargs) + async def _make_ordered_queue( ts: TaskScheduler, @@ -625,6 +639,9 @@ class _FakeQueueClient: def __init__(self, *a, **kw): pass + def set_on_log_file_pending(self, callback): + return None + def set_system_message(self, sys_msg): try: prompt_capture["system"] = str(sys_msg) @@ -1203,7 +1220,7 @@ async def spy_actor_ask(self, question: str): # type: ignore[override] self.simulate_step() except Exception: pass - return "OK" + return _StaticAnswerHandle("OK") async def spy_actor_interject(self, instruction: str, *, images=None): # type: ignore[override] interject_calls["count"] += 1 diff --git a/tests/task_scheduler/test_active_task.py b/tests/task_scheduler/test_active_task.py index 4b6e03f40..d7643f7b6 100644 --- a/tests/task_scheduler/test_active_task.py +++ b/tests/task_scheduler/test_active_task.py @@ -118,7 +118,7 @@ async def test_interject(monkeypatch): @functools.wraps(original_interject) async def spy_interject(self, instruction: str, *, images=None) -> None: # type: ignore[override] calls["interject"] += 1 - return await original_interject(self, instruction, images=images) + return await original_interject(self, instruction) monkeypatch.setattr(SimulatedActorHandle, "interject", spy_interject, raising=True) diff --git a/tests/task_scheduler/test_event_logging.py b/tests/task_scheduler/test_event_logging.py index facbd7bbb..63d3150d4 100644 --- a/tests/task_scheduler/test_event_logging.py +++ b/tests/task_scheduler/test_event_logging.py @@ -2,6 +2,7 @@ import pytest +from unity.actor.simulated import SimulatedActor from unity.task_scheduler.task_scheduler import TaskScheduler from tests.helpers import _handle_project, capture_events @@ -66,7 +67,7 @@ async def test_managermethod_events_for_update(): @pytest.mark.asyncio @_handle_project async def test_managermethod_events_for_execute(): - ts = TaskScheduler() + ts = TaskScheduler(actor=SimulatedActor(steps=0)) # create a simple task first outcome = ts._create_task(name="Demo", description="Run a demo task") @@ -81,7 +82,7 @@ async def test_managermethod_events_for_execute(): for e in events if e.payload.get("manager") == "TaskScheduler" and e.payload.get("method") == "execute" - and e.payload.get("request") == task_id + and e.payload.get("phase") == "incoming" ] assert incoming call_id = incoming[0].calling_id diff --git a/tests/task_scheduler/test_reintegration.py b/tests/task_scheduler/test_reintegration.py index 9be0b3b30..bc748fa1c 100644 --- a/tests/task_scheduler/test_reintegration.py +++ b/tests/task_scheduler/test_reintegration.py @@ -5,13 +5,25 @@ import pytest from tests.helpers import _handle_project -from unity.task_scheduler.task_scheduler import TaskScheduler +from unity.actor.simulated import SimulatedActor +from unity.task_scheduler import task_scheduler as task_scheduler_module +from unity.task_scheduler.task_scheduler import TaskScheduler as _TaskScheduler from unity.task_scheduler.types.schedule import Schedule from unity.task_scheduler.types.status import Status from unity.task_scheduler.types.trigger import Trigger, Medium pytestmark = pytest.mark.llm_call +task_scheduler_module.SimulatedActor = SimulatedActor + + +def TaskScheduler(*args, **kwargs): + if "actor" not in kwargs: + actor_cls = getattr(task_scheduler_module, "SimulatedActor", SimulatedActor) + kwargs["actor"] = actor_cls() + return _TaskScheduler(*args, **kwargs) + + # Speed up only this module's SimulatedActor by monkeypatching the class symbols # used by TaskScheduler to a shorter-duration variant. This does not affect # other test modules. @@ -424,7 +436,9 @@ async def test_reintegration_plan_clears_on_completion(): async def test_chain_then_defer_restores_next_head_start_at(monkeypatch): from datetime import datetime, timezone, timedelta - ts = TaskScheduler() + ts = _TaskScheduler( + actor=SimulatedActor(steps=None, duration=None, hold_completion=True), + ) # Chain execution is the default; no environment variable required. @@ -455,6 +469,7 @@ async def test_chain_then_defer_restores_next_head_start_at(monkeypatch): # Start the head in chain mode but only allow the head to complete handle = await ts.execute(task_id=head_id) + handle._current_handle._actor_handle.trigger_completion() # Wait for just the head to finish await handle._active_task_done()