Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,9 @@ pub enum SkillScope {
Repo,
System,
Admin,
/// Served by a connected MCP server per the Skills-over-MCP extension
/// (`io.modelcontextprotocol/skills`).
Mcp,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
Expand All @@ -1110,6 +1113,12 @@ pub struct SkillMetadata {
pub short_description: Option<String>,
pub path: PathBuf,
pub scope: SkillScope,
#[ts(optional)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub uri: Option<String>,
#[ts(optional)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub server_name: Option<String>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
Expand Down Expand Up @@ -1137,6 +1146,8 @@ impl From<CoreSkillMetadata> for SkillMetadata {
short_description: value.short_description,
path: value.path,
scope: value.scope.into(),
uri: value.uri,
server_name: value.server_name,
}
}
}
Expand All @@ -1148,6 +1159,7 @@ impl From<CoreSkillScope> for SkillScope {
CoreSkillScope::Repo => Self::Repo,
CoreSkillScope::System => Self::System,
CoreSkillScope::Admin => Self::Admin,
CoreSkillScope::Mcp => Self::Mcp,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3324,6 +3324,8 @@ fn skills_to_info(
short_description: skill.short_description.clone(),
path: skill.path.clone(),
scope: skill.scope.into(),
uri: skill.uri.clone(),
server_name: skill.server_name.clone(),
})
.collect()
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/cli/src/mcp_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ async fn run_add(config_overrides: &CliConfigOverrides, add_args: AddArgs) -> Re
tool_timeout_sec: None,
enabled_tools: None,
disabled_tools: None,
mcp_skills: true,
};

servers.insert(name.clone(), new_entry);
Expand Down
195 changes: 153 additions & 42 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ use crate::skills::SkillInjections;
use crate::skills::SkillMetadata;
use crate::skills::SkillsManager;
use crate::skills::build_skill_injections;
use crate::skills::load_mcp_skill_manifests;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
Expand Down Expand Up @@ -204,6 +205,49 @@ fn maybe_push_chat_wire_api_deprecation(
});
}

/// Bring up the session-scoped MCP connection manager ahead of skill
/// discovery and turn-1 tool use, and return its handle, cancellation
/// token, and the event receiver. `initialize()` emits its own events,
/// but `SessionConfigured` must be the first event callers see (see
/// `conversation_manager::finalize_spawn`), so MCP events flow through a
/// dedicated channel that `Session::new` later fans into the main
/// event stream.
async fn spawn_mcp_manager(
config: &Config,
) -> (
Arc<RwLock<McpConnectionManager>>,
CancellationToken,
Receiver<Event>,
) {
let manager: Arc<RwLock<McpConnectionManager>> =
Arc::new(RwLock::new(McpConnectionManager::default()));
let cancellation_token = CancellationToken::new();
let (tx_mcp, rx_mcp) = async_channel::unbounded::<Event>();
let auth_statuses = compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
)
.await;
let sandbox_state = crate::mcp_connection_manager::SandboxState {
sandbox_policy: config.sandbox_policy.get().clone(),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: config.cwd.clone(),
};
manager
.write()
.await
.initialize(
config.mcp_servers.clone(),
config.mcp_oauth_credentials_store_mode,
auth_statuses,
tx_mcp,
cancellation_token.clone(),
sandbox_state,
)
.await;
(manager, cancellation_token, rx_mcp)
}

