Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions dotnet/src/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable
private readonly List<Action<SessionLifecycleEvent>> _lifecycleHandlers = [];
private readonly Dictionary<string, List<Action<SessionLifecycleEvent>>> _typedLifecycleHandlers = [];
private readonly object _lifecycleHandlersLock = new();
private readonly ConcurrentDictionary<string, CopilotSession> _shellProcessMap = new();
private ServerRpc? _rpc;

/// <summary>
Expand Down Expand Up @@ -428,6 +429,9 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance
session.On(config.OnEvent);
}
_sessions[sessionId] = session;
session.SetShellProcessCallbacks(
(processId, s) => _shellProcessMap[processId] = s,
processId => _shellProcessMap.TryRemove(processId, out _));

try
{
Expand Down Expand Up @@ -532,6 +536,9 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes
session.On(config.OnEvent);
}
_sessions[sessionId] = session;
session.SetShellProcessCallbacks(
(processId, s) => _shellProcessMap[processId] = s,
processId => _shellProcessMap.TryRemove(processId, out _));

try
{
Expand Down Expand Up @@ -1202,6 +1209,8 @@ private async Task<Connection> ConnectToServerAsync(Process? cliProcess, string?
rpc.AddLocalRpcMethod("permission.request", handler.OnPermissionRequestV2);
rpc.AddLocalRpcMethod("userInput.request", handler.OnUserInputRequest);
rpc.AddLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke);
rpc.AddLocalRpcMethod("shell.output", handler.OnShellOutput);
rpc.AddLocalRpcMethod("shell.exit", handler.OnShellExit);
rpc.StartListening();

// Transition state to Disconnected if the JSON-RPC connection drops
Expand Down Expand Up @@ -1421,6 +1430,34 @@ public async Task<PermissionRequestResponseV2> OnPermissionRequestV2(string sess
});
}
}

public void OnShellOutput(string processId, string stream, string data)
{
if (client._shellProcessMap.TryGetValue(processId, out var session))
{
session.DispatchShellOutput(new ShellOutputNotification
{
ProcessId = processId,
Stream = stream,
Data = data,
});
}
}

public void OnShellExit(string processId, int exitCode)
{
if (client._shellProcessMap.TryGetValue(processId, out var session))
{
session.DispatchShellExit(new ShellExitNotification
{
ProcessId = processId,
ExitCode = exitCode,
});
// Clean up the mapping after exit
client._shellProcessMap.TryRemove(processId, out _);
session.UntrackShellProcess(processId);
}
Comment on lines +1434 to +1459
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_shellProcessMap is the only routing mechanism for shell.output/shell.exit, but nothing in the .NET SDK calls CopilotSession.TrackShellProcess(...) (search shows only the method definition). This means these notifications will be dropped and OnShellOutput/OnShellExit won’t fire. The process registration needs to happen when starting a shell command (track immediately after receiving processId), or the server needs to include sessionId in these notifications so routing doesn’t depend on prior registration.

Copilot uses AI. Check for mistakes.
}
}

private class Connection(
Expand Down
115 changes: 115 additions & 0 deletions dotnet/src/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public sealed partial class CopilotSession : IAsyncDisposable
private readonly SemaphoreSlim _hooksLock = new(1, 1);
private SessionRpc? _sessionRpc;
private int _isDisposed;
private event Action<ShellOutputNotification>? ShellOutputHandlers;
private event Action<ShellExitNotification>? ShellExitHandlers;
private readonly HashSet<string> _trackedProcessIds = [];
private readonly object _trackedProcessIdsLock = new();
private Action<string, CopilotSession>? _registerShellProcess;
private Action<string>? _unregisterShellProcess;

/// <summary>
/// Channel that serializes event dispatch. <see cref="DispatchEvent"/> enqueues;
Expand Down Expand Up @@ -278,6 +284,52 @@ public IDisposable On(SessionEventHandler handler)
return new ActionDisposable(() => ImmutableInterlocked.Update(ref _eventHandlers, array => array.Remove(handler)));
}

/// <summary>
/// Subscribes to shell output notifications for this session.
/// </summary>
/// <param name="handler">A callback that receives shell output notifications.</param>
/// <returns>An <see cref="IDisposable"/> that unsubscribes the handler when disposed.</returns>
/// <remarks>
/// Shell output notifications are streamed in chunks when commands started
/// via <c>session.Rpc.Shell.ExecAsync()</c> produce stdout or stderr output.
/// </remarks>
/// <example>
/// <code>
/// using var sub = session.OnShellOutput(n =>
/// {
/// Console.WriteLine($"[{n.ProcessId}:{n.Stream}] {n.Data}");
/// });
/// </code>
/// </example>
public IDisposable OnShellOutput(Action<ShellOutputNotification> handler)
{
ShellOutputHandlers += handler;
return new ActionDisposable(() => ShellOutputHandlers -= handler);
}

