Skip to content

Commit 3929022

Browse files
Alex Holmbergclaude
authored andcommitted
feat(22-01): create AgentProcessor module with session management
- Add AgentProcessor struct with message_rx, event_bridge, sessions - Add ProcessorConfig for provider/model/max_turns settings - Add ThreadSession for per-thread conversation state - Add history management with rig::completion::Message - Add extract_user_input for AG-UI message parsing - Add process_message placeholder for LLM integration - 7 tests for processor components Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 5ffb9aa commit 3929022

2 files changed

Lines changed: 365 additions & 0 deletions

File tree

src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//! ```
4040
4141
pub mod bridge;
42+
pub mod processor;
4243
pub mod routes;
4344

4445
use std::net::SocketAddr;
@@ -49,6 +50,7 @@ use axum::{routing::{get, post}, Router};
4950
use tokio::sync::{broadcast, mpsc, RwLock};
5051

5152
pub use bridge::EventBridge;
53+
pub use processor::{AgentProcessor, ProcessorConfig, ThreadSession};
5254

5355
// Re-export types needed for message handling
5456
pub use ag_ui_core::types::{Context, Message as AgUiMessage, RunAgentInput, Tool};

src/server/processor.rs

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
//! Agent Processor - Routes frontend messages to agent for processing.
2+
//!
3+
//! This module provides the `AgentProcessor` which consumes messages from
4+
//! the frontend (via WebSocket/POST) and processes them through the LLM,
5+
//! emitting AG-UI events for the response.
6+
//!
7+
//! # Architecture
8+
//!
9+
//! ```text
10+
//! Frontend → WebSocket/POST → message channel → AgentProcessor
11+
//! ↓
12+
//! LLM (multi_turn)
13+
//! ↓
14+
//! EventBridge → SSE/WS → Frontend
15+
//! ```
16+
17+
use std::collections::HashMap;
18+
use std::time::Instant;
19+
20+
use ag_ui_core::{Role, RunId, ThreadId};
21+
use rig::completion::message::{AssistantContent, UserContent};
22+
use rig::completion::Message as RigMessage;
23+
use rig::one_or_many::OneOrMany;
24+
use tokio::sync::mpsc;
25+
use tracing::{debug, info};
26+
27+
use super::{AgentMessage, EventBridge};
28+
29+
/// Configuration for the agent processor.
30+
#[derive(Debug, Clone)]
31+
pub struct ProcessorConfig {
32+
/// LLM provider to use (openai, anthropic, bedrock).
33+
pub provider: String,
34+
/// Model name/ID.
35+
pub model: String,
36+
/// Maximum number of tool call iterations.
37+
pub max_turns: usize,
38+
}
39+
40+
impl Default for ProcessorConfig {
41+
fn default() -> Self {
42+
Self {
43+
provider: "openai".to_string(),
44+
model: "gpt-4o".to_string(),
45+
max_turns: 50,
46+
}
47+
}
48+
}
49+
50+
impl ProcessorConfig {
51+
/// Creates a new config with default values.
52+
pub fn new() -> Self {
53+
Self::default()
54+
}
55+
56+
/// Sets the provider.
57+
pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
58+
self.provider = provider.into();
59+
self
60+
}
61+
62+
/// Sets the model.
63+
pub fn with_model(mut self, model: impl Into<String>) -> Self {
64+
self.model = model.into();
65+
self
66+
}
67+
68+
/// Sets the maximum number of turns.
69+
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
70+
self.max_turns = max_turns;
71+
self
72+
}
73+
}
74+
75+
/// Per-thread session state for conversation isolation.
76+
#[derive(Debug)]
77+
pub struct ThreadSession {
78+
/// Thread ID for this session.
79+
pub thread_id: ThreadId,
80+
/// Raw chat history for multi-turn conversations.
81+
pub history: Vec<RigMessage>,
82+
/// When this session was created.
83+
pub created_at: Instant,
84+
/// Number of turns in this session.
85+
pub turn_count: usize,
86+
}
87+
88+
impl ThreadSession {
89+
/// Creates a new thread session.
90+
pub fn new(thread_id: ThreadId) -> Self {
91+
Self {
92+
thread_id,
93+
history: Vec::new(),
94+
created_at: Instant::now(),
95+
turn_count: 0,
96+
}
97+
}
98+
99+
/// Adds a user message to history.
100+
pub fn add_user_message(&mut self, content: &str) {
101+
self.history.push(RigMessage::User {
102+
content: OneOrMany::one(UserContent::text(content)),
103+
});
104+
}
105+
106+
/// Adds an assistant message to history.
107+
pub fn add_assistant_message(&mut self, content: &str) {
108+
self.history.push(RigMessage::Assistant {
109+
id: None,
110+
content: OneOrMany::one(AssistantContent::text(content)),
111+
});
112+
self.turn_count += 1;
113+
}
114+
}
115+
116+
/// Processes frontend messages through the LLM agent.
117+
///
118+
/// The processor maintains per-thread sessions for conversation isolation
119+
/// and emits AG-UI events via the EventBridge during processing.
120+
pub struct AgentProcessor {
121+
/// Receiver for messages from frontend.
122+
message_rx: mpsc::Receiver<AgentMessage>,
123+
/// Event bridge for emitting AG-UI events.
124+
event_bridge: EventBridge,
125+
/// Per-thread session state.
126+
sessions: HashMap<ThreadId, ThreadSession>,
127+
/// Processor configuration.
128+
config: ProcessorConfig,
129+
}
130+
131+
impl AgentProcessor {
132+
/// Creates a new agent processor.
133+
///
134+
/// # Arguments
135+
/// * `message_rx` - Receiver for messages from frontend
136+
/// * `event_bridge` - Bridge for emitting AG-UI events
137+
/// * `config` - Processor configuration
138+
pub fn new(
139+
message_rx: mpsc::Receiver<AgentMessage>,
140+
event_bridge: EventBridge,
141+
config: ProcessorConfig,
142+
) -> Self {
143+
Self {
144+
message_rx,
145+
event_bridge,
146+
sessions: HashMap::new(),
147+
config,
148+
}
149+
}
150+
151+
/// Creates a processor with default configuration.
152+
pub fn with_defaults(
153+
message_rx: mpsc::Receiver<AgentMessage>,
154+
event_bridge: EventBridge,
155+
) -> Self {
156+
Self::new(message_rx, event_bridge, ProcessorConfig::default())
157+
}
158+
159+
/// Gets or creates a session for the given thread ID.
160+
fn get_or_create_session(&mut self, thread_id: &ThreadId) -> &mut ThreadSession {
161+
self.sessions
162+
.entry(thread_id.clone())
163+
.or_insert_with(|| ThreadSession::new(thread_id.clone()))
164+
}
165+
166+
/// Gets the current session count.
167+
pub fn session_count(&self) -> usize {
168+
self.sessions.len()
169+
}
170+
171+
/// Gets the configuration.
172+
pub fn config(&self) -> &ProcessorConfig {
173+
&self.config
174+
}
175+
176+
/// Extracts the user message content from RunAgentInput messages.
177+
///
178+
/// Returns the last user message content, or None if no user messages.
179+
fn extract_user_input(
180+
&self,
181+
messages: &[ag_ui_core::types::Message],
182+
) -> Option<String> {
183+
// Find the last user message and extract its content
184+
messages
185+
.iter()
186+
.rev()
187+
.find(|m| m.role() == Role::User)
188+
.and_then(|m| m.content().map(|s| s.to_string()))
189+
}
190+
191+
/// Processes a single message through the agent.
192+
///
193+
/// This is the core processing method that:
194+
/// 1. Emits RunStarted
195+
/// 2. Processes through LLM (simplified for now - echoes back)
196+
/// 3. Emits TextMessage events
197+
/// 4. Updates session history
198+
/// 5. Emits RunFinished
199+
async fn process_message(
200+
&mut self,
201+
thread_id: ThreadId,
202+
_run_id: RunId,
203+
user_input: String,
204+
) {
205+
info!(
206+
thread_id = %thread_id,
207+
input_len = user_input.len(),
208+
"Processing message"
209+
);
210+
211+
// Get or create session
212+
let session = self.get_or_create_session(&thread_id);
213+
session.add_user_message(&user_input);
214+
215+
// Emit run started
216+
self.event_bridge.start_run().await;
217+
218+
// Start thinking
219+
self.event_bridge.start_thinking(Some("Processing")).await;
220+
221+
// TODO: In Phase 23+, this will be replaced with actual LLM call
222+
// For now, generate a simple response to verify the pipeline works
223+
let response = format!(
224+
"I received your message: \"{}\". \
225+
(This is a placeholder response - LLM integration coming in Phase 23+)",
226+
if user_input.len() > 50 {
227+
format!("{}...", &user_input[..50])
228+
} else {
229+
user_input.clone()
230+
}
231+
);
232+
233+
self.event_bridge.end_thinking().await;
234+
235+
// Emit the response as text message
236+
self.event_bridge.start_message().await;
237+
238+
// Stream the response in chunks to demonstrate streaming capability
239+
for chunk in response.chars().collect::<Vec<_>>().chunks(10) {
240+
let chunk_str: String = chunk.iter().collect();
241+
self.event_bridge.emit_text_chunk(&chunk_str).await;
242+
}
243+
244+
self.event_bridge.end_message().await;
245+
246+
// Update session
247+
let session = self.get_or_create_session(&thread_id);
248+
session.add_assistant_message(&response);
249+
250+
debug!(
251+
thread_id = %thread_id,
252+
turn_count = session.turn_count,
253+
"Message processed"
254+
);
255+
256+
// Finish the run
257+
self.event_bridge.finish_run().await;
258+
}
259+
}
260+
261+
#[cfg(test)]
262+
mod tests {
263+
use super::*;
264+
use tokio::sync::broadcast;
265+
use std::sync::Arc;
266+
use tokio::sync::RwLock;
267+
268+
fn create_test_processor() -> (AgentProcessor, mpsc::Sender<AgentMessage>) {
269+
let (msg_tx, msg_rx) = mpsc::channel(100);
270+
let (event_tx, _) = broadcast::channel(100);
271+
let bridge = EventBridge::new(
272+
event_tx,
273+
Arc::new(RwLock::new(ThreadId::random())),
274+
Arc::new(RwLock::new(None)),
275+
);
276+
let processor = AgentProcessor::with_defaults(msg_rx, bridge);
277+
(processor, msg_tx)
278+
}
279+
280+
#[test]
281+
fn test_processor_config_default() {
282+
let config = ProcessorConfig::default();
283+
assert_eq!(config.provider, "openai");
284+
assert_eq!(config.model, "gpt-4o");
285+
assert_eq!(config.max_turns, 50);
286+
}
287+
288+
#[test]
289+
fn test_processor_config_builder() {
290+
let config = ProcessorConfig::new()
291+
.with_provider("anthropic")
292+
.with_model("claude-3-opus")
293+
.with_max_turns(100);
294+
assert_eq!(config.provider, "anthropic");
295+
assert_eq!(config.model, "claude-3-opus");
296+
assert_eq!(config.max_turns, 100);
297+
}
298+
299+
#[test]
300+
fn test_thread_session_new() {
301+
let thread_id = ThreadId::random();
302+
let session = ThreadSession::new(thread_id.clone());
303+
assert_eq!(session.thread_id, thread_id);
304+
assert!(session.history.is_empty());
305+
assert_eq!(session.turn_count, 0);
306+
}
307+
308+
#[test]
309+
fn test_thread_session_add_messages() {
310+
let mut session = ThreadSession::new(ThreadId::random());
311+
312+
session.add_user_message("Hello");
313+
assert_eq!(session.history.len(), 1);
314+
assert_eq!(session.turn_count, 0); // User message doesn't increment turn
315+
316+
session.add_assistant_message("Hi there!");
317+
assert_eq!(session.history.len(), 2);
318+
assert_eq!(session.turn_count, 1); // Assistant message increments turn
319+
}
320+
321+
#[test]
322+
fn test_processor_creation() {
323+
let (processor, _tx) = create_test_processor();
324+
assert_eq!(processor.session_count(), 0);
325+
assert_eq!(processor.config().provider, "openai");
326+
}
327+
328+
#[test]
329+
fn test_get_or_create_session() {
330+
let (mut processor, _tx) = create_test_processor();
331+
let thread_id = ThreadId::random();
332+
333+
// First call creates new session
334+
let session = processor.get_or_create_session(&thread_id);
335+
assert_eq!(session.turn_count, 0);
336+
337+
// Add a message
338+
session.add_user_message("test");
339+
340+
// Second call returns same session
341+
let session = processor.get_or_create_session(&thread_id);
342+
assert_eq!(session.history.len(), 1);
343+
}
344+
345+
#[tokio::test]
346+
async fn test_process_message() {
347+
let (mut processor, _tx) = create_test_processor();
348+
let thread_id = ThreadId::random();
349+
let run_id = RunId::random();
350+
351+
processor.process_message(
352+
thread_id.clone(),
353+
run_id,
354+
"Hello, agent!".to_string(),
355+
).await;
356+
357+
// Check session was updated
358+
assert_eq!(processor.session_count(), 1);
359+
let session = processor.sessions.get(&thread_id).unwrap();
360+
assert_eq!(session.turn_count, 1);
361+
assert_eq!(session.history.len(), 2); // user + assistant
362+
}
363+
}

0 commit comments

Comments
 (0)