-
Notifications
You must be signed in to change notification settings - Fork 5
Unified Event Streaming API #80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+5,076
−2,423
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- Add event.proto with EventService.Subscribe RPC - Define Event message with payload types for task, message, agent, model, modelprovider, and tool events - Add EventAction enum (created, updated, deleted, unspecified) - Remove obsolete Subscribe RPC and related types from task.proto - Support glob pattern filtering, task scope filtering, and message replay Co-authored-by: construct-agent <noreply@construct.sh>
- Add EventRouter with pattern-based event filtering and subscription management - Implement glob pattern matching for event types (*, entity.*, *.action, exact) - Support task scope filtering for subscriptions - Add comprehensive tests for router functionality - Remove MessageHub completely (direct replacement approach) - Update TaskReconciler and Runtime to remove MessageHub dependencies - Update API handlers to remove MessageHub references - Add domain types for tool events (ToolCallEvent, ToolResultEvent) - Update ToolEventPublisher to use domain event types Co-authored-by: construct-agent <noreply@construct.sh>
- Add backend/tool/types package with ToolInput, ToolOutput, ToolCallEvent, and ToolResultEvent types to avoid import cycles between packages - Add ToolInputFrom and ToolOutputFrom conversion functions with proper error handling for unknown types - Add event constructors for all event types (task, message, agent, model, model provider, and tool events) - Update codeact interceptor to use the new tool/types package - Add comprehensive tests using cmp.Diff pattern Co-authored-by: construct-agent <noreply@construct.sh>
Register hooks on memory.Client for agent, model, modelprovider, and task entities to automatically publish events when entities are created, updated, or deleted. Events published: - agent.created, agent.updated, agent.deleted - model.created, model.updated, model.deleted - modelprovider.created, modelprovider.updated, modelprovider.deleted - task.created, task.deleted (NOT task.updated - that comes from reconciler) Co-authored-by: construct-agent <noreply@construct.sh>
- Add eventRouter field to TaskReconciler and Runtime structs - Create EventRouter in runtime initialization and register ent hooks - Implement eventRouterToolPublisher to publish tool.called and tool.result events via EventRouter instead of the noop publisher - Update TaskReconciler to emit events: - task.updated on phase changes (with previous_phase, in a transaction) - message.created when model responses are persisted - message.chunk during streaming (with chunk index tracking) - Remove unused v1 proto import from task_reconciler.go - Remove deprecated publishMessage/publishTaskEvent TODOs Co-authored-by: construct-agent <noreply@construct.sh>
This commit completes the Event Streaming API implementation: - Add EventService handler (backend/api/event.go) implementing Subscribe RPC with message replay support - Add domain-to-proto conversion functions (backend/api/conv/event.go) - Register EventHandler in API server and pass EventRouter - Add EventServiceClient to API client package - Update CLI commands (new, exec, resume) to use EventService.Subscribe instead of the old TaskService.Subscribe - Fix TaskEvent proto usage in terminal session Co-authored-by: construct-agent <noreply@construct.sh>
… proto - Add MessageChunkEvent handling in message_feed.go for streaming text - Update new.go and resume.go to process message.chunk events from EventService - Remove ContentStatus enum from message.proto (streaming now uses MessageChunkEvent) - Remove WithStatus function from runtime.go as it's no longer needed Co-authored-by: construct-agent <noreply@construct.sh>
- Add ToolCalledEvent and ToolResultEvent handlers in message_feed.go Update - Subscribe to tool.* events in both new.go and resume.go - Process tool events to display tool calls in real-time Tool calls are now displayed as they stream, rather than only when the final message arrives. Co-authored-by: construct-agent <noreply@construct.sh>
The reconciler publishes user messages when it processes them, not when they're created via the API. This ensures user messages are displayed even when sent while the agent is working. Co-authored-by: construct-agent <noreply@construct.sh>
When subscribing to events with a TaskID but no ReplayAfterMessageID, the EventService now replays all messages for that task. This allows the resume command to show the full conversation history. Previously, replay only worked when a specific message ID was provided to replay after, which meant resumed sessions started with a blank message feed. Co-authored-by: construct-agent <noreply@construct.sh>
Consolidate two event systems (Bus and EventRouter) into a single EventRouter by routing internal coordination events through it using the internal.* prefix pattern. - Add internal.task.trigger and internal.task.suspend event types - Update TaskReconciler to subscribe to internal events via EventRouter - Update MessageHandler and TaskHandler to publish internal events - Remove EventBus from API handlers and Runtime - Add filtering to prevent external consumers from subscribing to internal events (both pattern filtering and match-time filtering) - Delete deprecated files: bus.go, events.go, metrics.go, eventbus_test.go, message_stream.go Co-authored-by: construct-agent <noreply@construct.sh>
Ent is not threadsafe
Use a slice instead of a map to store provider entries, ensuring providers are deleted in the order specified on the command line. Map iteration order in Go is not guaranteed, which caused test flakiness. Co-authored-by: construct-agent <noreply@construct.sh>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
Replaces the task-scoped
MessageHubwith a unified Event Streaming API that supports pattern filtering, task scoping, and message replay.Changes
Event Infrastructure
EventServiceproto withSubscribeRPC for streaming eventsEventRouterwith glob pattern matching (*,entity.*,*.action)task.*,message.*,agent.*,model.*,modelprovider.*,tool.*Task Reconciler Integration
task.updatedon phase changesmessage.created,message.updated,message.chunkduring conversation flowtool.calledandtool.resultfor real-time tool visibilityAPI & Replay
EventServicehandler with message replay supportCLI Updates
task.*,message.*,tool.*events innewandresumecommandsMessageChunkEventfor streaming text displayToolCalledEventandToolResultEventfor real-time tool displayCleanup
MessageHub,EventBus, and related metricsContentStatusfrom message proto (replaced byMessageChunkEvent)Known Issues
Tool calls display as empty messages on task resume due to different conversion paths for live vs replayed tool data. Tracked for follow-up work.