Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions res/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ Apify input-schema spec semantics (required vs default vs prefill), the root cau
Notes on keeping widget bundles small (narrow `@apify/ui-library` imports, markdown stack cost).
- **Use case**: When changing widget dependencies or markdown rendering, re-measure bundle impact

### [task_status_workaround.md](./task_status_workaround.md)
Why all task results (including errors) are stored as `'completed'` instead of `'failed'`.
- SDK `requestStream()` discards stored results for `'failed'` tasks
- `[error]` prefix convention in `statusMessage` to signal the real outcome
- Upstream fixes needed in the MCP SDK and mcpc
- **Use case**: Reference when touching task status logic or updating the MCP SDK

### [chatgpt-app-submission.md](./chatgpt-app-submission.md)
Checklist and notes for ChatGPT MCP Apps store submission (verify line references against current source before relying on them).
- **Use case**: Submission prep and audits
Expand Down
111 changes: 111 additions & 0 deletions res/task_status_workaround.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Task status workaround: why errors are stored as 'completed'

## The problem

The MCP SDK's `requestStream()` (in `shared/protocol.js`) handles terminal task states differently:

- **`'completed'`** → calls `getTaskResult()` → yields `{ type: 'result', result }` → client gets the full `CallToolResult`
- **`'failed'`** → yields `{ type: 'error', error: "Task {id} failed" }` → client gets a generic error, **stored result is discarded**
- **`'cancelled'`** → same as failed, generic error

This means any result stored with `status: 'failed'` is never delivered to the client. The actual error text, x402 payment payload, structured content — all lost.

## What we do

We store **all** task results as `'completed'`, including errors. The error nature is conveyed through:

1. **`isError: true`** in the `CallToolResult` payload — this is what clients (mcpc, Claude, Cursor) use to determine success/failure
2. **`[error]` prefix** in `statusMessage` — so `tasks/list` and `tasks/get` polling clearly shows the task failed

### Affected paths in `executeToolAndUpdateTask()`

| Path | Result | Status | statusMessage |
|---|---|---|---|
| Success | tool output | `'completed'` | `tool-name: completed` |
| SOFT_FAIL (actor not found, validation) | error text + `isError: true` | `'completed'` | `[error] tool-name: Actor not found...` |
| Payment required (pre-flight) | x402 payload + `isError: true` | `'completed'` | `[error] tool-name: payment required` |
| Payment required (catch 402) | x402 payload + `isError: true` | `'completed'` | `[error] tool-name: payment required` |
| Hard error (5xx, network, etc.) | error text + `isError: true` | `'completed'` | `[error] tool-name: error text...` |
| Aborted (signal) | none | `'cancelled'` | `tool-name: aborted by client` |
Comment thread
jirispilka marked this conversation as resolved.

### Why x402 payment specifically requires 'completed'

The mcpc bridge's `handlePaymentRequiredRetry()` inspects the tool result for `isError: true` + `x402Version` + `accepts` fields. If the task is stored as `'failed'`, the SDK never delivers the result, and the auto-pay retry never triggers.

## What should be fixed upstream

### SDK: `requestStream()` should deliver results for 'failed' tasks

Location: `@modelcontextprotocol/sdk/shared/protocol.js`, lines ~566-583

```javascript
// Current behavior (broken):
if (task.status === 'failed') {
yield { type: 'error', error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`) };
}

// Desired behavior:
if (task.status === 'failed') {
const result = await this.getTaskResult({ taskId }, resultSchema, options);
yield { type: 'result', result }; // result has isError: true
}
```

The `'failed'` status was stored alongside a result via `storeTaskResult()`. The SDK should deliver that result, not discard it. The `isError` flag in the result already tells the client it's an error.

Alternatively, `requestStream()` could yield both the result and a status indicator, but the simplest fix is to treat `'failed'` the same as `'completed'` for result delivery.

### SDK: `requestStream()` 'failed' error should include statusMessage

Even without delivering the full result, the generic `"Task {id} failed"` message should at least include `task.statusMessage`:

```javascript
yield {
type: 'error',
error: new McpError(ErrorCode.InternalError, task.statusMessage || `Task ${taskId} failed`)
};
```

### SDK: `storeTaskResult()` should accept a statusMessage

Currently we use `storeTaskResultWithMessage()` which calls `updateTaskStatus('working', message)` then `storeTaskResult(status, result)` — two non-atomic calls. The SDK's `storeTaskResult()` should accept an optional `statusMessage` parameter to make this atomic.

This also leaves a small race with `tasks/cancel`: after the tool has already produced its payload, the task is briefly back in `'working'`, so a concurrent cancel can still win and the computed result is lost.

### mcpc: `pollTask()` should fetch result for 'completed' tasks

**Scope:** `pollTask()` is the detached task polling fallback, used when a task was started via
`callToolDetached()` and polled manually later. The normal `callTool()` path is **not affected** —
it uses `callToolStream()` → SDK `requestStream()` → correctly calls `getTaskResult()` for
`'completed'` tasks.

Location: `mcp-cli/src/core/mcp-client.ts`, lines ~715-720

```javascript
// Current behavior (incomplete — detached polling only):
if (task.status === 'completed') {
return { content: [{ type: 'text', text: task.statusMessage || 'Task completed' }] };
}