impl Codex {
/// Spawn a new [`Codex`] and initialize the session.
pub async fn spawn(
Expand All @@ -217,20 +261,30 @@ impl Codex {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();

let loaded_skills = config
let mut loaded_skills = config
.features
.enabled(Feature::Skills)
.then(|| skills_manager.skills_for_cwd(&config.cwd));

if let Some(outcome) = &loaded_skills {
for err in &outcome.errors {
error!(
"failed to load skill {}: {}",
err.path.display(),
err.message
);
}
}
let (mcp_connection_manager, mcp_startup_cancellation_token, rx_mcp) =
spawn_mcp_manager(&config).await;

let session_mcp_skills: Arc<Vec<SkillMetadata>> =
if let Some(outcome) = loaded_skills.as_mut() {
let guard = mcp_connection_manager.read().await;
let added = merge_mcp_skills(outcome, &guard, &config).await;
drop(guard);
for err in &outcome.errors {
error!(
"failed to load skill {}: {}",
err.path.display(),
err.message
);
}
Arc::new(added)
} else {
Arc::new(Vec::new())
};

let user_instructions = get_user_instructions(
&config,
Expand Down Expand Up @@ -280,6 +334,10 @@ impl Codex {
conversation_history,
session_source_clone,
skills_manager,
mcp_connection_manager,
mcp_startup_cancellation_token,
session_mcp_skills,
rx_mcp,
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -545,6 +603,10 @@ impl Session {
initial_history: InitialHistory,
session_source: SessionSource,
skills_manager: Arc<SkillsManager>,
mcp_connection_manager: Arc<RwLock<McpConnectionManager>>,
mcp_startup_cancellation_token: CancellationToken,
session_mcp_skills: Arc<Vec<SkillMetadata>>,
rx_mcp: async_channel::Receiver<Event>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
Expand Down Expand Up @@ -583,14 +645,12 @@ impl Session {
let rollout_fut = RolloutRecorder::new(&config, rollout_params);

let history_meta_fut = crate::message_history::history_metadata(&config);
let auth_statuses_fut = compute_auth_statuses(
config.mcp_servers.iter(),
config.mcp_oauth_credentials_store_mode,
);

// Join all independent futures.
let (rollout_recorder, (history_log_id, history_entry_count), auth_statuses) =
tokio::join!(rollout_fut, history_meta_fut, auth_statuses_fut);
// Join the independent startup futures. The MCP connection manager
// has already been initialized by `Codex::spawn` and is passed in —
// see the caller for why this is handled outside `Session::new`.
let (rollout_recorder, (history_log_id, history_entry_count)) =
tokio::join!(rollout_fut, history_meta_fut);

let rollout_recorder = rollout_recorder.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
Expand Down Expand Up @@ -653,8 +713,8 @@ impl Session {
let state = SessionState::new(session_configuration.clone());

let services = SessionServices {
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
mcp_startup_cancellation_token: CancellationToken::new(),
mcp_connection_manager,
mcp_startup_cancellation_token,
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: UserNotifier::new(config.notify.clone()),
rollout: Mutex::new(Some(rollout_recorder)),
Expand All @@ -666,6 +726,7 @@ impl Session {
models_manager: Arc::clone(&models_manager),
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
session_mcp_skills,
};

let sess = Arc::new(Session {
Expand Down Expand Up @@ -702,26 +763,21 @@ impl Session {
sess.send_event_raw(event).await;
}

// Construct sandbox_state before initialize() so it can be sent to each
// MCP server immediately after it becomes ready (avoiding blocking).
let sandbox_state = SandboxState {
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
sandbox_cwd: session_configuration.cwd.clone(),
};
sess.services
.mcp_connection_manager
.write()
.await
.initialize(
config.mcp_servers.clone(),
config.mcp_oauth_credentials_store_mode,
auth_statuses.clone(),
tx_event.clone(),
sess.services.mcp_startup_cancellation_token.clone(),
sandbox_state,
)
.await;
// Flush MCP startup / elicitation events that `Codex::spawn` routed
// through a dedicated channel so they couldn't arrive before
// `SessionConfigured`. The forwarder runs for the session lifetime
// because the MCP connection manager also reuses this channel for
// elicitation events produced mid-session.
{
let tx_forward = tx_event.clone();
tokio::spawn(async move {
while let Ok(event) = rx_mcp.recv().await {
if tx_forward.send(event).await.is_err() {
break;
}
}
});
}

// record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted.
sess.record_initial_history(initial_history).await;
Expand Down Expand Up @@ -1961,9 +2017,13 @@ mod handlers {
};
let skills = if sess.enabled(Feature::Skills) {
let skills_manager = &sess.services.skills_manager;
let session_mcp_skills = sess.services.session_mcp_skills.as_ref();
cwds.into_iter()
.map(|cwd| {
let outcome = skills_manager.skills_for_cwd_with_options(&cwd, force_reload);
let mut outcome =
skills_manager.skills_for_cwd_with_options(&cwd, force_reload);
// Filesystem wins on name collision.
outcome.add_mcp_skills(session_mcp_skills.iter().cloned());
let errors = super::errors_to_info(&outcome.errors);
let skills = super::skills_to_info(&outcome.skills);
SkillsListEntry {
Expand Down Expand Up @@ -2174,6 +2234,8 @@ fn skills_to_info(skills: &[SkillMetadata]) -> Vec<ProtocolSkillMetadata> {
short_description: skill.short_description.clone(),
path: skill.path.clone(),
scope: skill.scope,
uri: skill.uri.clone(),
server_name: skill.server_name.clone(),
})
.collect()
}
Expand All @@ -2188,6 +2250,47 @@ fn errors_to_info(errors: &[SkillError]) -> Vec<SkillErrorInfo> {
.collect()
}

/// Fetch Skills-over-MCP manifests from every connected MCP server whose
/// config opts in, and merge them into `outcome`. Filesystem skills win on
/// name collision (matching the precedence in `skill_roots_for_cwd`).
///
/// The caller is expected to have already initialized `manager` against
/// `config.mcp_servers` — we reuse the session's live connections rather
/// than spawning throwaway children.
///
/// Returns the MCP-origin skills that survived masking, so the session can
/// make them visible per-turn for `$Mention` resolution.
async fn merge_mcp_skills(
outcome: &mut crate::skills::SkillLoadOutcome,
manager: &McpConnectionManager,
config: &Config,
) -> Vec<SkillMetadata> {
let any_eligible = config
.mcp_servers
.iter()
.any(|(_, server_cfg)| server_cfg.enabled && server_cfg.mcp_skills);
if !any_eligible {
return Vec::new();
}

// `load_mcp_skill_manifests` re-filters by `enabled && mcp_skills`, so
// we pass the raw server map and let it be the single point of truth.
let mcp_outcome = load_mcp_skill_manifests(manager, &config.mcp_servers).await;
outcome.errors.extend(mcp_outcome.errors);

let skills = mcp_outcome.skills;
let before = outcome.skills.len();
let masked = outcome.add_mcp_skills(skills);
if !masked.is_empty() {
warn!(
"skills: {} MCP skill(s) masked by filesystem skills with the same name: {}",
masked.len(),
masked.join(", ")
);
}
outcome.skills[before..].to_vec()
}

/// Takes a user message as input and runs a loop where, at each turn, the model
/// replies with either:
///
Expand Down Expand Up @@ -2227,9 +2330,15 @@ pub(crate) async fn run_task(
sess.send_event(&turn_context, event).await;

let skills_outcome = sess.enabled(Feature::Skills).then(|| {
sess.services
let mut outcome = sess
.services
.skills_manager
.skills_for_cwd(&turn_context.cwd)
.skills_for_cwd(&turn_context.cwd);
// MCP-served skills were discovered at session start and cached on
// the session; merge them in with filesystem skills winning on name
// collision so `$Mention` can resolve URI-backed skills too.
outcome.add_mcp_skills(sess.services.session_mcp_skills.iter().cloned());
outcome
});

let SkillInjections {
Expand Down Expand Up @@ -3143,6 +3252,7 @@ mod tests {
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
session_mcp_skills: Arc::new(Vec::new()),
};

let turn_context = Session::make_turn_context(
Expand Down Expand Up @@ -3230,6 +3340,7 @@ mod tests {
models_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
skills_manager,
session_mcp_skills: Arc::new(Vec::new()),
};

let turn_context = Arc::new(Session::make_turn_context(
Expand Down
Loading