From 6c6b1c4c54f6481701a1d20992c8f12533ec8cd5 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Tue, 31 Mar 2026 21:54:32 -0400 Subject: [PATCH 1/5] fix(memory): tighten persistence rules and add conversational events --- src/tools/memory_persistence_complete.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/tools/memory_persistence_complete.rs b/src/tools/memory_persistence_complete.rs index e7e9c8b8f..4648832bd 100644 --- a/src/tools/memory_persistence_complete.rs +++ b/src/tools/memory_persistence_complete.rs @@ -416,11 +416,22 @@ mod tests { .await .expect("timed out waiting for working memory events"); assert_eq!(events.len(), 2); +<<<<<<< HEAD assert!(events.iter().any(|event| { event.event_type == crate::memory::WorkingMemoryEventType::UserCorrection })); assert!(events.iter().any(|event| { event.event_type == crate::memory::WorkingMemoryEventType::DecisionRevised })); +======= + assert_eq!( + events[0].event_type, + crate::memory::WorkingMemoryEventType::UserCorrection + ); + assert_eq!( + events[1].event_type, + crate::memory::WorkingMemoryEventType::DecisionRevised + ); +>>>>>>> af646041 (fix(memory): tighten persistence rules and add conversational events) } } From 87aafc481cdc629e222cbc7c4b8a0f22e919bd1a Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Tue, 31 Mar 2026 22:54:17 -0400 Subject: [PATCH 2/5] feat(memory): expand conversational event semantics --- prompts/en/memory_persistence.md.j2 | 6 +- src/agent/channel.rs | 123 ++++++++++++++++-- src/memory/working.rs | 26 +++- src/tools/memory_persistence_complete.rs | 7 +- src/tools/send_agent_message.rs | 144 ++++++++++++++++++++- src/tools/spawn_worker.rs | 13 ++ src/tools/task_create.rs | 134 ++++++++++++++++++- src/tools/task_update.rs | 156 ++++++++++++++++++++++- 8 files changed, 590 insertions(+), 19 deletions(-) diff --git a/prompts/en/memory_persistence.md.j2 b/prompts/en/memory_persistence.md.j2 index 3a60a0bd5..2b2932313 100644 --- a/prompts/en/memory_persistence.md.j2 +++ b/prompts/en/memory_persistence.md.j2 @@ -32,13 +32,17 @@ This is an automatic process triggered periodically during conversation. You are - `"decision"` for commitments or choices made - `"user_correction"` when the user corrects a prior assumption, instruction, or framing - `"decision_revised"` when a prior choice changes after feedback or new information + - `"deadline_set"` when the conversation establishes a concrete due date, deadline, or scheduled milestone + - `"blocked_on"` when progress is waiting on an approval, dependency, missing input, or external action + - `"constraint"` when the user or system states a hard requirement, limitation, or non-negotiable boundary + - `"outcome"` when a task, branch, or delegated step reaches a clear terminal result worth retaining in temporal memory - `"error"` for failures or problems - `"system"` for other notable events - `summary`: one-line description of what happened - Normalize relative time references to absolute dates/times with timezone (for example `2026-03-31T14:20:00-04:00`) so downstream memory checks are stable across sessions. - - `importance`: 0.0-1.0 score (`decision`, `user_correction`, and `decision_revised` are typically 0.6-0.8) + - `importance`: 0.0-1.0 score (`decision`, `user_correction`, `decision_revised`, `blocked_on`, and `outcome` are typically 0.6-0.8) - Events feed the agent's temporal working memory — they help the agent remember *what happened today*, not just facts. 5. **Finish with the terminal tool.** You must call `memory_persistence_complete` before finishing: diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 1539bda2e..4fd53c1c8 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -82,6 +82,55 @@ fn should_flush_coalesce_buffer_for_event(event: &ProcessEvent) -> bool { ) } +fn classify_conversational_event_summary( + summary: &str, + default_event_type: crate::memory::WorkingMemoryEventType, +) -> (crate::memory::WorkingMemoryEventType, String) { + let trimmed = summary.trim(); + if trimmed.is_empty() { + return (default_event_type, String::new()); + } + + if let Some((prefix, rest)) = trimmed.split_once(':') { + let rest_trimmed = rest.trim(); + match prefix.trim().to_ascii_lowercase().as_str() { + "outcome" => { + return ( + crate::memory::WorkingMemoryEventType::Outcome, + if rest_trimmed.is_empty() { + trimmed.to_string() + } else { + rest_trimmed.to_string() + }, + ); + } + "blocked_on" => { + return ( + crate::memory::WorkingMemoryEventType::BlockedOn, + if rest_trimmed.is_empty() { + trimmed.to_string() + } else { + rest_trimmed.to_string() + }, + ); + } + "constraint" => { + return ( + crate::memory::WorkingMemoryEventType::Constraint, + if rest_trimmed.is_empty() { + trimmed.to_string() + } else { + rest_trimmed.to_string() + }, + ); + } + _ => {} + } + } + + (default_event_type, trimmed.to_string()) +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -3008,11 +3057,26 @@ impl Channel { } else { conclusion.clone() }; + let (event_type, event_summary) = classify_conversational_event_summary( + &summary, + crate::memory::WorkingMemoryEventType::BranchCompleted, + ); self.deps .working_memory .emit( - crate::memory::WorkingMemoryEventType::BranchCompleted, - format!("Branch concluded: {summary}"), + event_type, + match event_type { + crate::memory::WorkingMemoryEventType::Outcome => { + format!("Branch outcome: {event_summary}") + } + crate::memory::WorkingMemoryEventType::BlockedOn => { + format!("Branch blocked on: {event_summary}") + } + crate::memory::WorkingMemoryEventType::Constraint => { + format!("Branch constraint: {event_summary}") + } + _ => format!("Branch concluded: {event_summary}"), + }, ) .channel(self.id.to_string()) .importance(0.7) @@ -3081,19 +3145,31 @@ impl Channel { } else { result.clone() }; - let event_type = if *success { + let default_event_type = if *success { crate::memory::WorkingMemoryEventType::WorkerCompleted } else { crate::memory::WorkingMemoryEventType::Error }; + let (event_type, event_summary) = + classify_conversational_event_summary(&worker_summary, default_event_type); self.deps .working_memory .emit( event_type, - if *success { - format!("Worker completed: {worker_summary}") - } else { - format!("Worker failed: {worker_summary}") + match event_type { + crate::memory::WorkingMemoryEventType::Outcome => { + format!("Worker outcome: {event_summary}") + } + crate::memory::WorkingMemoryEventType::BlockedOn => { + format!("Worker blocked on: {event_summary}") + } + crate::memory::WorkingMemoryEventType::Constraint => { + format!("Worker constraint: {event_summary}") + } + crate::memory::WorkingMemoryEventType::Error => { + format!("Worker failed: {event_summary}") + } + _ => format!("Worker completed: {event_summary}"), }, ) .channel(self.id.to_string()) @@ -3674,8 +3750,9 @@ mod tests { ObserveModeFallbackState, compute_listen_mode_invocation, is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, + classify_conversational_event_summary, }; - use crate::memory::MemoryType; + use crate::memory::{MemoryType, WorkingMemoryEventType}; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; use std::collections::HashMap; use std::sync::Arc; @@ -3844,6 +3921,36 @@ mod tests { assert!(!should_process_event_for_channel(&event, &channel_id)); } + #[test] + fn conversational_event_summary_extracts_outcome_prefix() { + let (event_type, summary) = classify_conversational_event_summary( + "outcome: implemented the migration safety check", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert_eq!(summary, "implemented the migration safety check"); + } + + #[test] + fn conversational_event_summary_extracts_blocked_on_prefix() { + let (event_type, summary) = classify_conversational_event_summary( + "blocked_on: waiting for review from infra", + WorkingMemoryEventType::Error, + ); + assert_eq!(event_type, WorkingMemoryEventType::BlockedOn); + assert_eq!(summary, "waiting for review from infra"); + } + + #[test] + fn conversational_event_summary_falls_back_to_default_type() { + let (event_type, summary) = classify_conversational_event_summary( + "completed with no blockers", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::WorkerCompleted); + assert_eq!(summary, "completed with no blockers"); + } + #[test] fn quiet_mode_invocation_uses_discord_mention_and_reply_metadata() { let message = inbound_message( diff --git a/src/memory/working.rs b/src/memory/working.rs index 7d1bff0d2..06506de40 100644 --- a/src/memory/working.rs +++ b/src/memory/working.rs @@ -43,6 +43,14 @@ pub enum WorkingMemoryEventType { UserCorrection, /// A prior decision was revised. DecisionRevised, + /// A concrete deadline or due date was set. + DeadlineSet, + /// Progress is currently blocked on an external dependency or prerequisite. + BlockedOn, + /// An explicit constraint was stated. + Constraint, + /// A task or branch reached a terminal result. + Outcome, /// An error or failure occurred. Error, /// A task was created or updated. @@ -68,6 +76,10 @@ impl WorkingMemoryEventType { Self::Decision => "decision", Self::UserCorrection => "user_correction", Self::DecisionRevised => "decision_revised", + Self::DeadlineSet => "deadline_set", + Self::BlockedOn => "blocked_on", + Self::Constraint => "constraint", + Self::Outcome => "outcome", Self::Error => "error", Self::TaskUpdate => "task_update", Self::AgentMessage => "agent_message", @@ -87,6 +99,10 @@ impl WorkingMemoryEventType { "decision" => Some(Self::Decision), "user_correction" => Some(Self::UserCorrection), "decision_revised" => Some(Self::DecisionRevised), + "deadline_set" => Some(Self::DeadlineSet), + "blocked_on" => Some(Self::BlockedOn), + "constraint" => Some(Self::Constraint), + "outcome" => Some(Self::Outcome), "error" => Some(Self::Error), "task_update" => Some(Self::TaskUpdate), "agent_message" => Some(Self::AgentMessage), @@ -765,6 +781,10 @@ fn format_event_line(event: &WorkingMemoryEvent, current_channel_id: &str) -> St WorkingMemoryEventType::Decision => "Decision", WorkingMemoryEventType::UserCorrection => "User correction", WorkingMemoryEventType::DecisionRevised => "Decision revised", + WorkingMemoryEventType::DeadlineSet => "Deadline set", + WorkingMemoryEventType::BlockedOn => "Blocked on", + WorkingMemoryEventType::Constraint => "Constraint", + WorkingMemoryEventType::Outcome => "Outcome", WorkingMemoryEventType::Error => "Error", WorkingMemoryEventType::TaskUpdate => "Task update", WorkingMemoryEventType::AgentMessage => "Agent message", @@ -1069,6 +1089,10 @@ mod tests { WorkingMemoryEventType::Decision, WorkingMemoryEventType::UserCorrection, WorkingMemoryEventType::DecisionRevised, + WorkingMemoryEventType::DeadlineSet, + WorkingMemoryEventType::BlockedOn, + WorkingMemoryEventType::Constraint, + WorkingMemoryEventType::Outcome, WorkingMemoryEventType::Error, WorkingMemoryEventType::TaskUpdate, WorkingMemoryEventType::AgentMessage, @@ -1091,7 +1115,7 @@ mod tests { } let events = store.get_events_for_day(&today).await.unwrap(); - assert_eq!(events.len(), 14); + assert_eq!(events.len(), 18); // Verify all types survived the roundtrip. let types: Vec = events.iter().map(|e| e.event_type).collect(); diff --git a/src/tools/memory_persistence_complete.rs b/src/tools/memory_persistence_complete.rs index 4648832bd..1652870cc 100644 --- a/src/tools/memory_persistence_complete.rs +++ b/src/tools/memory_persistence_complete.rs @@ -109,7 +109,8 @@ pub struct MemoryPersistenceCompleteArgs { /// A single event extracted by the persistence branch for the working memory log. #[derive(Debug, Clone, Deserialize, JsonSchema)] pub struct WorkingMemoryEventInput { - /// Event type: "decision", "user_correction", "decision_revised", "error", or "system". + /// Event type: "decision", "user_correction", "decision_revised", "deadline_set", + /// "blocked_on", "constraint", "outcome", "error", or "system". pub event_type: String, /// One-line summary of the event. pub summary: String, @@ -170,6 +171,10 @@ impl Tool for MemoryPersistenceCompleteTool { "decision", "user_correction", "decision_revised", + "deadline_set", + "blocked_on", + "constraint", + "outcome", "error", "system" ], diff --git a/src/tools/send_agent_message.rs b/src/tools/send_agent_message.rs index dcc9e79e2..43731c7a3 100644 --- a/src/tools/send_agent_message.rs +++ b/src/tools/send_agent_message.rs @@ -285,7 +285,7 @@ impl Tool for SendAgentMessageTool { if let Some(working_memory) = &self.working_memory { working_memory .emit( - crate::memory::WorkingMemoryEventType::AgentMessage, + crate::memory::WorkingMemoryEventType::Outcome, format!("Delegated task #{task_number} to {target_display}"), ) .importance(0.7) @@ -325,3 +325,145 @@ fn extract_task_title(message: &str) -> String { format!("{}...", first_line[..boundary].trim()) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::links::{AgentLink, LinkDirection, LinkKind}; + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use arc_swap::ArcSwap; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::collections::HashMap; + use std::time::Duration; + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn send_agent_message_emits_outcome_event() { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::query( + r#" + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + task_number INTEGER NOT NULL UNIQUE, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL, + priority TEXT NOT NULL, + owner_agent_id TEXT NOT NULL, + assigned_agent_id TEXT NOT NULL, + subtasks TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}', + source_memory_id TEXT, + worker_id TEXT, + created_by TEXT NOT NULL, + approved_at TEXT, + approved_by TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + completed_at TEXT + ) + "#, + ) + .execute(&pool) + .await + .expect("tasks schema should be created"); + sqlx::query( + "CREATE TABLE task_number_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_number INTEGER NOT NULL DEFAULT 1 + )", + ) + .execute(&pool) + .await + .expect("task_number_seq should be created"); + sqlx::query( + "CREATE TABLE conversation_messages ( + id TEXT PRIMARY KEY, + channel_id TEXT NOT NULL, + role TEXT NOT NULL, + sender_name TEXT, + sender_id TEXT, + content TEXT NOT NULL, + metadata TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + )", + ) + .execute(&pool) + .await + .expect("conversation messages schema should be created"); + sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") + .execute(&pool) + .await + .expect("sequence seed should be inserted"); + + let task_store = Arc::new(TaskStore::new(pool.clone())); + let conversation_logger = ConversationLogger::new(pool.clone()); + let working_memory_pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&working_memory_pool) + .await + .expect("working memory migrations"); + let working_memory = WorkingMemoryStore::new(working_memory_pool, Tz::UTC); + + let links = Arc::new(ArcSwap::from_pointee(vec![AgentLink { + from_agent_id: "planner".to_string(), + to_agent_id: "executor".to_string(), + direction: LinkDirection::TwoWay, + kind: LinkKind::Peer, + }])); + let agent_names = Arc::new(HashMap::from([ + ("planner".to_string(), "Planner".to_string()), + ("executor".to_string(), "Executor".to_string()), + ])); + + let tool = SendAgentMessageTool::new( + crate::AgentId::from("planner"), + links, + agent_names, + task_store, + conversation_logger, + ) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(SendAgentMessageArgs { + target: "executor".to_string(), + message: "Implement the working-memory renderer. Include tests.".to_string(), + }) + .await + .expect("send agent message should succeed"); + + assert!(output.success); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!(event.summary, "Delegated task #1 to Executor"); + } +} diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index fbb90a3c8..0efcf8436 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -188,6 +188,19 @@ impl Tool for SpawnWorkerTool { { let status = self.state.status_block.read().await; if let Some(existing_id) = status.find_duplicate_worker_task(&args.task) { + self.state + .deps + .working_memory + .emit( + crate::memory::WorkingMemoryEventType::BlockedOn, + format!( + "Worker spawn blocked on active worker {existing_id} for duplicate task" + ), + ) + .channel(self.state.channel_id.to_string()) + .importance(0.6) + .record(); + return Ok(SpawnWorkerOutput { worker_id: existing_id, spawned: false, diff --git a/src/tools/task_create.rs b/src/tools/task_create.rs index dabc7ead6..ddccff266 100644 --- a/src/tools/task_create.rs +++ b/src/tools/task_create.rs @@ -142,12 +142,25 @@ impl Tool for TaskCreateTool { .map_err(|error| TaskCreateError(format!("{error}")))?; if let Some(working_memory) = &self.working_memory { - working_memory - .emit( + let (event_type, summary, importance) = if task.status == TaskStatus::Done { + ( + crate::memory::WorkingMemoryEventType::Outcome, + format!("Task #{} completed: {}", task.task_number, task.title), + 0.7, + ) + } else { + ( crate::memory::WorkingMemoryEventType::TaskUpdate, - format!("Task created #{}: {}", task.task_number, task.title), + format!( + "Task created #{}: {} (status: {})", + task.task_number, task.title, task.status + ), + 0.5, ) - .importance(0.5) + }; + working_memory + .emit(event_type, summary) + .importance(importance) .record(); } @@ -159,3 +172,116 @@ impl Tool for TaskCreateTool { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::time::Duration; + + async fn setup_task_store() -> TaskStore { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::query( + r#" + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + task_number INTEGER NOT NULL UNIQUE, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL, + priority TEXT NOT NULL, + owner_agent_id TEXT NOT NULL, + assigned_agent_id TEXT NOT NULL, + subtasks TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}', + source_memory_id TEXT, + worker_id TEXT, + created_by TEXT NOT NULL, + approved_at TEXT, + approved_by TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + completed_at TEXT + ) + "#, + ) + .execute(&pool) + .await + .expect("tasks schema should be created"); + sqlx::query( + "CREATE TABLE task_number_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_number INTEGER NOT NULL DEFAULT 1 + )", + ) + .execute(&pool) + .await + .expect("task_number_seq should be created"); + sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") + .execute(&pool) + .await + .expect("sequence seed should be inserted"); + TaskStore::new(pool) + } + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn task_create_emits_outcome_for_done_tasks() { + let task_store = Arc::new(setup_task_store().await); + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("migrations"); + let working_memory = WorkingMemoryStore::new(pool, Tz::UTC); + + let tool = TaskCreateTool::new(task_store, "agent-test", "branch") + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskCreateArgs { + title: "Ship observation MVP".to_string(), + description: Some("land the first packet".to_string()), + priority: "medium".to_string(), + subtasks: Vec::new(), + metadata: None, + status: Some("done".to_string()), + }) + .await + .expect("task create should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!(event.summary, "Task #1 completed: Ship observation MVP"); + } +} diff --git a/src/tools/task_update.rs b/src/tools/task_update.rs index 555df299f..1568c47f1 100644 --- a/src/tools/task_update.rs +++ b/src/tools/task_update.rs @@ -236,15 +236,25 @@ impl Tool for TaskUpdateTool { .ok_or_else(|| TaskUpdateError(format!("task #{} not found", task_number)))?; if let Some(working_memory) = &self.working_memory { - working_memory - .emit( + let (event_type, summary, importance) = if updated.status == TaskStatus::Done { + ( + crate::memory::WorkingMemoryEventType::Outcome, + format!("Task #{} completed", updated.task_number), + 0.7, + ) + } else { + ( crate::memory::WorkingMemoryEventType::TaskUpdate, format!( "Task #{} updated to {}", updated.task_number, updated.status ), + 0.4, ) - .importance(0.4) + }; + working_memory + .emit(event_type, summary) + .importance(importance) .record(); } @@ -256,3 +266,143 @@ impl Tool for TaskUpdateTool { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::memory::working::WorkingMemoryEvent; + use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use chrono_tz::Tz; + use sqlx::sqlite::SqlitePoolOptions; + use std::time::Duration; + + async fn setup_task_store() -> TaskStore { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::query( + r#" + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + task_number INTEGER NOT NULL UNIQUE, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL, + priority TEXT NOT NULL, + owner_agent_id TEXT NOT NULL, + assigned_agent_id TEXT NOT NULL, + subtasks TEXT NOT NULL, + metadata TEXT NOT NULL DEFAULT '{}', + source_memory_id TEXT, + worker_id TEXT, + created_by TEXT NOT NULL, + approved_at TEXT, + approved_by TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + completed_at TEXT + ) + "#, + ) + .execute(&pool) + .await + .expect("tasks schema should be created"); + sqlx::query( + "CREATE TABLE task_number_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_number INTEGER NOT NULL DEFAULT 1 + )", + ) + .execute(&pool) + .await + .expect("task_number_seq should be created"); + sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") + .execute(&pool) + .await + .expect("sequence seed should be inserted"); + TaskStore::new(pool) + } + + async fn setup_working_memory() -> Arc { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .expect("sqlite connect"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("migrations"); + WorkingMemoryStore::new(pool, Tz::UTC) + } + + async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { + tokio::time::timeout(Duration::from_secs(2), async { + loop { + let events = store + .get_recent_events(10, 0.0) + .await + .expect("working memory query"); + if let Some(event) = events.into_iter().next() { + break event; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for working memory event") + } + + #[tokio::test] + async fn task_update_emits_outcome_for_done_status() { + let task_store = Arc::new(setup_task_store().await); + let working_memory = setup_working_memory().await; + + let created = task_store + .create(crate::tasks::CreateTaskInput { + owner_agent_id: "agent-test".to_string(), + assigned_agent_id: "agent-test".to_string(), + title: "Review PR 2".to_string(), + description: None, + status: TaskStatus::InProgress, + priority: TaskPriority::Medium, + subtasks: Vec::new(), + metadata: serde_json::json!({}), + source_memory_id: None, + created_by: "branch".to_string(), + }) + .await + .expect("task should be created"); + + let tool = TaskUpdateTool::for_branch(task_store, AgentId::from("agent-test")) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskUpdateArgs { + task_number: created.task_number as i32, + title: None, + description: None, + status: Some("done".to_string()), + priority: None, + subtasks: None, + metadata: None, + complete_subtask: None, + worker_id: None, + approved_by: None, + }) + .await + .expect("task update should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::Outcome); + assert_eq!( + event.summary, + format!("Task #{} completed", created.task_number) + ); + } +} From f89fa272cb7af81527bd7779882da63d2cbc83be Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Wed, 1 Apr 2026 09:07:57 -0400 Subject: [PATCH 3/5] fix(memory): close stacked PR bot feedback --- src/agent/channel.rs | 131 +++++++++++++++++--------------- src/memory/working.rs | 8 +- src/tasks/store.rs | 109 ++++++++++++++------------ src/tools/send_agent_message.rs | 51 +------------ src/tools/spawn_worker.rs | 18 ++++- src/tools/task_create.rs | 52 +------------ src/tools/task_update.rs | 113 ++++++++++++++------------- 7 files changed, 221 insertions(+), 261 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 4fd53c1c8..20fce40c0 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -93,44 +93,52 @@ fn classify_conversational_event_summary( if let Some((prefix, rest)) = trimmed.split_once(':') { let rest_trimmed = rest.trim(); - match prefix.trim().to_ascii_lowercase().as_str() { - "outcome" => { - return ( - crate::memory::WorkingMemoryEventType::Outcome, - if rest_trimmed.is_empty() { - trimmed.to_string() - } else { - rest_trimmed.to_string() - }, - ); - } - "blocked_on" => { - return ( - crate::memory::WorkingMemoryEventType::BlockedOn, - if rest_trimmed.is_empty() { - trimmed.to_string() - } else { - rest_trimmed.to_string() - }, - ); - } - "constraint" => { - return ( - crate::memory::WorkingMemoryEventType::Constraint, - if rest_trimmed.is_empty() { - trimmed.to_string() - } else { - rest_trimmed.to_string() - }, - ); - } - _ => {} + let prefix = prefix.trim(); + if prefix.eq_ignore_ascii_case("outcome") { + return ( + crate::memory::WorkingMemoryEventType::Outcome, + rest_trimmed.to_string(), + ); + } + if prefix.eq_ignore_ascii_case("blocked_on") { + return ( + crate::memory::WorkingMemoryEventType::BlockedOn, + rest_trimmed.to_string(), + ); + } + if prefix.eq_ignore_ascii_case("constraint") { + return ( + crate::memory::WorkingMemoryEventType::Constraint, + rest_trimmed.to_string(), + ); } } (default_event_type, trimmed.to_string()) } +fn format_conversational_event_summary( + event_type: crate::memory::WorkingMemoryEventType, + source: &str, + event_summary: &str, +) -> String { + let label = match event_type { + crate::memory::WorkingMemoryEventType::Outcome => "outcome", + crate::memory::WorkingMemoryEventType::BlockedOn => "blocked on", + crate::memory::WorkingMemoryEventType::Constraint => "constraint", + crate::memory::WorkingMemoryEventType::Error => "failed", + crate::memory::WorkingMemoryEventType::BranchCompleted + | crate::memory::WorkingMemoryEventType::WorkerCompleted => "completed", + _ => "concluded", + }; + + if event_summary.is_empty() { + format!("{source} {label}") + } else { + format!("{source} {label}: {event_summary}") + } +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -3065,18 +3073,11 @@ impl Channel { .working_memory .emit( event_type, - match event_type { - crate::memory::WorkingMemoryEventType::Outcome => { - format!("Branch outcome: {event_summary}") - } - crate::memory::WorkingMemoryEventType::BlockedOn => { - format!("Branch blocked on: {event_summary}") - } - crate::memory::WorkingMemoryEventType::Constraint => { - format!("Branch constraint: {event_summary}") - } - _ => format!("Branch concluded: {event_summary}"), - }, + format_conversational_event_summary( + event_type, + "Branch", + &event_summary, + ), ) .channel(self.id.to_string()) .importance(0.7) @@ -3156,21 +3157,7 @@ impl Channel { .working_memory .emit( event_type, - match event_type { - crate::memory::WorkingMemoryEventType::Outcome => { - format!("Worker outcome: {event_summary}") - } - crate::memory::WorkingMemoryEventType::BlockedOn => { - format!("Worker blocked on: {event_summary}") - } - crate::memory::WorkingMemoryEventType::Constraint => { - format!("Worker constraint: {event_summary}") - } - crate::memory::WorkingMemoryEventType::Error => { - format!("Worker failed: {event_summary}") - } - _ => format!("Worker completed: {event_summary}"), - }, + format_conversational_event_summary(event_type, "Worker", &event_summary), ) .channel(self.id.to_string()) .importance(if *success { 0.6 } else { 0.8 }) @@ -3750,7 +3737,7 @@ mod tests { ObserveModeFallbackState, compute_listen_mode_invocation, is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, - classify_conversational_event_summary, + classify_conversational_event_summary, format_conversational_event_summary, }; use crate::memory::{MemoryType, WorkingMemoryEventType}; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; @@ -3951,6 +3938,30 @@ mod tests { assert_eq!(summary, "completed with no blockers"); } + #[test] + fn conversational_event_summary_extracts_constraint_prefix_case_insensitively() { + let (event_type, summary) = classify_conversational_event_summary( + "CoNsTrAiNt: must keep migrations immutable", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Constraint); + assert_eq!(summary, "must keep migrations immutable"); + } + + #[test] + fn conversational_event_summary_treats_empty_prefixed_content_as_empty_summary() { + let (event_type, summary) = classify_conversational_event_summary( + "outcome: ", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert!(summary.is_empty()); + assert_eq!( + format_conversational_event_summary(event_type, "Worker", &summary), + "Worker outcome" + ); + } + #[test] fn quiet_mode_invocation_uses_discord_mention_and_reply_metadata() { let message = inbound_message( diff --git a/src/memory/working.rs b/src/memory/working.rs index 06506de40..9308915ca 100644 --- a/src/memory/working.rs +++ b/src/memory/working.rs @@ -1080,7 +1080,7 @@ mod tests { let store = setup_test_store().await; let today = store.today(); - for event_type in [ + let inserted = [ WorkingMemoryEventType::BranchCompleted, WorkingMemoryEventType::WorkerSpawned, WorkingMemoryEventType::WorkerCompleted, @@ -1099,7 +1099,9 @@ mod tests { WorkingMemoryEventType::System, WorkingMemoryEventType::MemoryPromoted, WorkingMemoryEventType::MemoryDemoted, - ] { + ]; + + for event_type in inserted { let event = WorkingMemoryEvent { id: Uuid::new_v4().to_string(), event_type, @@ -1115,7 +1117,7 @@ mod tests { } let events = store.get_events_for_day(&today).await.unwrap(); - assert_eq!(events.len(), 18); + assert_eq!(events.len(), inserted.len()); // Verify all types survived the roundtrip. let types: Vec = events.iter().map(|e| e.event_type).collect(); diff --git a/src/tasks/store.rs b/src/tasks/store.rs index db3b0d117..199413c3c 100644 --- a/src/tasks/store.rs +++ b/src/tasks/store.rs @@ -9,6 +9,8 @@ use anyhow::Context as _; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; +#[cfg(test)] +use sqlx::sqlite::SqlitePoolOptions; use sqlx::{Row as _, SqlitePool}; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, utoipa::ToSchema)] @@ -183,6 +185,11 @@ impl TaskStore { Self { pool } } + #[cfg(test)] + pub(crate) fn pool(&self) -> &SqlitePool { + &self.pool + } + /// Maximum number of retries when a concurrent create races on the /// `task_number` UNIQUE constraint. const MAX_CREATE_RETRIES: usize = 3; @@ -670,61 +677,65 @@ fn read_optional_timestamp(row: &sqlx::sqlite::SqliteRow, column: &str) -> Optio } #[cfg(test)] -mod tests { - use super::*; - use sqlx::sqlite::SqlitePoolOptions; - - async fn setup_store() -> TaskStore { - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect("sqlite::memory:") - .await - .expect("in-memory sqlite should connect"); - - sqlx::query( - r#" - CREATE TABLE tasks ( - id TEXT PRIMARY KEY, - task_number INTEGER NOT NULL UNIQUE, - title TEXT NOT NULL, - description TEXT, - status TEXT NOT NULL DEFAULT 'backlog', - priority TEXT NOT NULL DEFAULT 'medium', - owner_agent_id TEXT NOT NULL, - assigned_agent_id TEXT NOT NULL, - subtasks TEXT, - metadata TEXT, - source_memory_id TEXT, - worker_id TEXT, - created_by TEXT NOT NULL, - approved_at TEXT, - approved_by TEXT, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - completed_at TEXT - ) - "#, - ) - .execute(&pool) +pub(crate) async fn setup_test_store() -> TaskStore { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") .await - .expect("tasks schema should be created"); - - sqlx::query( - "CREATE TABLE task_number_seq ( - id INTEGER PRIMARY KEY CHECK (id = 1), - next_number INTEGER NOT NULL DEFAULT 1 - )", + .expect("in-memory sqlite should connect"); + + sqlx::query( + r#" + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + task_number INTEGER NOT NULL UNIQUE, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL DEFAULT 'backlog', + priority TEXT NOT NULL DEFAULT 'medium', + owner_agent_id TEXT NOT NULL, + assigned_agent_id TEXT NOT NULL, + subtasks TEXT, + metadata TEXT, + source_memory_id TEXT, + worker_id TEXT, + created_by TEXT NOT NULL, + approved_at TEXT, + approved_by TEXT, + created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + completed_at TEXT ) + "#, + ) + .execute(&pool) + .await + .expect("tasks schema should be created"); + + sqlx::query( + "CREATE TABLE task_number_seq ( + id INTEGER PRIMARY KEY CHECK (id = 1), + next_number INTEGER NOT NULL DEFAULT 1 + )", + ) + .execute(&pool) + .await + .expect("task_number_seq should be created"); + + sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") .execute(&pool) .await - .expect("task_number_seq should be created"); + .expect("sequence seed should be inserted"); - sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") - .execute(&pool) - .await - .expect("sequence seed should be inserted"); + TaskStore::new(pool) +} - TaskStore::new(pool) +#[cfg(test)] +mod tests { + use super::*; + + async fn setup_store() -> TaskStore { + setup_test_store().await } fn self_assigned_input(title: &str, status: TaskStatus) -> CreateTaskInput { diff --git a/src/tools/send_agent_message.rs b/src/tools/send_agent_message.rs index 43731c7a3..82b0267ed 100644 --- a/src/tools/send_agent_message.rs +++ b/src/tools/send_agent_message.rs @@ -333,6 +333,7 @@ mod tests { use crate::links::{AgentLink, LinkDirection, LinkKind}; use crate::memory::working::WorkingMemoryEvent; use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; use arc_swap::ArcSwap; use chrono_tz::Tz; use sqlx::sqlite::SqlitePoolOptions; @@ -358,47 +359,8 @@ mod tests { #[tokio::test] async fn send_agent_message_emits_outcome_event() { - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect("sqlite::memory:") - .await - .expect("sqlite connect"); - sqlx::query( - r#" - CREATE TABLE tasks ( - id TEXT PRIMARY KEY, - task_number INTEGER NOT NULL UNIQUE, - title TEXT NOT NULL, - description TEXT, - status TEXT NOT NULL, - priority TEXT NOT NULL, - owner_agent_id TEXT NOT NULL, - assigned_agent_id TEXT NOT NULL, - subtasks TEXT NOT NULL, - metadata TEXT NOT NULL DEFAULT '{}', - source_memory_id TEXT, - worker_id TEXT, - created_by TEXT NOT NULL, - approved_at TEXT, - approved_by TEXT, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - completed_at TEXT - ) - "#, - ) - .execute(&pool) - .await - .expect("tasks schema should be created"); - sqlx::query( - "CREATE TABLE task_number_seq ( - id INTEGER PRIMARY KEY CHECK (id = 1), - next_number INTEGER NOT NULL DEFAULT 1 - )", - ) - .execute(&pool) - .await - .expect("task_number_seq should be created"); + let task_store = setup_test_store().await; + let pool = task_store.pool().clone(); sqlx::query( "CREATE TABLE conversation_messages ( id TEXT PRIMARY KEY, @@ -414,12 +376,7 @@ mod tests { .execute(&pool) .await .expect("conversation messages schema should be created"); - sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") - .execute(&pool) - .await - .expect("sequence seed should be inserted"); - - let task_store = Arc::new(TaskStore::new(pool.clone())); + let task_store = Arc::new(task_store); let conversation_logger = ConversationLogger::new(pool.clone()); let working_memory_pool = SqlitePoolOptions::new() .max_connections(1) diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 0efcf8436..1552029a4 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -29,6 +29,21 @@ impl SpawnWorkerTool { } } +fn summarize_duplicate_task(task: &str) -> String { + let trimmed = task.trim(); + if trimmed.is_empty() { + return "unspecified task".to_string(); + } + + const MAX_CHARS: usize = 80; + if trimmed.len() <= MAX_CHARS { + trimmed.to_string() + } else { + let boundary = trimmed.floor_char_boundary(MAX_CHARS); + format!("{}...", &trimmed[..boundary]) + } +} + /// Error type for spawn worker tool. #[derive(Debug, thiserror::Error)] #[error("Worker spawn failed: {0}")] @@ -194,7 +209,8 @@ impl Tool for SpawnWorkerTool { .emit( crate::memory::WorkingMemoryEventType::BlockedOn, format!( - "Worker spawn blocked on active worker {existing_id} for duplicate task" + "Worker spawn blocked on active worker {existing_id} for duplicate task: {}", + summarize_duplicate_task(&args.task) ), ) .channel(self.state.channel_id.to_string()) diff --git a/src/tools/task_create.rs b/src/tools/task_create.rs index ddccff266..476d9ee7f 100644 --- a/src/tools/task_create.rs +++ b/src/tools/task_create.rs @@ -179,59 +179,11 @@ mod tests { use crate::memory::working::WorkingMemoryEvent; use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; use chrono_tz::Tz; use sqlx::sqlite::SqlitePoolOptions; use std::time::Duration; - async fn setup_task_store() -> TaskStore { - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect("sqlite::memory:") - .await - .expect("sqlite connect"); - sqlx::query( - r#" - CREATE TABLE tasks ( - id TEXT PRIMARY KEY, - task_number INTEGER NOT NULL UNIQUE, - title TEXT NOT NULL, - description TEXT, - status TEXT NOT NULL, - priority TEXT NOT NULL, - owner_agent_id TEXT NOT NULL, - assigned_agent_id TEXT NOT NULL, - subtasks TEXT NOT NULL, - metadata TEXT NOT NULL DEFAULT '{}', - source_memory_id TEXT, - worker_id TEXT, - created_by TEXT NOT NULL, - approved_at TEXT, - approved_by TEXT, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - completed_at TEXT - ) - "#, - ) - .execute(&pool) - .await - .expect("tasks schema should be created"); - sqlx::query( - "CREATE TABLE task_number_seq ( - id INTEGER PRIMARY KEY CHECK (id = 1), - next_number INTEGER NOT NULL DEFAULT 1 - )", - ) - .execute(&pool) - .await - .expect("task_number_seq should be created"); - sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") - .execute(&pool) - .await - .expect("sequence seed should be inserted"); - TaskStore::new(pool) - } - async fn wait_for_single_event(store: &WorkingMemoryStore) -> WorkingMemoryEvent { tokio::time::timeout(Duration::from_secs(2), async { loop { @@ -251,7 +203,7 @@ mod tests { #[tokio::test] async fn task_create_emits_outcome_for_done_tasks() { - let task_store = Arc::new(setup_task_store().await); + let task_store = Arc::new(setup_test_store().await); let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") diff --git a/src/tools/task_update.rs b/src/tools/task_update.rs index 1568c47f1..2c1a0a6ef 100644 --- a/src/tools/task_update.rs +++ b/src/tools/task_update.rs @@ -155,6 +155,13 @@ impl Tool for TaskUpdateTool { async fn call(&self, args: Self::Args) -> Result { let task_number = i64::from(args.task_number); + let previous_status = self + .task_store + .get_by_number(task_number) + .await + .map_err(|error| TaskUpdateError(format!("{error}")))? + .ok_or_else(|| TaskUpdateError(format!("task #{} not found", task_number)))? + .status; if let TaskUpdateScope::Worker(ref worker_id) = self.scope { let current = self @@ -236,7 +243,9 @@ impl Tool for TaskUpdateTool { .ok_or_else(|| TaskUpdateError(format!("task #{} not found", task_number)))?; if let Some(working_memory) = &self.working_memory { - let (event_type, summary, importance) = if updated.status == TaskStatus::Done { + let transitioned_to_done = + previous_status != TaskStatus::Done && updated.status == TaskStatus::Done; + let (event_type, summary, importance) = if transitioned_to_done { ( crate::memory::WorkingMemoryEventType::Outcome, format!("Task #{} completed", updated.task_number), @@ -273,59 +282,11 @@ mod tests { use crate::memory::working::WorkingMemoryEvent; use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; + use crate::tasks::store::setup_test_store; use chrono_tz::Tz; use sqlx::sqlite::SqlitePoolOptions; use std::time::Duration; - async fn setup_task_store() -> TaskStore { - let pool = SqlitePoolOptions::new() - .max_connections(1) - .connect("sqlite::memory:") - .await - .expect("sqlite connect"); - sqlx::query( - r#" - CREATE TABLE tasks ( - id TEXT PRIMARY KEY, - task_number INTEGER NOT NULL UNIQUE, - title TEXT NOT NULL, - description TEXT, - status TEXT NOT NULL, - priority TEXT NOT NULL, - owner_agent_id TEXT NOT NULL, - assigned_agent_id TEXT NOT NULL, - subtasks TEXT NOT NULL, - metadata TEXT NOT NULL DEFAULT '{}', - source_memory_id TEXT, - worker_id TEXT, - created_by TEXT NOT NULL, - approved_at TEXT, - approved_by TEXT, - created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), - completed_at TEXT - ) - "#, - ) - .execute(&pool) - .await - .expect("tasks schema should be created"); - sqlx::query( - "CREATE TABLE task_number_seq ( - id INTEGER PRIMARY KEY CHECK (id = 1), - next_number INTEGER NOT NULL DEFAULT 1 - )", - ) - .execute(&pool) - .await - .expect("task_number_seq should be created"); - sqlx::query("INSERT INTO task_number_seq (id, next_number) VALUES (1, 1)") - .execute(&pool) - .await - .expect("sequence seed should be inserted"); - TaskStore::new(pool) - } - async fn setup_working_memory() -> Arc { let pool = SqlitePoolOptions::new() .max_connections(1) @@ -358,7 +319,7 @@ mod tests { #[tokio::test] async fn task_update_emits_outcome_for_done_status() { - let task_store = Arc::new(setup_task_store().await); + let task_store = Arc::new(setup_test_store().await); let working_memory = setup_working_memory().await; let created = task_store @@ -405,4 +366,54 @@ mod tests { format!("Task #{} completed", created.task_number) ); } + + #[tokio::test] + async fn task_update_keeps_task_update_event_when_task_was_already_done() { + let task_store = Arc::new(setup_test_store().await); + let working_memory = setup_working_memory().await; + + let created = task_store + .create(crate::tasks::CreateTaskInput { + owner_agent_id: "agent-test".to_string(), + assigned_agent_id: "agent-test".to_string(), + title: "Review merged changes".to_string(), + description: None, + status: TaskStatus::Done, + priority: TaskPriority::Medium, + subtasks: Vec::new(), + metadata: serde_json::json!({}), + source_memory_id: None, + created_by: "branch".to_string(), + }) + .await + .expect("task should be created"); + + let tool = TaskUpdateTool::for_branch(task_store, AgentId::from("agent-test")) + .with_working_memory(working_memory.clone()); + + let output = tool + .call(TaskUpdateArgs { + task_number: created.task_number as i32, + title: Some("Review merged changes carefully".to_string()), + description: None, + status: None, + priority: None, + subtasks: None, + metadata: None, + complete_subtask: None, + worker_id: None, + approved_by: None, + }) + .await + .expect("task update should succeed"); + + assert_eq!(output.status, "done"); + + let event = wait_for_single_event(&working_memory).await; + assert_eq!(event.event_type, WorkingMemoryEventType::TaskUpdate); + assert_eq!( + event.summary, + format!("Task #{} updated to done", created.task_number) + ); + } } From 5141380588eb1dd995fdc6d4d505ea709d4a4ea7 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Wed, 1 Apr 2026 10:00:54 -0400 Subject: [PATCH 4/5] test(channel): cover case-insensitive event prefixes --- src/agent/channel.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 20fce40c0..4c97d9d94 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -3948,6 +3948,23 @@ mod tests { assert_eq!(summary, "must keep migrations immutable"); } + #[test] + fn conversational_event_summary_is_case_insensitive_across_prefixes() { + let (event_type, summary) = classify_conversational_event_summary( + "OUTCOME: implemented the follow-up", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::Outcome); + assert_eq!(summary, "implemented the follow-up"); + + let (event_type, summary) = classify_conversational_event_summary( + "Blocked_On: waiting on reviewer signoff", + WorkingMemoryEventType::WorkerCompleted, + ); + assert_eq!(event_type, WorkingMemoryEventType::BlockedOn); + assert_eq!(summary, "waiting on reviewer signoff"); + } + #[test] fn conversational_event_summary_treats_empty_prefixed_content_as_empty_summary() { let (event_type, summary) = classify_conversational_event_summary( From bf4a2a32ecfdec5bbfc2b9b99e0d6e0331bf6cf2 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Wed, 8 Apr 2026 13:23:03 -0400 Subject: [PATCH 5/5] fix(memory): resolve stacked rebase fallout --- src/agent/channel.rs | 13 ++++++++----- src/config/load.rs | 4 +++- src/tools/memory_persistence_complete.rs | 11 ----------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 4c97d9d94..ca7f90fdf 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -1001,9 +1001,11 @@ impl Channel { "/quiet" | "/observe" => { self.set_response_mode(ResponseMode::Observe).await; self.send_builtin_text( - "observe mode enabled. i'll learn from this conversation but won't respond.".to_string(), + "observe mode enabled. i'll learn from this conversation but won't respond." + .to_string(), "observe", - ).await; + ) + .await; return Ok(true); } "/active" => { @@ -1033,7 +1035,8 @@ impl Channel { "- /tasks: ready task list".to_string(), "- /digest: one-shot day digest (00:00 -> now)".to_string(), "- /observe: learn from conversation, never respond".to_string(), - "- /mention-only: only respond when @mentioned, replied to, or given a command".to_string(), + "- /mention-only: only respond when @mentioned, replied to, or given a command" + .to_string(), "- /active: normal reply mode".to_string(), "- /agent-id: runtime agent id".to_string(), ]; @@ -3734,10 +3737,10 @@ fn is_dm_conversation_id(conv_id: &str) -> bool { #[cfg(test)] mod tests { use super::{ - ObserveModeFallbackState, compute_listen_mode_invocation, is_dm_conversation_id, + ObserveModeFallbackState, classify_conversational_event_summary, + compute_listen_mode_invocation, format_conversational_event_summary, is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, - classify_conversational_event_summary, format_conversational_event_summary, }; use crate::memory::{MemoryType, WorkingMemoryEventType}; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; diff --git a/src/config/load.rs b/src/config/load.rs index 1e6997515..e7779f54d 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -137,7 +137,9 @@ fn parse_response_mode( // Backwards compat: listen_only_mode maps to response_mode match listen_only_mode { Some(true) => { - tracing::warn!("listen_only_mode is deprecated, use response_mode = \"observe\" instead"); + tracing::warn!( + "listen_only_mode is deprecated, use response_mode = \"observe\" instead" + ); Some(ResponseMode::Observe) } Some(false) => Some(ResponseMode::Active), diff --git a/src/tools/memory_persistence_complete.rs b/src/tools/memory_persistence_complete.rs index 1652870cc..f54c1beb9 100644 --- a/src/tools/memory_persistence_complete.rs +++ b/src/tools/memory_persistence_complete.rs @@ -421,22 +421,11 @@ mod tests { .await .expect("timed out waiting for working memory events"); assert_eq!(events.len(), 2); -<<<<<<< HEAD assert!(events.iter().any(|event| { event.event_type == crate::memory::WorkingMemoryEventType::UserCorrection })); assert!(events.iter().any(|event| { event.event_type == crate::memory::WorkingMemoryEventType::DecisionRevised })); -======= - assert_eq!( - events[0].event_type, - crate::memory::WorkingMemoryEventType::UserCorrection - ); - assert_eq!( - events[1].event_type, - crate::memory::WorkingMemoryEventType::DecisionRevised - ); ->>>>>>> af646041 (fix(memory): tighten persistence rules and add conversational events) } }