// Desired behavior:
if (task.status === 'completed') {
const result = await this.getTaskResult(taskId);
return result;
}
```

The detached polling fallback returns `statusMessage` as fake content instead of fetching the actual
stored result via `tasks/result`. The actual tool output (dataset items, actor run info, etc.) is lost.

**Normal mcpc path is correct:**
```
mcpc callTool → callToolStream → requestStream → completed → getTaskResult ✅
mcpc pollTask → tasks/get loop → completed → statusMessage as content ❌ (detached only)
```

## When to remove this workaround

Once the SDK's `requestStream()` delivers results for `'failed'` tasks, we can:
1. Store errors as `'failed'` (semantically correct)
2. Remove the `[error]` prefix from statusMessages
3. Remove `storeTaskResultWithMessage()` if `storeTaskResult()` accepts statusMessage
2 changes: 2 additions & 0 deletions src/dev_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ if (process.argv[1] === fileURLToPath(import.meta.url)) {
process.exit(1);
}

log.setLevel(process.env.LOG_LEVEL === 'info' ? log.LEVELS.INFO : log.LEVELS.DEBUG);
Comment thread
jirispilka marked this conversation as resolved.

Comment thread
jirispilka marked this conversation as resolved.
const HOST = process.env.HOST ?? 'http://localhost';
const PORT = Number(process.env.PORT ?? 3001);

Expand Down
114 changes: 92 additions & 22 deletions src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { InMemoryTaskStore } from '@modelcontextprotocol/sdk/experimental/tasks/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import type { RequestHandlerExtra } from '@modelcontextprotocol/sdk/shared/protocol.js';
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import type { InitializeRequest, Notification, Request } from '@modelcontextprotocol/sdk/types.js';
import type { InitializeRequest, Notification, Request, Result } from '@modelcontextprotocol/sdk/types.js';
import {
CallToolRequestSchema,
CallToolResultSchema,
Expand Down Expand Up @@ -80,7 +80,7 @@ import { getUserIdFromTokenCached } from '../utils/userid_cache.js';
import { getPackageVersion } from '../utils/version.js';
import { connectMCPClient } from './client.js';
import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC, LOG_LEVEL_MAP } from './const.js';
import { isTaskCancelled, processParamsGetTools } from './utils.js';
import { isTaskCancelled, processParamsGetTools, storeTaskResultWithMessage } from './utils.js';

/** Mode → actor executor. Add new modes here. */
const actorExecutorsByMode: Record<ServerMode, ActorExecutor> = {
Expand Down Expand Up @@ -603,7 +603,12 @@ export class ActorsMcpServer {
`Cannot cancel task "${taskId}" with status "${task.status}"`,
);
}
await this.taskStore.updateTaskStatus(taskId, 'cancelled', 'Cancelled by client', mcpSessionId);
// Extract tool/actor prefix from the last statusMessage (e.g. "apify/rag-web-browser: Starting...")
// Strip leading "[error] " so cancellation doesn't inherit the error marker.
const normalizedMessage = task.statusMessage?.replace(/^\[error\]\s*/i, '').trim();
const toolPrefix = normalizedMessage?.split(':')?.[0];
const cancelMessage = toolPrefix ? `${toolPrefix}: cancelled by client` : 'Cancelled by client';
await this.taskStore.updateTaskStatus(taskId, 'cancelled', cancelMessage, mcpSessionId);
Comment thread
jirispilka marked this conversation as resolved.
const updatedTask = await this.taskStore.getTask(taskId, mcpSessionId);
log.debug('[CancelTaskRequestSchema] Task cancelled successfully', { taskId, mcpSessionId });
return updatedTask!;
Expand Down Expand Up @@ -1129,9 +1134,11 @@ export class ActorsMcpServer {
mcpSessionId,
});

const toolFullName = getToolFullName(tool);

// Prepare telemetry before try-catch so it's accessible to both paths.
// This avoids re-fetching user data in the error handler.
const { telemetryData, userId } = await this.prepareTelemetryData(getToolFullName(tool), mcpSessionId, apifyToken);
const { telemetryData, userId } = await this.prepareTelemetryData(toolFullName, mcpSessionId, apifyToken);

const finishTaskTracking = (status: ToolStatus, diagnostics?: CallDiagnostics) => {
this.logToolCallAndTelemetry({
Expand All @@ -1145,6 +1152,46 @@ export class ActorsMcpServer {
callDiagnostics: diagnostics,
});
};
const errorMsg = (detail: string) => `[error] ${toolFullName}: ${detail}`;
const truncate = (text: string, fallback = 'failed') => {
if (!text) return fallback;
return text.length > 200 ? `${text.slice(0, 200)}… (truncated)` : text;
};

// Stores a task result with cancellation-race guard. If the task was concurrently
// cancelled, storeTaskResultWithMessage throws (terminal state can't transition to 'working').
// Returns false if cancelled (tracking already done), true otherwise.
const safeStoreResult = async (result: Result, statusMessage: string): Promise<boolean> => {
try {
await storeTaskResultWithMessage(this.taskStore, taskId, 'completed', result, statusMessage, mcpSessionId);
return true;
} catch (storeError) {
if (await isTaskCancelled(taskId, mcpSessionId, this.taskStore)) {
log.debug('[executeToolAndUpdateTask] Task was cancelled during result storage, treating as aborted', {
taskId, mcpSessionId,
});
finishTaskTracking(TOOL_STATUS.ABORTED, callDiagnostics);
return false;
}
const failureMessage = errorMsg('failed to store task result');
log.error('[executeToolAndUpdateTask] Failed to store task result', {
taskId, mcpSessionId, error: storeError,
});
try {
await this.taskStore.updateTaskStatus(taskId, 'failed', failureMessage, mcpSessionId);
} catch (statusError) {
log.error('[executeToolAndUpdateTask] Failed to mark task as failed after result storage error', {
taskId, mcpSessionId, error: statusError,
});
}
finishTaskTracking(TOOL_STATUS.FAILED, {
...callDiagnostics,
failure_category: FAILURE_CATEGORY.INTERNAL_ERROR,
failure_detail: 'Failed to store task result',
});
return false;
}
Comment thread
jirispilka marked this conversation as resolved.
};

try {
// Check if task was already cancelled before we start execution.
Expand Down Expand Up @@ -1259,21 +1306,49 @@ export class ActorsMcpServer {
return;
}

// Store the result in the task store
log.debug('[executeToolAndUpdateTask] Storing completed result', {
taskId,
mcpSessionId,
});
await this.taskStore.storeTaskResult(taskId, 'completed', result, mcpSessionId);
log.debug('Task completed successfully', { taskId, toolName: tool.name, mcpSessionId });
// Persist every non-aborted result. Task-supporting tools like call-actor can return
// isError/toolStatus without throwing, and task mode must store that payload the same
// way non-task mode returns it.
log.debug('Storing task result', { taskId, toolStatus, mcpSessionId });
if (toolStatus === TOOL_STATUS.ABORTED) {
// Guard: a concurrent tasks/cancel request may have already transitioned the task
// to 'cancelled' between the isTaskCancelled check above and here. Skip if already terminal.
if (!await isTaskCancelled(taskId, mcpSessionId, this.taskStore)) {
await this.taskStore.updateTaskStatus(taskId, 'cancelled', `${toolFullName}: aborted by client`, mcpSessionId);
}
} else {
// Always store as 'completed' — even for SOFT_FAIL and paymentRequired.
// The MCP SDK's requestStream() only calls getTaskResult() for 'completed' tasks;
// 'failed' tasks yield a generic "Task X failed" error and discard the stored result.
// Error details are preserved in the result payload (isError: true, content text).
// See res/task_status_workaround.md for full context.
let statusMessage: string;
if (paymentRequiredResult) {
statusMessage = errorMsg('payment required');
} else if (toolStatus === TOOL_STATUS.SOFT_FAIL) {
const fullText = (result as { content?: { text?: string }[] })?.content?.[0]?.text || '';
statusMessage = errorMsg(truncate(fullText));
} else {
statusMessage = `${toolFullName}: completed`;
}
if (!await safeStoreResult(result, statusMessage)) return;
}
log.debug('Task execution finished', { taskId, toolName: tool.name, toolStatus, mcpSessionId });

finishTaskTracking(toolStatus, callDiagnostics);
} catch (error) {
// Handle 402 Payment Required — return structured x402 result so clients can auto-pay
const httpStatus = getHttpStatusCode(error);
if (httpStatus === HTTP_PAYMENT_REQUIRED) {
logHttpError(error, 'Payment required while calling tool (task mode)', { toolName: tool.name });
await this.taskStore.storeTaskResult(taskId, 'completed', buildPaymentRequiredResponse(error), mcpSessionId);
// Guard: task may have been cancelled while the actor was running.
if (await isTaskCancelled(taskId, mcpSessionId, this.taskStore)) {
finishTaskTracking(TOOL_STATUS.ABORTED, { ...buildActorFields(actorName, actorId) });
return;
}
// Store as 'completed' — see comment in the try block and res/task_status_workaround.md.
const paymentResult = buildPaymentRequiredResponse(error);
if (!await safeStoreResult(paymentResult, errorMsg('payment required'))) return;
finishTaskTracking(TOOL_STATUS.SOFT_FAIL, {
failure_category: FAILURE_CATEGORY.INVALID_INPUT,
Comment thread
jirispilka marked this conversation as resolved.
failure_http_status: 402,
Expand Down Expand Up @@ -1333,18 +1408,13 @@ export class ActorsMcpServer {
return;
}

log.debug('[executeToolAndUpdateTask] Storing failed result', {
taskId,
mcpSessionId,
});
await this.taskStore.storeTaskResult(taskId, 'failed', {
content: [{
type: 'text' as const,
text: userText,
}],
// Store as 'completed' so the SDK's requestStream() delivers the result to the client.
// See res/task_status_workaround.md for full context.
if (!await safeStoreResult({
content: [{ type: 'text' as const, text: userText }],
isError: true,
internalToolStatus: toolStatus,
}, mcpSessionId);
}, errorMsg(truncate(userText)))) return;

finishTaskTracking(toolStatus, callDiagnostics);
}
Expand Down
24 changes: 24 additions & 0 deletions src/mcp/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { parse } from 'node:querystring';

import type { TaskStore } from '@modelcontextprotocol/sdk/experimental/tasks/interfaces.js';
import type { Result } from '@modelcontextprotocol/sdk/types.js';
import type { ApifyClient } from 'apify-client';

import { processInput } from '../input.js';
Expand Down Expand Up @@ -47,3 +48,26 @@ export async function isTaskCancelled(
const task = await taskStore.getTask(taskId, mcpSessionId);
return task?.status === 'cancelled';
}

/**
* Stores a task result with a final statusMessage in one logical step.
*
* WARNING: This is NOT atomic. The SDK's storeTaskResult() does not accept a statusMessage,
* so we first call updateTaskStatus('working', message) then storeTaskResult(status, result).
* That creates a race: after the tool has already finished, tasks/cancel can still win before
* storeTaskResult() runs, and the computed result is lost. We keep this workaround so tasks/list
* shows the final statusMessage until the SDK supports atomic result + statusMessage storage.
*/
export async function storeTaskResultWithMessage(
taskStore: TaskStore,
taskId: string,
// Always 'completed' — the SDK's requestStream() only delivers results for 'completed' tasks;
// 'failed' tasks yield a generic error and discard the stored result. See res/task_status_workaround.md.
status: 'completed',
result: Result,
Comment thread
jirispilka marked this conversation as resolved.
statusMessage: string,
Comment thread
jirispilka marked this conversation as resolved.
sessionId?: string,
): Promise<void> {
await taskStore.updateTaskStatus(taskId, 'working', statusMessage, sessionId);
await taskStore.storeTaskResult(taskId, status, result, sessionId);
}
Loading
Loading