diff --git a/hub/src/index.ts b/hub/src/index.ts index a53ae3308..3f6cbc29b 100644 --- a/hub/src/index.ts +++ b/hub/src/index.ts @@ -179,7 +179,8 @@ async function main() { onWebappEvent: (event: SyncEvent) => syncEngine?.handleRealtimeEvent(event), onSessionAlive: (payload) => syncEngine?.handleSessionAlive(payload), onSessionEnd: (payload) => syncEngine?.handleSessionEnd(payload), - onMachineAlive: (payload) => syncEngine?.handleMachineAlive(payload) + onMachineAlive: (payload) => syncEngine?.handleMachineAlive(payload), + onBackgroundTaskDelta: (sessionId, delta) => syncEngine?.handleBackgroundTaskDelta(sessionId, delta) }) syncEngine = new SyncEngine(store, socketServer.io, socketServer.rpcRegistry, sseManager) diff --git a/hub/src/socket/handlers/cli/index.ts b/hub/src/socket/handlers/cli/index.ts index 480ac7def..5c2ec6b57 100644 --- a/hub/src/socket/handlers/cli/index.ts +++ b/hub/src/socket/handlers/cli/index.ts @@ -40,10 +40,11 @@ export type CliHandlersDeps = { onSessionEnd?: (payload: SessionEndPayload) => void onMachineAlive?: (payload: MachineAlivePayload) => void onWebappEvent?: (event: SyncEvent) => void + onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void } export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlersDeps): void { - const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent } = deps + const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent, onBackgroundTaskDelta } = deps const terminalNamespace = io.of('/terminal') const namespace = typeof socket.data.namespace === 'string' ? socket.data.namespace : null @@ -102,7 +103,8 @@ export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlers emitAccessError, onSessionAlive, onSessionEnd, - onWebappEvent + onWebappEvent, + onBackgroundTaskDelta }) registerMachineHandlers(socket, { store, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index 67ec014b7..8860421e1 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -6,6 +6,7 @@ import type { Store, StoredSession } from '../../../store' import type { SyncEvent } from '../../../sync/syncEngine' import { extractTodoWriteTodosFromMessageContent } from '../../../sync/todos' import { extractTeamStateFromMessageContent, applyTeamStateDelta } from '../../../sync/teams' +import { extractBackgroundTaskDelta } from '../../../sync/backgroundTasks' import type { CliSocketWithData } from '../../socketTypes' import type { AccessErrorReason, AccessResult } from './types' @@ -57,10 +58,11 @@ export type SessionHandlersDeps = { onSessionAlive?: (payload: SessionAlivePayload) => void onSessionEnd?: (payload: SessionEndPayload) => void onWebappEvent?: (event: SyncEvent) => void + onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void } export function registerSessionHandlers(socket: CliSocketWithData, deps: SessionHandlersDeps): void { - const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent } = deps + const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent, onBackgroundTaskDelta } = deps socket.on('message', (data: unknown) => { const parsed = messageSchema.safeParse(data) @@ -109,6 +111,11 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session } } + const bgDelta = extractBackgroundTaskDelta(content) + if (bgDelta) { + onBackgroundTaskDelta?.(sid, bgDelta) + } + const update = { id: randomUUID(), seq: msg.seq, diff --git a/hub/src/socket/server.ts b/hub/src/socket/server.ts index bc591a11c..19086a95b 100644 --- a/hub/src/socket/server.ts +++ b/hub/src/socket/server.ts @@ -39,6 +39,7 @@ export type SocketServerDeps = { onSessionAlive?: (payload: { sid: string; time: number; thinking?: boolean; mode?: 'local' | 'remote' }) => void onSessionEnd?: (payload: { sid: string; time: number }) => void onMachineAlive?: (payload: { machineId: string; time: number }) => void + onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void } export function createSocketServer(deps: SocketServerDeps): { @@ -113,7 +114,8 @@ export function createSocketServer(deps: SocketServerDeps): { onSessionAlive: deps.onSessionAlive, onSessionEnd: deps.onSessionEnd, onMachineAlive: deps.onMachineAlive, - onWebappEvent: deps.onWebappEvent + onWebappEvent: deps.onWebappEvent, + onBackgroundTaskDelta: deps.onBackgroundTaskDelta })) terminalNs.use(async (socket, next) => { diff --git a/hub/src/sync/backgroundTasks.ts b/hub/src/sync/backgroundTasks.ts new file mode 100644 index 000000000..526a08d7d --- /dev/null +++ b/hub/src/sync/backgroundTasks.ts @@ -0,0 +1,91 @@ +import { isObject } from '@hapi/protocol' +import { unwrapRoleWrappedRecordEnvelope } from '@hapi/protocol/messages' + +/** + * Extract background task start/completion signals from a message. + * + * Uses role-aware parsing to avoid false positives: + * - Started: agent-role output with tool_result containing + * "Command running in background with ID:" + * - Completed: agent-role output wrapping a user-type message (system-injected) + * starting with "" + * + * Both signals arrive as { role: 'agent', content: { type: 'output', data: {...} } } + * because the CLI wraps all messages in agent envelopes. + */ +export function extractBackgroundTaskDelta(messageContent: unknown): { started: number; completed: number } | null { + const record = unwrapRoleWrappedRecordEnvelope(messageContent) + if (!record || record.role !== 'agent') return null + if (!isObject(record.content) || record.content.type !== 'output') return null + + const data = isObject(record.content.data) ? record.content.data : null + if (!data) return null + + const started = countTaskStarts(record.content) + const completed = data.type === 'user' ? countTaskCompletions(data) : 0 + + if (started === 0 && completed === 0) return null + return { started, completed } +} + +/** + * Count background task starts from tool_result blocks. + */ +function countTaskStarts(content: Record): number { + const data = isObject(content.data) ? content.data : null + if (!data) return 0 + + // Direct tool_result + if (data.type === 'tool_result') { + return isBackgroundStartResult(data) ? 1 : 0 + } + + // Assistant message with content array containing tool_result blocks + if (data.type === 'assistant') { + const message = isObject(data.message) ? data.message : null + const modelContent = message?.content + if (!Array.isArray(modelContent)) return 0 + + let count = 0 + for (const block of modelContent) { + if (isObject(block) && block.type === 'tool_result' && isBackgroundStartResult(block)) { + count++ + } + } + return count + } + + return 0 +} + +function isBackgroundStartResult(block: Record): boolean { + const text = typeof block.content === 'string' + ? block.content + : Array.isArray(block.content) + ? block.content.map((c: unknown) => isObject(c) && typeof c.text === 'string' ? c.text : '').join('') + : '' + return text.includes('Command running in background with ID:') +} + +/** + * Count task completions from system-injected user messages. + * + * These arrive as: { type: 'user', message: { content: '...' } } + * inside the agent output envelope. + */ +function countTaskCompletions(data: Record): number { + // { type: 'user', message: { content: '...' } } + if (isObject(data.message)) { + const msg = data.message as Record + if (typeof msg.content === 'string' && msg.content.trimStart().startsWith('')) { + return 1 + } + } + + // { type: 'user', uuid: '...', content: '...' } + if (typeof data.content === 'string' && data.content.trimStart().startsWith('')) { + return 1 + } + + return 0 +} diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 618447c7d..b4abd3865 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -4,6 +4,7 @@ import type { Store } from '../store' import { clampAliveTime } from './aliveTime' import { EventPublisher } from './eventPublisher' import { extractTodoWriteTodosFromMessageContent, TodosSchema } from './todos' +import { extractBackgroundTaskDelta } from './backgroundTasks' export class SessionCache { private readonly sessions: Map = new Map() @@ -131,6 +132,7 @@ export class SessionCache { agentStateVersion: stored.agentStateVersion, thinking: existing?.thinking ?? false, thinkingAt: existing?.thinkingAt ?? 0, + backgroundTaskCount: existing?.backgroundTaskCount ?? 0, todos, teamState, model: stored.model, @@ -230,6 +232,22 @@ export class SessionCache { } } + applyBackgroundTaskDelta(sessionId: string, delta: { started: number; completed: number }): void { + const session = this.sessions.get(sessionId) + if (!session) return + + const prev = session.backgroundTaskCount ?? 0 + const next = Math.max(0, prev + delta.started - delta.completed) + if (next === prev) return + + session.backgroundTaskCount = next + this.publisher.emit({ + type: 'session-updated', + sessionId, + data: { backgroundTaskCount: next } + }) + } + handleSessionEnd(payload: { sid: string; time: number }): void { const t = clampAliveTime(payload.time) ?? Date.now() @@ -243,8 +261,9 @@ export class SessionCache { session.active = false session.thinking = false session.thinkingAt = t + session.backgroundTaskCount = 0 - this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false, thinking: false } }) + this.publisher.emit({ type: 'session-updated', sessionId: session.id, data: { active: false, thinking: false, backgroundTaskCount: 0 } }) } expireInactive(now: number = Date.now()): void { diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index 6b5be2f1c..ffc5792bf 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -198,6 +198,10 @@ export class SyncEngine { this.sessionCache.handleSessionEnd(payload) } + handleBackgroundTaskDelta(sessionId: string, delta: { started: number; completed: number }): void { + this.sessionCache.applyBackgroundTaskDelta(sessionId, delta) + } + handleMachineAlive(payload: { machineId: string; time: number }): void { this.machineCache.handleMachineAlive(payload) } diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index 52ec83737..01db07cd3 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -172,6 +172,7 @@ export const SessionSchema = z.object({ agentStateVersion: z.number(), thinking: z.boolean(), thinkingAt: z.number(), + backgroundTaskCount: z.number().optional(), todos: TodosSchema.optional(), teamState: TeamStateSchema.optional(), model: z.string().nullable(), diff --git a/web/src/components/AssistantChat/HappyComposer.tsx b/web/src/components/AssistantChat/HappyComposer.tsx index 101cca2d6..927c778e8 100644 --- a/web/src/components/AssistantChat/HappyComposer.tsx +++ b/web/src/components/AssistantChat/HappyComposer.tsx @@ -48,6 +48,7 @@ export function HappyComposer(props: { allowSendWhenInactive?: boolean thinking?: boolean agentState?: AgentState | null + backgroundTaskCount?: number contextSize?: number controlledByUser?: boolean agentFlavor?: string | null @@ -77,6 +78,7 @@ export function HappyComposer(props: { allowSendWhenInactive = false, thinking = false, agentState, + backgroundTaskCount, contextSize, controlledByUser = false, agentFlavor, @@ -668,6 +670,7 @@ export function HappyComposer(props: { active={active} thinking={thinking} agentState={agentState} + backgroundTaskCount={backgroundTaskCount} contextSize={contextSize} model={model} permissionMode={permissionMode} diff --git a/web/src/components/AssistantChat/StatusBar.tsx b/web/src/components/AssistantChat/StatusBar.tsx index e026c99f9..4527311bc 100644 --- a/web/src/components/AssistantChat/StatusBar.tsx +++ b/web/src/components/AssistantChat/StatusBar.tsx @@ -42,6 +42,7 @@ function getConnectionStatus( thinking: boolean, agentState: AgentState | null | undefined, voiceStatus: ConversationStatus | undefined, + backgroundTaskCount: number, t: (key: string) => string ): { text: string; color: string; dotColor: string; isPulsing: boolean } { const hasPermissions = agentState?.requests && Object.keys(agentState.requests).length > 0 @@ -84,6 +85,15 @@ function getConnectionStatus( } } + if (backgroundTaskCount > 0) { + return { + text: `${backgroundTaskCount} background task${backgroundTaskCount > 1 ? 's' : ''} running`, + color: 'text-[#007AFF]', + dotColor: 'bg-[#007AFF]', + isPulsing: true + } + } + return { text: t('misc.online'), color: 'text-[#34C759]', @@ -110,6 +120,7 @@ export function StatusBar(props: { active: boolean thinking: boolean agentState: AgentState | null | undefined + backgroundTaskCount?: number contextSize?: number model?: string | null permissionMode?: PermissionMode @@ -119,8 +130,8 @@ export function StatusBar(props: { }) { const { t } = useTranslation() const connectionStatus = useMemo( - () => getConnectionStatus(props.active, props.thinking, props.agentState, props.voiceStatus, t), - [props.active, props.thinking, props.agentState, props.voiceStatus, t] + () => getConnectionStatus(props.active, props.thinking, props.agentState, props.voiceStatus, props.backgroundTaskCount ?? 0, t), + [props.active, props.thinking, props.agentState, props.voiceStatus, props.backgroundTaskCount, t] ) const contextWarning = useMemo( diff --git a/web/src/components/SessionChat.tsx b/web/src/components/SessionChat.tsx index 841286245..17dd463c3 100644 --- a/web/src/components/SessionChat.tsx +++ b/web/src/components/SessionChat.tsx @@ -379,6 +379,7 @@ export function SessionChat(props: { allowSendWhenInactive thinking={props.session.thinking} agentState={props.session.agentState} + backgroundTaskCount={props.session.backgroundTaskCount} contextSize={reduced.latestUsage?.contextSize} controlledByUser={controlledByUser} onCollaborationModeChange={