diff --git a/.gitignore b/.gitignore index 246d7df9..0c6eb7b2 100644 --- a/.gitignore +++ b/.gitignore @@ -129,6 +129,7 @@ venv/ ENV/ env.bak/ venv.bak/ +.env.tmp # Spyder project settings .spyderproject diff --git a/src/askui/chat/api/runs/events/io_publisher.py b/src/askui/chat/api/runs/events/io_publisher.py new file mode 100644 index 00000000..f0893010 --- /dev/null +++ b/src/askui/chat/api/runs/events/io_publisher.py @@ -0,0 +1,43 @@ +"""IO publisher for publishing events to stdout.""" + +import json +import sys +from typing import Any + +from askui.chat.api.runs.events.events import Event +from askui.chat.api.settings import Settings + + +class IOPublisher: + """Publisher that serializes events to JSON and writes to stdout.""" + + def __init__(self, enabled: bool) -> None: + """ + Initialize the IO publisher. + + Args: + settings: The settings instance containing configuration for the IO publisher. + """ + self._enabled = enabled + + def publish(self, event: Event) -> None: + """ + Publish an event by serializing it to JSON and writing to stdout. + + If the publisher is disabled, this method does nothing. + + Args: + event: The event to publish + """ + if not self._enabled: + return + + try: + event_dict: dict[str, Any] = event.model_dump(mode="json") + event_json = json.dumps(event_dict) + + sys.stdout.write(event_json + "\n") + sys.stdout.flush() + except (TypeError, ValueError, AttributeError, OSError) as e: + sys.stderr.write(f"Error publishing event: {e}\n") + sys.stderr.flush() diff --git a/src/askui/chat/api/runs/service.py b/src/askui/chat/api/runs/service.py index 5e785935..726c29f9 100644 --- a/src/askui/chat/api/runs/service.py +++ b/src/askui/chat/api/runs/service.py @@ -12,6 +12,7 @@ from askui.chat.api.messages.chat_history_manager import ChatHistoryManager from askui.chat.api.models import RunId, ThreadId, WorkspaceId from askui.chat.api.runs.events.events import DoneEvent, ErrorEvent, Event, RunEvent +from askui.chat.api.runs.events.io_publisher import IOPublisher from askui.chat.api.runs.events.service import EventService from askui.chat.api.runs.models import ( Run, @@ -43,6 +44,7 @@ def __init__( self._chat_history_manager = chat_history_manager self._settings = settings self._event_service = EventService(settings.data_dir, self) + self._io_publisher = IOPublisher(settings.enable_io_events) def _find_by_id( self, workspace_id: WorkspaceId | None, thread_id: ThreadId, run_id: RunId @@ -136,6 +138,7 @@ async def run_runner() -> None: if isinstance(event, DoneEvent) or isinstance( event, ErrorEvent ): + self._io_publisher.publish(event) break except anyio.EndOfStream: break diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py index 7b463d00..59f7a117 100644 --- a/src/askui/chat/api/settings.py +++ b/src/askui/chat/api/settings.py @@ -118,3 +118,7 @@ class Settings(BaseSettings): default_factory=OtelSettings, description="OpenTelemetry configuration settings", ) + enable_io_events: bool = Field( + default=False, + description="Whether to enable the publishing events to stdout", + )