Skip to content
Merged

Tasks #273

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions tests/actor/code_act/test_prompt_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def get_tools(self) -> dict:
return {}


class _DummyToolEnv(_DummyEnv):
"""Minimal environment stub with configurable tool names."""

def __init__(self, prompt_context: str, tools: dict[str, Any]):
super().__init__(prompt_context)
self._tools = tools

def get_tools(self) -> dict:
return self._tools


def _real_envs_mixed() -> Mapping[str, Any]:
"""Real environments that produce self-contained prompt context."""
from unity.function_manager.primitives import ComputerPrimitives
Expand Down Expand Up @@ -111,6 +122,33 @@ def test_code_act_prompt_includes_diverse_examples_sessions_computer_primitives_
assert "execute_function vs execute_code decision" in prompt


@pytest.mark.timeout(30)
def test_code_act_prompt_includes_task_workflow_guidance_only_with_task_primitives():
prompt_with_tasks = build_code_act_prompt(
environments={
"primitives": _DummyToolEnv(
"Task primitives are available.",
{"primitives.tasks.update": object()},
),
},
tools={},
)
prompt_without_tasks = build_code_act_prompt(
environments={
"primitives": _DummyToolEnv(
"Only contact primitives are available.",
{"primitives.contacts.ask": object()},
),
},
tools={},
)

assert "Durable Scheduled And Triggered Workflows" in prompt_with_tasks
assert "`entrypoint=None`" in prompt_with_tasks
assert "primitives.tasks.execute(task_id=...)" in prompt_with_tasks
assert "Durable Scheduled And Triggered Workflows" not in prompt_without_tasks


@pytest.mark.timeout(30)
def test_code_act_prompt_teaches_refresh_token_oauth_helper():
actor = CodeActActor()
Expand Down
71 changes: 71 additions & 0 deletions tests/actor/code_act/test_storage_on_stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pytest

from unity.actor.code_act_actor import CodeActActor, _StorageCheckHandle
from unity.common.task_execution_context import PostRunReviewContext

# ---------------------------------------------------------------------------
# Symbolic: _StorageCheckHandle runs Phase 2 after stop
Expand Down Expand Up @@ -230,6 +231,76 @@ async def _stop(**kwargs):
), f"StorageCheck incoming event must have a non-null instructions kwarg, got {instructions!r}"


@pytest.mark.asyncio
@pytest.mark.timeout(30)
async def test_task_entrypoint_review_uses_reusable_workflow_event_label():
result_future: asyncio.Future[str] = asyncio.get_event_loop().create_future()

inner = MagicMock()

async def _await_result():
return await result_future

inner.result = _await_result
inner.next_notification = AsyncMock(side_effect=lambda: asyncio.Event().wait())

async def _stop(**kwargs):
if not result_future.done():
result_future.set_result("done")

inner.stop = AsyncMock(side_effect=_stop)
inner._client = MagicMock(messages=[{"role": "user", "content": "do task"}])

mock_task = MagicMock()
mock_task.get_ask_tools = MagicMock(return_value={})
mock_task.get_completed_tool_metadata = MagicMock(return_value={})
inner._task = mock_task

actor = MagicMock()
actor.function_manager = None
actor.guidance_manager = None

review_context = PostRunReviewContext(
display_label="Storing reusable workflow",
instructions="Review the completed recurring workflow.",
extensions={"task_entrypoint_review": {"metadata": {"task_id": 1}}},
)

with (
patch("unity.actor.code_act_actor._start_storage_check_loop") as mock_loop,
patch(
"unity.actor.code_act_actor.publish_manager_method_event",
new_callable=AsyncMock,
) as mock_publish,
):
mock_loop.return_value = None
handle = _StorageCheckHandle(
inner=inner,
actor=actor,
post_run_review_context=review_context,
)

result_future.set_result("done")

deadline = asyncio.get_event_loop().time() + 10
while not handle.done():
if asyncio.get_event_loop().time() > deadline:
raise TimeoutError("Handle did not complete")
await asyncio.sleep(0.1)