/// <summary>
/// Subscribes to shell exit notifications for this session.
/// </summary>
/// <param name="handler">A callback that receives shell exit notifications.</param>
/// <returns>An <see cref="IDisposable"/> that unsubscribes the handler when disposed.</returns>
/// <remarks>
/// Shell exit notifications are sent when commands started via
/// <c>session.Rpc.Shell.ExecAsync()</c> complete (after all output has been streamed).
Comment on lines +316 to +317
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The XML docs mention commands started via session.Rpc.Shell.ExecAsync(), but there is no Shell API exposed on the session RPC surface in this repo. Either add the missing RPC API or update the docs to reference the real method that starts shell commands and yields the processId used for notifications.

Suggested change
/// Shell exit notifications are sent when commands started via
/// <c>session.Rpc.Shell.ExecAsync()</c> complete (after all output has been streamed).
/// Shell exit notifications are sent when shell commands started via the session's RPC
/// surface complete (after all output has been streamed). The notification's
/// <see cref="ShellExitNotification.ProcessId"/> matches the <c>processId</c> returned
/// when the corresponding shell command was started.

Copilot uses AI. Check for mistakes.
/// </remarks>
/// <example>
/// <code>
/// using var sub = session.OnShellExit(n =>
/// {
/// Console.WriteLine($"Process {n.ProcessId} exited with code {n.ExitCode}");
/// });
/// </code>
/// </example>
public IDisposable OnShellExit(Action<ShellExitNotification> handler)
{
ShellExitHandlers += handler;
return new ActionDisposable(() => ShellExitHandlers -= handler);
}

/// <summary>
/// Enqueues an event for serial dispatch to all registered handlers.
/// </summary>
Expand Down Expand Up @@ -323,6 +375,57 @@ private async Task ProcessEventsAsync()
}
}

/// <summary>
/// Dispatches a shell output notification to all registered handlers.
/// </summary>
internal void DispatchShellOutput(ShellOutputNotification notification)
{
ShellOutputHandlers?.Invoke(notification);
}

/// <summary>
/// Dispatches a shell exit notification to all registered handlers.
/// </summary>
internal void DispatchShellExit(ShellExitNotification notification)
{
ShellExitHandlers?.Invoke(notification);
}

/// <summary>
/// Track a shell process ID so notifications are routed to this session.
/// </summary>
internal void TrackShellProcess(string processId)
{
lock (_trackedProcessIdsLock)
{
_trackedProcessIds.Add(processId);
}
_registerShellProcess?.Invoke(processId, this);
}

/// <summary>
/// Stop tracking a shell process ID.
/// </summary>
internal void UntrackShellProcess(string processId)
{
lock (_trackedProcessIdsLock)
{
_trackedProcessIds.Remove(processId);
}
_unregisterShellProcess?.Invoke(processId);
}

/// <summary>
/// Set the registration callbacks for shell process tracking.
/// </summary>
internal void SetShellProcessCallbacks(
Action<string, CopilotSession> register,
Action<string> unregister)
{
_registerShellProcess = register;
_unregisterShellProcess = unregister;
}

/// <summary>
/// Registers custom tool handlers for this session.
/// </summary>
Expand Down Expand Up @@ -805,6 +908,18 @@ await InvokeRpcAsync<object>(
}

_eventHandlers = ImmutableInterlocked.InterlockedExchange(ref _eventHandlers, ImmutableArray<SessionEventHandler>.Empty);
ShellOutputHandlers = null;
ShellExitHandlers = null;

lock (_trackedProcessIdsLock)
{
foreach (var processId in _trackedProcessIds)
{
_unregisterShellProcess?.Invoke(processId);
}
_trackedProcessIds.Clear();
}

_toolHandlers.Clear();

_permissionHandler = null;
Expand Down
50 changes: 50 additions & 0 deletions dotnet/src/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,54 @@ public class SessionLifecycleEvent
public SessionLifecycleEventMetadata? Metadata { get; set; }
}

// ============================================================================
// Shell Notification Types
// ============================================================================

/// <summary>
/// Notification sent when a shell command produces output.
/// Streamed in chunks (up to 64KB per notification).
/// </summary>
public class ShellOutputNotification
{
/// <summary>
/// Process identifier returned by shell.exec.
/// </summary>
[JsonPropertyName("processId")]
public string ProcessId { get; set; } = string.Empty;

/// <summary>
/// Which output stream produced this chunk ("stdout" or "stderr").
/// </summary>
[JsonPropertyName("stream")]
public string Stream { get; set; } = string.Empty;

/// <summary>
/// The output data (UTF-8 string).
/// </summary>
[JsonPropertyName("data")]
public string Data { get; set; } = string.Empty;
}

/// <summary>
/// Notification sent when a shell command exits.
/// Sent after all output has been streamed.
/// </summary>
public class ShellExitNotification
{
/// <summary>
/// Process identifier returned by shell.exec.
/// </summary>
[JsonPropertyName("processId")]
public string ProcessId { get; set; } = string.Empty;

/// <summary>
/// Process exit code (0 = success).
/// </summary>
[JsonPropertyName("exitCode")]
public int ExitCode { get; set; }
}

