@@ -188,6 +188,47 @@ impl AgentProcessor {
188188 . and_then ( |m| m. content ( ) . map ( |s| s. to_string ( ) ) )
189189 }
190190
191+ /// Runs the message processing loop.
192+ ///
193+ /// This method consumes messages from the channel and processes each one
194+ /// through the agent. It runs until the channel is closed.
195+ pub async fn run ( & mut self ) {
196+ info ! ( "AgentProcessor starting message processing loop" ) ;
197+
198+ while let Some ( msg) = self . message_rx . recv ( ) . await {
199+ let input = msg. input ;
200+ let thread_id = input. thread_id . clone ( ) ;
201+ let run_id = input. run_id . clone ( ) ;
202+
203+ debug ! (
204+ thread_id = %thread_id,
205+ run_id = %run_id,
206+ message_count = input. messages. len( ) ,
207+ "Received message from frontend"
208+ ) ;
209+
210+ // Extract user input from messages
211+ match self . extract_user_input ( & input. messages ) {
212+ Some ( user_input) => {
213+ self . process_message ( thread_id, run_id, user_input) . await ;
214+ }
215+ None => {
216+ debug ! (
217+ thread_id = %thread_id,
218+ "No user message found in input, skipping"
219+ ) ;
220+ // Emit error event
221+ self . event_bridge . start_run ( ) . await ;
222+ self . event_bridge
223+ . finish_run_with_error ( "No user message found in input" )
224+ . await ;
225+ }
226+ }
227+ }
228+
229+ info ! ( "AgentProcessor message channel closed, shutting down" ) ;
230+ }
231+
191232 /// Processes a single message through the agent.
192233 ///
193234 /// This is the core processing method that:
@@ -360,4 +401,103 @@ mod tests {
360401 assert_eq ! ( session. turn_count, 1 ) ;
361402 assert_eq ! ( session. history. len( ) , 2 ) ; // user + assistant
362403 }
404+
405+ #[ tokio:: test]
406+ async fn test_run_processes_messages ( ) {
407+ use ag_ui_core:: types:: { Message as AgUiProtocolMessage , RunAgentInput } ;
408+ use ag_ui_core:: Event ;
409+ use tokio:: sync:: broadcast;
410+
411+ let ( msg_tx, msg_rx) = mpsc:: channel ( 100 ) ;
412+ let ( event_tx, mut event_rx) = broadcast:: channel ( 100 ) ;
413+
414+ let bridge = EventBridge :: new (
415+ event_tx,
416+ Arc :: new ( RwLock :: new ( ThreadId :: random ( ) ) ) ,
417+ Arc :: new ( RwLock :: new ( None ) ) ,
418+ ) ;
419+
420+ let mut processor = AgentProcessor :: with_defaults ( msg_rx, bridge) ;
421+
422+ // Spawn processor in background
423+ let handle = tokio:: spawn ( async move {
424+ processor. run ( ) . await ;
425+ } ) ;
426+
427+ // Send a message
428+ let thread_id = ThreadId :: random ( ) ;
429+ let run_id = RunId :: random ( ) ;
430+ let input = RunAgentInput :: new ( thread_id. clone ( ) , run_id. clone ( ) )
431+ . with_messages ( vec ! [ AgUiProtocolMessage :: new_user( "Hello from test" ) ] ) ;
432+
433+ let agent_msg = super :: super :: AgentMessage :: new ( input) ;
434+ msg_tx. send ( agent_msg) . await . expect ( "Should send" ) ;
435+
436+ // Verify we receive RunStarted event
437+ let event = tokio:: time:: timeout (
438+ std:: time:: Duration :: from_millis ( 100 ) ,
439+ event_rx. recv ( )
440+ ) . await . expect ( "Should receive event in time" ) . expect ( "Should have event" ) ;
441+
442+ assert ! ( matches!( event, Event :: RunStarted ( _) ) ) ;
443+
444+ // Drop sender to close channel and stop processor
445+ drop ( msg_tx) ;
446+
447+ // Wait for processor to finish
448+ tokio:: time:: timeout (
449+ std:: time:: Duration :: from_millis ( 100 ) ,
450+ handle
451+ ) . await . expect ( "Processor should finish" ) . expect ( "Should not panic" ) ;
452+ }
453+
454+ #[ tokio:: test]
455+ async fn test_run_handles_empty_messages ( ) {
456+ use ag_ui_core:: types:: RunAgentInput ;
457+ use ag_ui_core:: Event ;
458+ use tokio:: sync:: broadcast;
459+
460+ let ( msg_tx, msg_rx) = mpsc:: channel ( 100 ) ;
461+ let ( event_tx, mut event_rx) = broadcast:: channel ( 100 ) ;
462+
463+ let bridge = EventBridge :: new (
464+ event_tx,
465+ Arc :: new ( RwLock :: new ( ThreadId :: random ( ) ) ) ,
466+ Arc :: new ( RwLock :: new ( None ) ) ,
467+ ) ;
468+
469+ let mut processor = AgentProcessor :: with_defaults ( msg_rx, bridge) ;
470+
471+ // Spawn processor in background
472+ let handle = tokio:: spawn ( async move {
473+ processor. run ( ) . await ;
474+ } ) ;
475+
476+ // Send a message with no user content
477+ let thread_id = ThreadId :: random ( ) ;
478+ let run_id = RunId :: random ( ) ;
479+ let input = RunAgentInput :: new ( thread_id. clone ( ) , run_id. clone ( ) ) ;
480+ // Note: no messages added
481+
482+ let agent_msg = super :: super :: AgentMessage :: new ( input) ;
483+ msg_tx. send ( agent_msg) . await . expect ( "Should send" ) ;
484+
485+ // Should receive RunStarted then RunError
486+ let event = tokio:: time:: timeout (
487+ std:: time:: Duration :: from_millis ( 100 ) ,
488+ event_rx. recv ( )
489+ ) . await . expect ( "Should receive event" ) . expect ( "Should have event" ) ;
490+
491+ assert ! ( matches!( event, Event :: RunStarted ( _) ) ) ;
492+
493+ let event = tokio:: time:: timeout (
494+ std:: time:: Duration :: from_millis ( 100 ) ,
495+ event_rx. recv ( )
496+ ) . await . expect ( "Should receive event" ) . expect ( "Should have event" ) ;
497+
498+ assert ! ( matches!( event, Event :: RunError ( _) ) ) ;
499+
500+ drop ( msg_tx) ;
501+ let _ = tokio:: time:: timeout ( std:: time:: Duration :: from_millis ( 100 ) , handle) . await ;
502+ }
363503}
0 commit comments