incoming_calls = [
call
for call in mock_publish.call_args_list
if call.kwargs.get("phase") == "incoming" and call.args[2] == "StorageCheck"
]
assert incoming_calls[0].kwargs["display_label"] == "Storing reusable workflow"
assert (
incoming_calls[0].kwargs["instructions"]
== "Review the completed recurring workflow."
)
assert mock_loop.call_args.kwargs["post_run_review_context"] is review_context


# ---------------------------------------------------------------------------
# Eval: persist=True + stop with memoize intent stores a function
# ---------------------------------------------------------------------------
Expand Down
57 changes: 56 additions & 1 deletion tests/actor/code_act/test_task_execution_delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import pytest

from tests.helpers import _handle_project
from unity.actor.code_act_actor import CodeActActor
from unity.actor.code_act_actor import CodeActActor, _CodeActTaskExecutionDelegate
from unity.actor.environments.state_managers import StateManagerEnvironment
from unity.actor.simulated import SimulatedActor
from unity.common.task_execution_context import current_task_execution_delegate
from unity.function_manager.function_manager import FunctionManager
from unity.function_manager.primitives import PrimitiveScope, Primitives
Expand All @@ -15,6 +16,60 @@
from unity.task_scheduler.types.status import Status


@pytest.mark.asyncio
async def test_codeact_task_delegate_runs_description_tasks_in_child_actor_slot():
calls = []
actor = SimulatedActor(steps=0)

original_act = actor.act

async def _spy_act(*args, **kwargs):
calls.append(kwargs)
return await original_act(*args, **kwargs)

actor.act = _spy_act # type: ignore[method-assign]
delegate = _CodeActTaskExecutionDelegate(actor) # type: ignore[arg-type]

handle = await delegate.start_task_run(
task_description="Run the description-driven task.",
entrypoint=None,
parent_chat_context=None,
clarification_up_q=None,
clarification_down_q=None,
)
await handle.result()

assert calls[0]["_reuse_actor_slot"] is False
assert calls[0]["persist"] is False


@pytest.mark.asyncio
async def test_codeact_task_delegate_reuses_actor_slot_for_entrypoint_tasks():
calls = []
actor = SimulatedActor(steps=0)

original_act = actor.act

async def _spy_act(*args, **kwargs):
calls.append(kwargs)
return await original_act(*args, **kwargs)

actor.act = _spy_act # type: ignore[method-assign]
delegate = _CodeActTaskExecutionDelegate(actor) # type: ignore[arg-type]

handle = await delegate.start_task_run(
task_description="Run the function-backed task.",
entrypoint=123,
parent_chat_context=None,
clarification_up_q=None,
clarification_down_q=None,
)
await handle.result()

assert calls[0]["_reuse_actor_slot"] is True
assert calls[0]["entrypoint"] == 123


@pytest.mark.asyncio
@pytest.mark.llm_call
@pytest.mark.timeout(120)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

import pytest

from tests.actor.state_managers.utils import make_code_act_actor
from unity.task_scheduler.types.status import Status

pytestmark = [pytest.mark.eval, pytest.mark.llm_call]


@pytest.mark.asyncio
@pytest.mark.timeout(300)
async def test_code_act_creates_live_recurring_task_with_null_entrypoint():
async with make_code_act_actor(impl="real", exposed_managers={"tasks"}) as (
actor,
primitives,
calls,
):
handle = await actor.act(
(
"Create exactly one live scheduled recurring task using "
"primitives.tasks.update. Name it exactly 'Controlled weekly AI report'. "
"Description: Every Monday at 12:00 UTC, research important AI and "
"agentic AI work from the previous week, summarize the key developments, "
"and email me a concise report. Set the first run for the next Monday "
"at 12:00 UTC and repeat weekly. Do not create or attach any entrypoint "
"function, do not mark it offline, and do not execute it now."
),
clarification_enabled=False,
)
result = await handle.result()

assert result is not None
assert "primitives.tasks.update" in set(calls)

rows = primitives.tasks._filter_tasks(filter="task_id >= 0")
task = [row for row in rows if row.name == "Controlled weekly AI report"][0]
assert task.offline is False
assert task.entrypoint is None
assert task.schedule is not None
assert task.repeat is not None
assert task.status == Status.scheduled