/// <summary>
/// Response from session.getForeground
/// </summary>
Expand Down Expand Up @@ -2007,6 +2055,8 @@ public class SetForegroundSessionResponse
[JsonSerializable(typeof(SessionContext))]
[JsonSerializable(typeof(SessionLifecycleEvent))]
[JsonSerializable(typeof(SessionLifecycleEventMetadata))]
[JsonSerializable(typeof(ShellExitNotification))]
[JsonSerializable(typeof(ShellOutputNotification))]
[JsonSerializable(typeof(SessionListFilter))]
[JsonSerializable(typeof(SessionMetadata))]
[JsonSerializable(typeof(SetForegroundSessionResponse))]
Expand Down
44 changes: 44 additions & 0 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type Client struct {
lifecycleHandlers []SessionLifecycleHandler
typedLifecycleHandlers map[SessionLifecycleEventType][]SessionLifecycleHandler
lifecycleHandlersMux sync.Mutex
shellProcessMap map[string]*Session
shellProcessMapMux sync.Mutex
startStopMux sync.RWMutex // protects process and state during start/[force]stop
processDone chan struct{}
processErrorPtr *error
Expand Down Expand Up @@ -130,6 +132,7 @@ func NewClient(options *ClientOptions) *Client {
options: opts,
state: StateDisconnected,
sessions: make(map[string]*Session),
shellProcessMap: make(map[string]*Session),
actualHost: "localhost",
isExternalServer: false,
useStdio: true,
Expand Down Expand Up @@ -535,6 +538,7 @@ func (c *Client) CreateSession(ctx context.Context, config *SessionConfig) (*Ses
// Create and register the session before issuing the RPC so that
// events emitted by the CLI (e.g. session.start) are not dropped.
session := newSession(sessionID, c.client, "")
session.setShellProcessCallbacks(c.registerShellProcess, c.unregisterShellProcess)

session.registerTools(config.Tools)
session.registerPermissionHandler(config.OnPermissionRequest)
Expand Down Expand Up @@ -648,6 +652,7 @@ func (c *Client) ResumeSessionWithOptions(ctx context.Context, sessionID string,
// Create and register the session before issuing the RPC so that
// events emitted by the CLI (e.g. session.start) are not dropped.
session := newSession(sessionID, c.client, "")
session.setShellProcessCallbacks(c.registerShellProcess, c.unregisterShellProcess)

session.registerTools(config.Tools)
session.registerPermissionHandler(config.OnPermissionRequest)
Expand Down Expand Up @@ -1379,6 +1384,8 @@ func (c *Client) setupNotificationHandler() {
c.client.SetRequestHandler("permission.request", jsonrpc2.RequestHandlerFor(c.handlePermissionRequestV2))
c.client.SetRequestHandler("userInput.request", jsonrpc2.RequestHandlerFor(c.handleUserInputRequest))
c.client.SetRequestHandler("hooks.invoke", jsonrpc2.RequestHandlerFor(c.handleHooksInvoke))
c.client.SetRequestHandler("shell.output", jsonrpc2.NotificationHandlerFor(c.handleShellOutput))
c.client.SetRequestHandler("shell.exit", jsonrpc2.NotificationHandlerFor(c.handleShellExit))
}

func (c *Client) handleSessionEvent(req sessionEventRequest) {
Expand All @@ -1395,6 +1402,43 @@ func (c *Client) handleSessionEvent(req sessionEventRequest) {
}
}

func (c *Client) handleShellOutput(notification ShellOutputNotification) {
c.shellProcessMapMux.Lock()
session, ok := c.shellProcessMap[notification.ProcessID]
c.shellProcessMapMux.Unlock()

if ok {
session.dispatchShellOutput(notification)
}
}

func (c *Client) handleShellExit(notification ShellExitNotification) {
c.shellProcessMapMux.Lock()
session, ok := c.shellProcessMap[notification.ProcessID]
c.shellProcessMapMux.Unlock()

if ok {
session.dispatchShellExit(notification)
// Clean up the mapping after exit
c.shellProcessMapMux.Lock()
delete(c.shellProcessMap, notification.ProcessID)
c.shellProcessMapMux.Unlock()
session.untrackShellProcess(notification.ProcessID)
}
Comment on lines +1405 to +1427
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shell notifications are routed using shellProcessMap, but there is no call site that ever invokes session.trackShellProcess(...) to register a process ID (search shows only the method definition). As-is, handleShellOutput/handleShellExit will almost always find no session and drop notifications. The process ID registration needs to be wired into the API that starts shell commands (track immediately after obtaining processID), or the notification payload should include sessionId so routing doesn’t depend on an out-of-band registration.

Copilot uses AI. Check for mistakes.
}

func (c *Client) registerShellProcess(processID string, session *Session) {
c.shellProcessMapMux.Lock()
c.shellProcessMap[processID] = session
c.shellProcessMapMux.Unlock()
}

func (c *Client) unregisterShellProcess(processID string) {
c.shellProcessMapMux.Lock()
delete(c.shellProcessMap, processID)
c.shellProcessMapMux.Unlock()
}

// handleUserInputRequest handles a user input request from the CLI server.
func (c *Client) handleUserInputRequest(req userInputRequest) (*userInputResponse, *jsonrpc2.Error) {
if req.SessionID == "" || req.Question == "" {
Expand Down
Loading
Loading