diff --git a/codex-rs/memory/src/factory.rs b/codex-rs/memory/src/factory.rs index c44b1a5843a..713c9156d1d 100644 --- a/codex-rs/memory/src/factory.rs +++ b/codex-rs/memory/src/factory.rs @@ -35,10 +35,10 @@ pub fn open_repo_store( let be = backend.unwrap_or_else(choose_backend_from_env); Ok(match be { Backend::Jsonl => { - let _path = std::env::var("CODEX_MEMORY_REPO_JSONL") + let path = std::env::var("CODEX_MEMORY_REPO_JSONL") .map(std::path::PathBuf::from) .unwrap_or_else(|_| base.join("memory.jsonl")); - Box::new(JsonlMemoryStore) + Box::new(JsonlMemoryStore::new(path)) } #[cfg(feature = "sqlite")] Backend::Sqlite => { @@ -62,10 +62,10 @@ pub fn open_global_store( let be = backend.unwrap_or_else(choose_backend_from_env); Ok(match be { Backend::Jsonl => { - let _path = std::env::var("CODEX_MEMORY_HOME_JSONL") + let path = std::env::var("CODEX_MEMORY_HOME_JSONL") .map(std::path::PathBuf::from) .unwrap_or_else(|_| base.join("memory.jsonl")); - Box::new(JsonlMemoryStore) + Box::new(JsonlMemoryStore::new(path)) } #[cfg(feature = "sqlite")] Backend::Sqlite => { @@ -76,3 +76,28 @@ pub fn open_global_store( } }) } + +/// Rewrite a JSONL file, stripping invalid or empty lines. +pub fn compact(path: &std::path::Path) -> anyhow::Result<()> { + let data = match std::fs::read_to_string(path) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(e) => return Err(e.into()), + }; + let mut out = String::new(); + for line in data.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + if let Ok(v) = serde_json::from_str::(line) { + out.push_str(&serde_json::to_string(&v)?); + out.push('\n'); + } + } + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; + } + std::fs::write(path, out)?; + Ok(()) +} diff --git a/codex-rs/memory/src/store/jsonl.rs b/codex-rs/memory/src/store/jsonl.rs index 09976c7de15..fdc7bb69a51 100644 --- a/codex-rs/memory/src/store/jsonl.rs +++ b/codex-rs/memory/src/store/jsonl.rs @@ -1,37 +1,182 @@ use super::*; -pub struct JsonlMemoryStore; +#[derive(Debug, Clone)] +pub struct JsonlMemoryStore { + path: std::path::PathBuf, +} + +impl JsonlMemoryStore { + pub fn new>(path: P) -> Self { + Self { path: path.into() } + } + + fn read_all(&self) -> anyhow::Result> { + let data = match std::fs::read_to_string(&self.path) { + Ok(s) => s, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => String::new(), + Err(e) => return Err(e.into()), + }; + let mut items = Vec::new(); + for line in data.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + match serde_json::from_str::(line) { + Ok(item) => items.push(item), + Err(_) => continue, // skip noisy lines + } + } + Ok(items) + } + + fn write_all(&self, items: &[MemoryItem]) -> anyhow::Result<()> { + if let Some(dir) = self.path.parent() { + std::fs::create_dir_all(dir)?; + } + let mut out = String::new(); + for it in items { + out.push_str(&serde_json::to_string(it)?); + out.push('\n'); + } + std::fs::write(&self.path, out)?; + Ok(()) + } +} impl MemoryStore for JsonlMemoryStore { - fn add(&self, _item: MemoryItem) -> anyhow::Result<()> { - todo!() + fn add(&self, item: MemoryItem) -> anyhow::Result<()> { + if let Some(dir) = self.path.parent() { + std::fs::create_dir_all(dir)?; + } + let mut line = serde_json::to_string(&item)?; + line.push('\n'); + use std::io::Write as _; + let mut f = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.path)?; + f.write_all(line.as_bytes())?; + f.flush()?; + Ok(()) } - fn update(&self, _item: &MemoryItem) -> anyhow::Result<()> { - todo!() + + fn update(&self, item: &MemoryItem) -> anyhow::Result<()> { + let mut items = self.read_all()?; + for it in &mut items { + if it.id == item.id { + *it = item.clone(); + } + } + self.write_all(&items) } - fn delete(&self, _id: &str) -> anyhow::Result<()> { - todo!() + + fn delete(&self, id: &str) -> anyhow::Result<()> { + let items = self.read_all()?; + let items: Vec<_> = items.into_iter().filter(|i| i.id != id).collect(); + self.write_all(&items) } - fn get(&self, _id: &str) -> anyhow::Result> { - todo!() + + fn get(&self, id: &str) -> anyhow::Result> { + let items = self.read_all()?; + Ok(items.into_iter().find(|i| i.id == id)) } + fn list( &self, - _scope: Option, - _status: Option, + scope: Option, + status: Option, ) -> anyhow::Result> { - todo!() + let mut items = self.read_all()?; + if let Some(sc) = scope { + items.retain(|i| i.scope == sc); + } + if let Some(st) = status { + items.retain(|i| i.status == st); + } + items.sort_by(|a, b| b.updated_at.cmp(&a.updated_at)); + Ok(items) } - fn archive(&self, _id: &str, _archived: bool) -> anyhow::Result<()> { - todo!() + + fn archive(&self, id: &str, archived: bool) -> anyhow::Result<()> { + let mut items = self.read_all()?; + let st = if archived { + Status::Archived + } else { + Status::Active + }; + let mut found = false; + for it in &mut items { + if it.id == id { + it.status = st.clone(); + found = true; + } + } + if !found { + anyhow::bail!("archive: id not found: {id}"); + } + self.write_all(&items) } - fn export(&self, _out: &mut dyn std::io::Write) -> anyhow::Result<()> { - todo!() + + fn export(&self, out: &mut dyn std::io::Write) -> anyhow::Result<()> { + let items = self.list(None, None)?; + for it in items { + let line = serde_json::to_string(&it)?; + out.write_all(line.as_bytes())?; + out.write_all(b"\n")?; + } + Ok(()) } - fn import(&self, _input: &mut dyn std::io::Read) -> anyhow::Result { - todo!() + + fn import(&self, input: &mut dyn std::io::Read) -> anyhow::Result { + let mut data = String::new(); + std::io::Read::read_to_string(input, &mut data)?; + let items = self.read_all()?; + let mut map: std::collections::HashMap = + items.into_iter().map(|i| (i.id.clone(), i)).collect(); + let mut count = 0usize; + for line in data.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + match serde_json::from_str::(line) { + Ok(item) => { + map.insert(item.id.clone(), item); + count += 1; + } + Err(_) => continue, + } + } + let mut items: Vec = map.into_values().collect(); + items.sort_by(|a, b| b.updated_at.cmp(&a.updated_at)); + self.write_all(&items)?; + Ok(count) } + fn stats(&self) -> anyhow::Result { - todo!() + let items = self.read_all()?; + let total = items.len(); + let active = items.iter().filter(|i| i.status == Status::Active).count(); + let archived = items + .iter() + .filter(|i| i.status == Status::Archived) + .count(); + let mut by_scope = serde_json::Map::new(); + for sc in [Scope::Global, Scope::Repo, Scope::Dir] { + let n = items.iter().filter(|i| i.scope == sc).count(); + let key = match sc { + Scope::Global => "global", + Scope::Repo => "repo", + Scope::Dir => "dir", + }; + by_scope.insert(key.to_string(), serde_json::json!(n)); + } + Ok(serde_json::json!({ + "total": total, + "active": active, + "archived": archived, + "by_scope": serde_json::Value::Object(by_scope), + })) } }