@pytest.mark.asyncio
@pytest.mark.timeout(300)
async def test_code_act_creates_live_triggerable_task_with_null_entrypoint():
async with make_code_act_actor(impl="real", exposed_managers={"tasks"}) as (
actor,
primitives,
calls,
):
handle = await actor.act(
(
"Create exactly one live triggerable task using primitives.tasks.update. "
"Name it exactly 'Controlled invoice email follow-up'. Description: "
"Whenever an inbound email about invoices arrives, summarize the email, "
"identify the needed action, and draft a reply for review. Use an email "
"trigger, leave entrypoint null, do not mark it offline, and do not "
"execute it now."
),
clarification_enabled=False,
)
result = await handle.result()

assert result is not None
assert "primitives.tasks.update" in set(calls)

rows = primitives.tasks._filter_tasks(filter="task_id >= 0")
task = [
row for row in rows if row.name == "Controlled invoice email follow-up"
][0]
assert task.offline is False
assert task.entrypoint is None
assert task.trigger is not None
assert task.status == Status.triggerable
23 changes: 20 additions & 3 deletions tests/task_scheduler/test_active_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,26 @@
import pytest

from tests.helpers import _handle_project
from unity.task_scheduler.task_scheduler import TaskScheduler
from unity.task_scheduler import task_scheduler as task_scheduler_module
from unity.task_scheduler.task_scheduler import TaskScheduler as _TaskScheduler
from unity.task_scheduler.types.task import Task
from unity.actor.simulated import SimulatedActor, SimulatedActorHandle
from unity.actor.simulated import (
SimulatedActor,
SimulatedActorHandle,
_StaticAnswerHandle,
)
from unity.task_scheduler.types.status import Status
import inspect

task_scheduler_module.SimulatedActor = SimulatedActor


def TaskScheduler(*args, **kwargs):
if "actor" not in kwargs:
actor_cls = getattr(task_scheduler_module, "SimulatedActor", SimulatedActor)
kwargs["actor"] = actor_cls()
return _TaskScheduler(*args, **kwargs)


async def _make_ordered_queue(
ts: TaskScheduler,
Expand Down Expand Up @@ -625,6 +639,9 @@ class _FakeQueueClient:
def __init__(self, *a, **kw):
pass

def set_on_log_file_pending(self, callback):
return None

def set_system_message(self, sys_msg):
try:
prompt_capture["system"] = str(sys_msg)
Expand Down Expand Up @@ -1203,7 +1220,7 @@ async def spy_actor_ask(self, question: str): # type: ignore[override]
self.simulate_step()
except Exception:
pass
return "OK"
return _StaticAnswerHandle("OK")

async def spy_actor_interject(self, instruction: str, *, images=None): # type: ignore[override]
interject_calls["count"] += 1
Expand Down
2 changes: 1 addition & 1 deletion tests/task_scheduler/test_active_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async def test_interject(monkeypatch):
@functools.wraps(original_interject)
async def spy_interject(self, instruction: str, *, images=None) -> None: # type: ignore[override]
calls["interject"] += 1
return await original_interject(self, instruction, images=images)
return await original_interject(self, instruction)

monkeypatch.setattr(SimulatedActorHandle, "interject", spy_interject, raising=True)

Expand Down
5 changes: 3 additions & 2 deletions tests/task_scheduler/test_event_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from unity.actor.simulated import SimulatedActor
from unity.task_scheduler.task_scheduler import TaskScheduler
from tests.helpers import _handle_project, capture_events

Expand Down Expand Up @@ -66,7 +67,7 @@ async def test_managermethod_events_for_update():
@pytest.mark.asyncio
@_handle_project
async def test_managermethod_events_for_execute():
ts = TaskScheduler()
ts = TaskScheduler(actor=SimulatedActor(steps=0))

# create a simple task first
outcome = ts._create_task(name="Demo", description="Run a demo task")
Expand All @@ -81,7 +82,7 @@ async def test_managermethod_events_for_execute():
for e in events
if e.payload.get("manager") == "TaskScheduler"
and e.payload.get("method") == "execute"
and e.payload.get("request") == task_id
and e.payload.get("phase") == "incoming"
]
assert incoming
call_id = incoming[0].calling_id
Expand Down
Loading