-
Notifications
You must be signed in to change notification settings - Fork 54
Cl 1679 release 2025 09 rpa crud workflow executions #141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
17b2cd2
99eb20f
90550d4
a11880f
74a7ca9
230df82
48650c7
09466d2
9fc035b
361f257
237e2c0
dccd7d1
3d3abb2
345cbb2
2bf8c7d
ebbe43b
c51ac93
06f1ffa
97371de
6214505
4725024
0c579d1
6055bd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Execution models and services for the chat API.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| from fastapi import Depends | ||
|
|
||
| from askui.chat.api.dependencies import SettingsDep | ||
| from askui.chat.api.settings import Settings | ||
| from askui.chat.api.threads.dependencies import ThreadFacadeDep | ||
| from askui.chat.api.threads.facade import ThreadFacade | ||
| from askui.chat.api.workflow_executions.service import ExecutionService | ||
| from askui.chat.api.workflows.dependencies import WorkflowServiceDep | ||
| from askui.chat.api.workflows.service import WorkflowService | ||
|
|
||
|
|
||
| def get_execution_service( | ||
| settings: Settings = SettingsDep, | ||
| workflow_service: WorkflowService = WorkflowServiceDep, | ||
| thread_facade: ThreadFacade = ThreadFacadeDep, | ||
| ) -> ExecutionService: | ||
| """Get ExecutionService instance.""" | ||
| return ExecutionService(settings.data_dir, workflow_service, thread_facade) | ||
|
|
||
|
|
||
| ExecutionServiceDep = Depends(get_execution_service) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import datetime | ||
| from typing import Literal | ||
|
|
||
| from pydantic import BaseModel | ||
|
|
||
| from askui.chat.api.models import ( | ||
| RunId, | ||
| ThreadId, | ||
| WorkflowExecutionId, | ||
| WorkspaceId, | ||
| WorkspaceResource, | ||
| ) | ||
| from askui.chat.api.workflows.models import WorkflowId | ||
| from askui.utils.datetime_utils import now | ||
| from askui.utils.id_utils import generate_time_ordered_id | ||
|
|
||
|
|
||
| class WorkflowExecutionCreateParams(BaseModel): | ||
| """ | ||
| Parameters for creating a workflow execution via API. | ||
| """ | ||
|
|
||
| workflow_id: WorkflowId | ||
|
|
||
|
|
||
| class WorkflowExecution(WorkspaceResource): | ||
| """ | ||
| A workflow execution resource in the chat API. | ||
|
|
||
| Args: | ||
| id (WorkflowExecutionId): The id of the execution. Must start with the 'exec_' prefix and be | ||
| followed by one or more alphanumerical characters. | ||
| object (Literal['execution']): The object type, always 'execution'. | ||
| created_at (datetime.datetime): The creation time as a datetime. | ||
| workflow (WorkflowId): The id of the workflow being executed. Must start with the 'wf_' prefix. | ||
| thread (ThreadId): The id of the thread this execution is associated with. Must start with the 'thread_' prefix. | ||
| status (ExecutionStatus): The current status of the workflow execution. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is the status? Is that not essential for it to know wether it succeeded or not? (see issue) |
||
| workspace_id (WorkspaceId | None, optional): The workspace this execution belongs to. | ||
| """ | ||
|
|
||
| id: WorkflowExecutionId | ||
| object: Literal["workflow_execution"] = "workflow_execution" | ||
| created_at: datetime.datetime | ||
| workflow_id: WorkflowId | ||
| thread_id: ThreadId | ||
| run_id: RunId | ||
|
|
||
| @classmethod | ||
| def create( | ||
| cls, | ||
| workspace_id: WorkspaceId, | ||
| workflow_id: WorkflowId, | ||
| run_id: RunId, | ||
| thread_id: ThreadId, | ||
| ) -> "WorkflowExecution": | ||
| return cls( | ||
| id=generate_time_ordered_id("wfexec"), | ||
| created_at=now(), | ||
| workspace_id=workspace_id, | ||
| run_id=run_id, | ||
| thread_id=thread_id, | ||
| workflow_id=workflow_id, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| from collections.abc import AsyncGenerator | ||
| from typing import Annotated | ||
|
|
||
| from fastapi import APIRouter, BackgroundTasks, Header, Query, Response, status | ||
| from fastapi.responses import JSONResponse, StreamingResponse | ||
| from pydantic import BaseModel | ||
|
|
||
| from askui.chat.api.dependencies import ListQueryDep | ||
| from askui.chat.api.models import ThreadId, WorkspaceId | ||
| from askui.chat.api.workflow_executions.dependencies import ExecutionServiceDep | ||
| from askui.chat.api.workflow_executions.models import ( | ||
| WorkflowExecution, | ||
| WorkflowExecutionCreateParams, | ||
| WorkflowExecutionId, | ||
| ) | ||
| from askui.chat.api.workflow_executions.service import ExecutionService | ||
| from askui.chat.api.workflows.models import WorkflowId | ||
| from askui.utils.api_utils import ListQuery, ListResponse | ||
|
|
||
| router = APIRouter(prefix="/workflow-executions", tags=["workflow-executions"]) | ||
|
|
||
|
|
||
| @router.get("/") | ||
| def list_workflow_executions( | ||
| askui_workspace: Annotated[WorkspaceId, Header()], | ||
| query: ListQuery = ListQueryDep, | ||
| workflow_id: Annotated[WorkflowId | None, Query()] = None, | ||
| thread_id: Annotated[ThreadId | None, Query()] = None, | ||
| execution_service: ExecutionService = ExecutionServiceDep, | ||
| ) -> ListResponse[WorkflowExecution]: | ||
| """List executions with optional filtering by workflow and/or thread.""" | ||
| return execution_service.list_( | ||
| workspace_id=askui_workspace, | ||
| query=query, | ||
| workflow_id=workflow_id, | ||
| thread_id=thread_id, | ||
| ) | ||
|
|
||
|
|
||
| @router.post("/") | ||
| async def create_workflow_execution( | ||
| askui_workspace: Annotated[WorkspaceId, Header()], | ||
| params: WorkflowExecutionCreateParams, | ||
| background_tasks: BackgroundTasks, | ||
| execution_service: ExecutionService = ExecutionServiceDep, | ||
| ) -> WorkflowExecution: | ||
| """Create a new workflow execution.""" | ||
| execution, async_generator = await execution_service.create( | ||
| workspace_id=askui_workspace, params=params | ||
| ) | ||
|
|
||
| async def _run_async_generator() -> None: | ||
| async for _ in async_generator: | ||
| pass | ||
|
|
||
| background_tasks.add_task(_run_async_generator) | ||
|
Comment on lines
+52
to
+56
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should separate the |
||
| return execution | ||
|
|
||
|
|
||
| @router.get("/{execution_id}") | ||
| def retrieve_workflow_execution( | ||
| askui_workspace: Annotated[WorkspaceId, Header()], | ||
| execution_id: WorkflowExecutionId, | ||
| execution_service: ExecutionService = ExecutionServiceDep, | ||
| ) -> WorkflowExecution: | ||
| """Retrieve a specific execution by ID.""" | ||
| return execution_service.retrieve( | ||
| workspace_id=askui_workspace, execution_id=execution_id | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| from collections.abc import AsyncGenerator | ||
| from pathlib import Path | ||
| from typing import Callable | ||
|
|
||
| from askui.chat.api.messages.models import MessageCreateParams | ||
| from askui.chat.api.models import ThreadId, WorkspaceId | ||
| from askui.chat.api.runs.models import ThreadAndRunCreateParams | ||
| from askui.chat.api.runs.runner.events.events import Events | ||
| from askui.chat.api.threads.facade import ThreadFacade | ||
| from askui.chat.api.threads.models import ThreadCreateParams | ||
| from askui.chat.api.utils import build_workspace_filter_fn | ||
| from askui.chat.api.workflow_executions.models import ( | ||
| WorkflowExecution, | ||
| WorkflowExecutionCreateParams, | ||
| WorkflowExecutionId, | ||
| ) | ||
| from askui.chat.api.workflows.models import WorkflowId | ||
| from askui.chat.api.workflows.service import WorkflowService | ||
| from askui.utils.api_utils import ( | ||
| ConflictError, | ||
| ListQuery, | ||
| ListResponse, | ||
| NotFoundError, | ||
| list_resources, | ||
| ) | ||
|
|
||
|
|
||
| def _build_execution_filter_fn( | ||
| workspace_id: WorkspaceId, | ||
| workflow_id: WorkflowId | None = None, | ||
| thread_id: ThreadId | None = None, | ||
| ) -> Callable[[WorkflowExecution], bool]: | ||
| """Build filter function for executions with optional workflow and thread filters.""" | ||
| workspace_filter: Callable[[WorkflowExecution], bool] = build_workspace_filter_fn( | ||
| workspace_id, WorkflowExecution | ||
| ) | ||
|
|
||
| def filter_fn(execution: WorkflowExecution) -> bool: | ||
| if not workspace_filter(execution): | ||
| return False | ||
| if workflow_id is not None and execution.workflow_id != workflow_id: | ||
| return False | ||
| if thread_id is not None and execution.thread_id != thread_id: | ||
| return False | ||
| return True | ||
|
|
||
| return filter_fn | ||
|
|
||
|
|
||
| class ExecutionService: | ||
| """Service for managing Execution resources with filesystem persistence.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| base_dir: Path, | ||
| workflow_service: WorkflowService, | ||
| thread_facade: ThreadFacade, | ||
| ) -> None: | ||
| self._base_dir = base_dir | ||
| self._executions_dir = base_dir / "executions" | ||
| self._workflow_service = workflow_service | ||
| self._thread_facade = thread_facade | ||
|
|
||
| def _get_execution_path( | ||
| self, execution_id: WorkflowExecutionId, new: bool = False | ||
| ) -> Path: | ||
| """Get the file path for an execution.""" | ||
| execution_path = self._executions_dir / f"{execution_id}.json" | ||
| exists = execution_path.exists() | ||
| if new and exists: | ||
| error_msg = f"Execution {execution_id} already exists" | ||
| raise ConflictError(error_msg) | ||
| if not new and not exists: | ||
| error_msg = f"Execution {execution_id} not found" | ||
| raise NotFoundError(error_msg) | ||
| return execution_path | ||
|
|
||
| def list_( | ||
| self, | ||
| workspace_id: WorkspaceId, | ||
| query: ListQuery, | ||
| workflow_id: WorkflowId | None = None, | ||
| thread_id: ThreadId | None = None, | ||
| ) -> ListResponse[WorkflowExecution]: | ||
| """List executions with optional filtering by workflow and/or thread.""" | ||
| return list_resources( | ||
| base_dir=self._executions_dir, | ||
| query=query, | ||
| resource_type=WorkflowExecution, | ||
| filter_fn=_build_execution_filter_fn(workspace_id, workflow_id, thread_id), | ||
| ) | ||
|
|
||
| def retrieve( | ||
| self, workspace_id: WorkspaceId, execution_id: WorkflowExecutionId | ||
| ) -> WorkflowExecution: | ||
| """Retrieve a specific execution by ID.""" | ||
| try: | ||
| execution_path = self._get_execution_path(execution_id) | ||
| execution = WorkflowExecution.model_validate_json( | ||
| execution_path.read_text() | ||
| ) | ||
|
|
||
| # Check workspace access - allow if workspace_id is None (global access) | ||
| # or if execution workspace matches or execution has no workspace | ||
|
Comment on lines
+103
to
+104
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rm? |
||
| if execution.workspace_id != workspace_id: | ||
| error_msg = f"Execution {execution_id} not found" | ||
| raise NotFoundError(error_msg) | ||
|
|
||
| except FileNotFoundError as e: | ||
| error_msg = f"Execution {execution_id} not found" | ||
| raise NotFoundError(error_msg) from e | ||
| else: | ||
| return execution | ||
|
|
||
| async def create( | ||
| self, workspace_id: WorkspaceId, params: WorkflowExecutionCreateParams | ||
| ) -> tuple[WorkflowExecution, AsyncGenerator[Events, None]]: | ||
| """Create a new execution.""" | ||
| # Validate that the workflow exists in the same workspace | ||
| workflow = self._workflow_service.retrieve(workspace_id, params.workflow_id) | ||
|
|
||
| # Create a thread and a run | ||
| run, async_generator = await self._thread_facade.create_thread_and_run( | ||
| workspace_id, | ||
| ThreadAndRunCreateParams( | ||
| assistant_id=workflow.assistant_id, | ||
| thread=ThreadCreateParams( | ||
| name=workflow.name, | ||
| messages=[ | ||
| MessageCreateParams(role="user", content=workflow.description) | ||
| ], | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
| execution = WorkflowExecution.create( | ||
| workspace_id, params.workflow_id, run.id, run.thread_id | ||
| ) | ||
| self._save(execution, new=True) | ||
| return execution, async_generator | ||
|
|
||
| def _save(self, execution: WorkflowExecution, new: bool = False) -> None: | ||
| """Save execution to filesystem.""" | ||
| self._executions_dir.mkdir(parents=True, exist_ok=True) | ||
| execution_file = self._get_execution_path(execution.id, new=new) | ||
| execution_file.write_text(execution.model_dump_json(), encoding="utf-8") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,12 +2,15 @@ | |
|
|
||
| from pydantic import BaseModel, Field | ||
|
|
||
| from askui.chat.api.models import WorkspaceId, WorkspaceResource | ||
| from askui.chat.api.models import ( | ||
| AssistantId, | ||
| WorkflowId, | ||
| WorkspaceId, | ||
| WorkspaceResource, | ||
| ) | ||
| from askui.utils.datetime_utils import UnixDatetime, now | ||
| from askui.utils.id_utils import IdField, generate_time_ordered_id | ||
|
|
||
| WorkflowId = Annotated[str, IdField("wf")] | ||
|
|
||
|
|
||
| class WorkflowCreateParams(BaseModel): | ||
| """ | ||
|
|
@@ -16,6 +19,7 @@ class WorkflowCreateParams(BaseModel): | |
|
|
||
| name: str | ||
| description: str | ||
| assistant_id: AssistantId | ||
| tags: list[str] = Field(default_factory=list) | ||
|
|
||
|
|
||
|
|
@@ -42,19 +46,20 @@ class Workflow(WorkspaceResource): | |
| description (str): A detailed description of the workflow's purpose and steps. | ||
| tags (list[str], optional): Tags associated with the workflow for filtering or | ||
| categorization. Default is an empty list. | ||
| workspace_id (WorkspaceId | None, optional): The workspace this workflow belongs to. | ||
| workspace_id (WorkspaceId, optional): The workspace this workflow belongs to. | ||
| """ | ||
|
|
||
| id: WorkflowId | ||
| object: Literal["workflow"] = "workflow" | ||
| assistant_id: AssistantId | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need the assistants id here? I would expect the workflow to be independent of the assistant for now. (Btw.: this is also not documented above but not necessary if we decide to remove it) |
||
| created_at: UnixDatetime | ||
| name: str | ||
| description: str | ||
| tags: list[str] = Field(default_factory=list) | ||
|
|
||
| @classmethod | ||
| def create( | ||
| cls, workspace_id: WorkspaceId | None, params: WorkflowCreateParams | ||
| cls, workspace_id: WorkspaceId, params: WorkflowCreateParams | ||
| ) -> "Workflow": | ||
| return cls( | ||
| id=generate_time_ordered_id("wf"), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move this to a separate automation agent that similarly to the testing agent that can use other agents (computer agent and webagent). WDYT?