Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/askui/chat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from askui.chat.api.messages.router import router as messages_router
from askui.chat.api.runs.router import router as runs_router
from askui.chat.api.threads.router import router as threads_router
from askui.chat.api.workflows.router import router as workflows_router
from askui.utils.api_utils import (
ConflictError,
FileTooLargeError,
Expand Down Expand Up @@ -52,6 +53,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001
v1_router.include_router(runs_router)
v1_router.include_router(mcp_configs_router)
v1_router.include_router(files_router)
v1_router.include_router(workflows_router)
v1_router.include_router(health_router)
app.include_router(v1_router)

Expand Down
Empty file.
13 changes: 13 additions & 0 deletions src/askui/chat/api/workflows/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from fastapi import Depends

from askui.chat.api.dependencies import SettingsDep
from askui.chat.api.settings import Settings
from askui.chat.api.workflows.service import WorkflowService


def get_workflow_service(settings: Settings = SettingsDep) -> WorkflowService:
"""Get WorkflowService instance."""
return WorkflowService(settings.data_dir)


WorkflowServiceDep = Depends(get_workflow_service)
73 changes: 73 additions & 0 deletions src/askui/chat/api/workflows/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Annotated, Literal

from pydantic import BaseModel, Field

from askui.chat.api.models import WorkspaceId, WorkspaceResource
from askui.utils.datetime_utils import UnixDatetime, now
from askui.utils.id_utils import IdField, generate_time_ordered_id

WorkflowId = Annotated[str, IdField("wf")]


class WorkflowCreateParams(BaseModel):
"""
Parameters for creating a workflow via API.
"""

name: str
description: str
tags: list[str] = Field(default_factory=list)


class WorkflowModifyParams(BaseModel):
"""
Parameters for modifying a workflow via API.
"""

name: str | None = None
description: str | None = None
tags: list[str] | None = None


class Workflow(WorkspaceResource):
"""
A workflow resource in the chat API.

Args:
id (WorkflowId): The id of the workflow. Must start with the 'wf_' prefix and be
followed by one or more alphanumerical characters.
object (Literal['workflow']): The object type, always 'workflow'.
created_at (UnixDatetime): The creation time as a Unix timestamp.
name (str): The name or title of the workflow.
description (str): A detailed description of the workflow's purpose and steps.
tags (list[str], optional): Tags associated with the workflow for filtering or
categorization. Default is an empty list.
workspace_id (WorkspaceId | None, optional): The workspace this workflow belongs to.
"""

id: WorkflowId
object: Literal["workflow"] = "workflow"
created_at: UnixDatetime
name: str
description: str
tags: list[str] = Field(default_factory=list)

@classmethod
def create(
cls, workspace_id: WorkspaceId | None, params: WorkflowCreateParams
) -> "Workflow":
return cls(
id=generate_time_ordered_id("wf"),
created_at=now(),
workspace_id=workspace_id,
**params.model_dump(),
)

def modify(self, params: WorkflowModifyParams) -> "Workflow":
update_data = {k: v for k, v in params.model_dump().items() if v is not None}
return Workflow.model_validate(
{
**self.model_dump(),
**update_data,
}
)
111 changes: 111 additions & 0 deletions src/askui/chat/api/workflows/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from typing import Annotated

from fastapi import APIRouter, Header, Path, Query, status

from askui.chat.api.dependencies import ListQueryDep
from askui.chat.api.models import WorkspaceId
from askui.chat.api.workflows.dependencies import WorkflowServiceDep
from askui.chat.api.workflows.models import (
Workflow,
WorkflowCreateParams,
WorkflowId,
WorkflowModifyParams,
)
from askui.chat.api.workflows.service import WorkflowService
from askui.utils.api_utils import ListQuery, ListResponse

router = APIRouter(prefix="/workflows", tags=["workflows"])


@router.get("")
def list_workflows(
askui_workspace: Annotated[WorkspaceId | None, Header()],
tags: Annotated[list[str] | None, Query()] = None,
query: ListQuery = ListQueryDep,
workflow_service: WorkflowService = WorkflowServiceDep,
) -> ListResponse[Workflow]:
"""
List workflows with optional tag filtering.

Args:
askui_workspace: The workspace ID from header
tags: Optional list of tags to filter by
query: Standard list query parameters (limit, after, before, order)
workflow_service: Injected workflow service

Returns:
ListResponse containing workflows matching the criteria
"""
return workflow_service.list_(workspace_id=askui_workspace, query=query, tags=tags)


@router.post("", status_code=status.HTTP_201_CREATED)
def create_workflow(
askui_workspace: Annotated[WorkspaceId | None, Header()],
params: WorkflowCreateParams,
workflow_service: WorkflowService = WorkflowServiceDep,
) -> Workflow:
"""
Create a new workflow.

Args:
askui_workspace: The workspace ID from header
params: Workflow creation parameters (name, description, tags)
workflow_service: Injected workflow service

Returns:
The created workflow
"""
return workflow_service.create(workspace_id=askui_workspace, params=params)


@router.get("/{workflow_id}")
def retrieve_workflow(
askui_workspace: Annotated[WorkspaceId | None, Header()],
workflow_id: Annotated[WorkflowId, Path(...)],
workflow_service: WorkflowService = WorkflowServiceDep,
) -> Workflow:
"""
Retrieve a specific workflow by ID.

Args:
askui_workspace: The workspace ID from header
workflow_id: The workflow ID to retrieve
workflow_service: Injected workflow service

Returns:
The requested workflow

Raises:
NotFoundError: If workflow doesn't exist or user doesn't have access
"""
return workflow_service.retrieve(
workspace_id=askui_workspace, workflow_id=workflow_id
)


@router.patch("/{workflow_id}")
def modify_workflow(
askui_workspace: Annotated[WorkspaceId | None, Header()],
workflow_id: Annotated[WorkflowId, Path(...)],
params: WorkflowModifyParams,
workflow_service: WorkflowService = WorkflowServiceDep,
) -> Workflow:
"""
Modify an existing workflow.

Args:
askui_workspace: The workspace ID from header
workflow_id: The workflow ID to modify
params: Workflow modification parameters (name, description, tags)
workflow_service: Injected workflow service

Returns:
The modified workflow

Raises:
NotFoundError: If workflow doesn't exist or user doesn't have access
"""
return workflow_service.modify(
workspace_id=askui_workspace, workflow_id=workflow_id, params=params
)
107 changes: 107 additions & 0 deletions src/askui/chat/api/workflows/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from pathlib import Path
from typing import Callable

from askui.chat.api.models import WorkspaceId
from askui.chat.api.utils import build_workspace_filter_fn
from askui.chat.api.workflows.models import (
Workflow,
WorkflowCreateParams,
WorkflowId,
WorkflowModifyParams,
)
from askui.utils.api_utils import (
ConflictError,
ListQuery,
ListResponse,
NotFoundError,
list_resources,
)


def _build_workflow_filter_fn(
workspace_id: WorkspaceId | None,
tags: list[str] | None = None,
) -> Callable[[Workflow], bool]:
workspace_filter: Callable[[Workflow], bool] = build_workspace_filter_fn(
workspace_id, Workflow
)

def filter_fn(workflow: Workflow) -> bool:
if not workspace_filter(workflow):
return False
if tags is not None:
return any(tag in workflow.tags for tag in tags)
return True

return filter_fn


class WorkflowService:
def __init__(self, base_dir: Path) -> None:
self._base_dir = base_dir
self._workflows_dir = base_dir / "workflows"

def _get_workflow_path(self, workflow_id: WorkflowId, new: bool = False) -> Path:
workflow_path = self._workflows_dir / f"{workflow_id}.json"
exists = workflow_path.exists()
if new and exists:
error_msg = f"Workflow {workflow_id} already exists"
raise ConflictError(error_msg)
if not new and not exists:
error_msg = f"Workflow {workflow_id} not found"
raise NotFoundError(error_msg)
return workflow_path

def list_(
self,
workspace_id: WorkspaceId | None,
query: ListQuery,
tags: list[str] | None = None,
) -> ListResponse[Workflow]:
return list_resources(
base_dir=self._workflows_dir,
query=query,
resource_type=Workflow,
filter_fn=_build_workflow_filter_fn(workspace_id, tags=tags),
)

def retrieve(
self, workspace_id: WorkspaceId | None, workflow_id: WorkflowId
) -> Workflow:
try:
workflow_path = self._get_workflow_path(workflow_id)
workflow = Workflow.model_validate_json(workflow_path.read_text())

# Check workspace access
if workspace_id is not None and workflow.workspace_id != workspace_id:
error_msg = f"Workflow {workflow_id} not found"
raise NotFoundError(error_msg)

except FileNotFoundError as e:
error_msg = f"Workflow {workflow_id} not found"
raise NotFoundError(error_msg) from e
else:
return workflow

def create(
self, workspace_id: WorkspaceId | None, params: WorkflowCreateParams
) -> Workflow:
workflow = Workflow.create(workspace_id, params)
self._save(workflow, new=True)
return workflow

def modify(
self,
workspace_id: WorkspaceId | None,
workflow_id: WorkflowId,
params: WorkflowModifyParams,
) -> Workflow:
workflow = self.retrieve(workspace_id, workflow_id)
modified = workflow.modify(params)
self._save(modified)
return modified

def _save(self, workflow: Workflow, new: bool = False) -> None:
self._workflows_dir.mkdir(parents=True, exist_ok=True)
workflow_file = self._get_workflow_path(workflow.id, new=new)
workflow_file.write_text(workflow.model_dump_json(), encoding="utf-8")