Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ opt-level = 3
strip = true

[workspace.dependencies]
agent-client-protocol = { version = "0.9", features = ["unstable_session_model"] }
anyhow = "1.0.102"
async-recursion = "1.1.1"
async-stream = "0.3"
Expand Down
3 changes: 3 additions & 0 deletions crates/forge_api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,7 @@ pub trait API: Sync + Send {
&self,
data_parameters: DataGenerationParameters,
) -> Result<BoxStream<'static, Result<serde_json::Value, anyhow::Error>>>;

/// Starts the ACP (Agent Communication Protocol) server over stdio.
async fn acp_start_stdio(&self) -> Result<()>;
}
5 changes: 5 additions & 0 deletions crates/forge_api/src/forge_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ impl<A: Services, F: CommandInfra + EnvironmentInfra + SkillRepository + GrpcInf
app.execute(data_parameters).await
}

async fn acp_start_stdio(&self) -> Result<()> {
let acp_app = forge_app::AcpApp::new(self.services.clone());
acp_app.start_stdio().await
}

async fn get_default_provider(&self) -> Result<Provider<Url>> {
let provider_id = self.services.get_default_provider().await?;
self.services.get_provider(provider_id).await
Expand Down
4 changes: 4 additions & 0 deletions crates/forge_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ lazy_static.workspace = true
forge_json_repair.workspace = true

tonic.workspace = true
agent-client-protocol.workspace = true
tokio-util = { workspace = true, features = ["compat"] }
base64.workspace = true
uuid.workspace = true


[dev-dependencies]
Expand Down
149 changes: 149 additions & 0 deletions crates/forge_app/src/acp/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::collections::HashMap;
use std::sync::Arc;

use agent_client_protocol as acp;
use forge_domain::{AgentId, ConversationId, ModelId};
use tokio::sync::{Mutex, Notify, mpsc};

use crate::Services;

use super::error::{Error, Result};

/// Maximum number of buffered session notifications before backpressure.
const NOTIFICATION_CHANNEL_CAPACITY: usize = 1024;

#[derive(Clone)]
pub(super) struct SessionState {
pub conversation_id: ConversationId,
pub agent_id: AgentId,
/// Session-scoped model override. When set, prompts use this model
/// instead of the global default.
pub model_id: Option<ModelId>,
pub cancel_notify: Option<Arc<Notify>>,
}

pub(crate) struct AcpAdapter<S> {
pub(super) services: Arc<S>,
pub(super) session_update_tx: mpsc::Sender<acp::SessionNotification>,
pub(super) client_conn: Arc<Mutex<Option<Arc<acp::AgentSideConnection>>>>,
sessions: Arc<Mutex<HashMap<String, SessionState>>>,
}

impl<S: Services> AcpAdapter<S> {
/// Creates a new ACP adapter and returns the notification receiver.
pub(crate) fn new(
services: Arc<S>,
) -> (Self, mpsc::Receiver<acp::SessionNotification>) {
let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_CAPACITY);
let adapter = Self {
services,
session_update_tx: tx,
client_conn: Arc::new(Mutex::new(None)),
sessions: Arc::new(Mutex::new(HashMap::new())),
};
(adapter, rx)
}

pub(crate) async fn set_client_connection(&self, conn: Arc<acp::AgentSideConnection>) {
*self.client_conn.lock().await = Some(conn);
}

pub(super) async fn store_session(&self, session_id: String, state: SessionState) {
self.sessions.lock().await.insert(session_id, state);
}

/// Removes a session from the adapter. Currently unused but available
/// for future session lifecycle management (TTL, explicit close).
#[allow(dead_code)]
pub(super) async fn remove_session(&self, session_id: &str) {
self.sessions.lock().await.remove(session_id);
}

pub(super) async fn session_state(&self, session_id: &str) -> Result<SessionState> {
self.sessions
.lock()
.await
.get(session_id)
.cloned()
.ok_or_else(|| Error::Application(anyhow::anyhow!("Session not found")))
}

pub(super) async fn update_session_agent(
&self,
session_id: &str,
agent_id: AgentId,
) -> Result<()> {
let mut sessions = self.sessions.lock().await;
let state = sessions
.get_mut(session_id)
.ok_or_else(|| Error::Application(anyhow::anyhow!("Session not found")))?;
state.agent_id = agent_id;
Ok(())
}

pub(super) async fn update_session_model(
&self,
session_id: &str,
model_id: ModelId,
) -> Result<()> {
let mut sessions = self.sessions.lock().await;
let state = sessions
.get_mut(session_id)
.ok_or_else(|| Error::Application(anyhow::anyhow!("Session not found")))?;
state.model_id = Some(model_id);
Ok(())
}

pub(super) async fn set_cancel_notify(
&self,
session_id: &str,
cancel_notify: Option<Arc<Notify>>,
) -> Result<()> {
let mut sessions = self.sessions.lock().await;
let state = sessions
.get_mut(session_id)
.ok_or_else(|| Error::Application(anyhow::anyhow!("Session not found")))?;
state.cancel_notify = cancel_notify;
Ok(())
}

pub(super) async fn cancel_session(&self, session_id: &str) -> bool {
let notify = self
.sessions
.lock()
.await
.get(session_id)
.and_then(|state| state.cancel_notify.clone());

if let Some(notify) = notify {
notify.notify_waiters();
true
} else {
false
}
}

pub(super) async fn ensure_session(
&self,
session_id: &str,
conversation_id: ConversationId,
agent_id: AgentId,
) -> SessionState {
let mut sessions = self.sessions.lock().await;
sessions
.entry(session_id.to_string())
.or_insert_with(|| SessionState {
conversation_id,
agent_id,
model_id: None,
cancel_notify: None,
})
.clone()
}

pub(super) fn send_notification(&self, notification: acp::SessionNotification) -> Result<()> {
self.session_update_tx
.try_send(notification)
.map_err(|_| Error::Application(anyhow::anyhow!("Failed to send notification")))
}
}
Loading
Loading