diff --git a/apps/sim/instrumentation-node.ts b/apps/sim/instrumentation-node.ts
index 68ef65d0e6d..fae2a3a2651 100644
--- a/apps/sim/instrumentation-node.ts
+++ b/apps/sim/instrumentation-node.ts
@@ -63,6 +63,25 @@ function normalizeOtlpTracesUrl(url: string): string {
   }
 }
 
+// Metrics counterpart to `normalizeOtlpTracesUrl`. Operates on the parsed
+// pathname (not a raw string suffix) so query strings and trailing slashes
+// don't corrupt the result: swap a `/v1/traces` suffix for `/v1/metrics`,
+// otherwise append `/v1/metrics`.
+function normalizeOtlpMetricsUrl(url: string): string {
+  if (!url) return url
+  try {
+    const u = new URL(url)
+    const path = u.pathname.replace(/\/$/, '')
+    if (path.endsWith('/v1/metrics')) return url
+    u.pathname = path.endsWith('/v1/traces')
+      ? path.replace(/\/v1\/traces$/, '/v1/metrics')
+      : `${path}/v1/metrics`
+    return u.toString()
+  } catch {
+    return url
+  }
+}
+
 // Sampling ratio from env (mirrors Go's `samplerFromEnv`); fallback
 // is 100% everywhere. Retention caps cost, not sampling.
 function resolveSamplingRatio(_isLocalEndpoint: boolean): number {
@@ -144,6 +163,8 @@ async function initializeOpenTelemetry() {
       '@opentelemetry/semantic-conventions/incubating'
     )
     const { OTLPTraceExporter } = await import('@opentelemetry/exporter-trace-otlp-http')
+    const { OTLPMetricExporter } = await import('@opentelemetry/exporter-metrics-otlp-http')
+    const { PeriodicExportingMetricReader } = await import('@opentelemetry/sdk-metrics')
     const { BatchSpanProcessor } = await import('@opentelemetry/sdk-trace-node')
     const { TraceIdRatioBasedSampler, SamplingDecision } = await import(
       '@opentelemetry/sdk-trace-base'
@@ -226,6 +247,18 @@ async function initializeOpenTelemetry() {
       exportTimeoutMillis: telemetryConfig.batchSettings.exportTimeoutMillis,
     })
 
+    // Metrics (hosted-key counters/histograms) share the trace endpoint and
+    // headers — only the signal path differs. Unlike spans these aren't sampled.
+    const metricReader = new PeriodicExportingMetricReader({
+      exporter: new OTLPMetricExporter({
+        url: normalizeOtlpMetricsUrl(telemetryConfig.endpoint),
+        headers: otlpHeaders,
+        timeoutMillis: Math.min(telemetryConfig.batchSettings.exportTimeoutMillis, 10000),
+        keepAlive: false,
+      }),
+      exportIntervalMillis: 60000,
+    })
+
     // Unique instance id per origin keeps Jaeger's clock-skew adjuster
     // from grouping Sim+Go spans together (they'd see multi-second
     // drift as intra-service and emit spurious warnings).
@@ -268,6 +301,7 @@ async function initializeOpenTelemetry() {
       resource,
       spanProcessors,
       sampler,
+      metricReader,
     })
 
     sdk.start()
diff --git a/apps/sim/lib/core/telemetry.ts b/apps/sim/lib/core/telemetry.ts
index 0ae2be8c1d9..4eb06aa112e 100644
--- a/apps/sim/lib/core/telemetry.ts
+++ b/apps/sim/lib/core/telemetry.ts
@@ -21,6 +21,7 @@ import { createLogger } from '@sim/logger'
 import { getErrorMessage, toError } from '@sim/utils/errors'
 import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1'
 import type { TraceSpan } from '@/lib/logs/types'
+import { hostedKeyMetrics } from '@/lib/monitoring/metrics'
 
 /**
  * GenAI Semantic Convention Attributes
@@ -954,14 +955,10 @@ export const PlatformEvents = {
     workspaceId?: string
     workflowId?: string
   }) => {
-    trackPlatformEvent('platform.hosted_key.user_throttled', {
-      'tool.id': attrs.toolId,
-      'throttle.reason': attrs.reason,
-      ...(attrs.provider && { 'provider.id': attrs.provider }),
-      ...(attrs.retryAfterMs != null && { 'rate_limit.retry_after_ms': attrs.retryAfterMs }),
-      ...(attrs.userId && { 'user.id': attrs.userId }),
-      ...(attrs.workspaceId && { 'workspace.id': attrs.workspaceId }),
-      ...(attrs.workflowId && { 'workflow.id': attrs.workflowId }),
+    hostedKeyMetrics.recordThrottled({
+      provider: attrs.provider ?? attrs.toolId,
+      tool: attrs.toolId,
+      reason: attrs.reason,
     })
   },
 
@@ -978,16 +975,7 @@ export const PlatformEvents = {
     workspaceId?: string
     workflowId?: string
   }) => {
-    trackPlatformEvent('platform.hosted_key.rate_limited', {
-      'tool.id': attrs.toolId,
-      'hosted_key.env_var': attrs.envVarName,
-      'rate_limit.attempt': attrs.attempt,
-      'rate_limit.max_retries': attrs.maxRetries,
-      'rate_limit.delay_ms': attrs.delayMs,
-      ...(attrs.userId && { 'user.id': attrs.userId }),
-      ...(attrs.workspaceId && { 'workspace.id': attrs.workspaceId }),
-      ...(attrs.workflowId && { 'workflow.id': attrs.workflowId }),
-    })
+    hostedKeyMetrics.recordUpstreamRateLimited({ tool: attrs.toolId, key: attrs.envVarName })
   },
 
   hostedKeyUnknownModelCost: (attrs: {
@@ -995,11 +983,7 @@ export const PlatformEvents = {
     modelName: string
     defaultCost: number
   }) => {
-    trackPlatformEvent('platform.hosted_key.unknown_model_cost', {
-      'tool.id': attrs.toolId,
-      'model.name': attrs.modelName,
-      'cost.default_cost': attrs.defaultCost,
-    })
+    hostedKeyMetrics.recordUnknownModelCost({ tool: attrs.toolId })
   },
 
   /**
@@ -1016,14 +1000,9 @@ export const PlatformEvents = {
     dimension?: string
     queuePosition?: number
   }) => {
-    trackPlatformEvent('platform.hosted_key.queue_waited', {
-      'provider.id': attrs.provider,
-      'workspace.id': attrs.workspaceId,
-      'queue.waited_ms': attrs.waitedMs,
-      'queue.attempts': attrs.attempts,
-      'queue.reason': attrs.reason,
-      ...(attrs.dimension && { 'queue.dimension': attrs.dimension }),
-      ...(attrs.queuePosition != null && { 'queue.position': attrs.queuePosition }),
+    hostedKeyMetrics.recordQueueWait(attrs.waitedMs, {
+      provider: attrs.provider,
+      reason: attrs.reason,
     })
   },
 
@@ -1037,12 +1016,9 @@ export const PlatformEvents = {
     reason: 'actor_requests' | 'dimension' | 'queue_position'
     dimension?: string
   }) => {
-    trackPlatformEvent('platform.hosted_key.queue_wait_exceeded', {
-      'provider.id': attrs.provider,
-      'workspace.id': attrs.workspaceId,
-      'queue.waited_ms': attrs.waitedMs,
-      'queue.reason': attrs.reason,
-      ...(attrs.dimension && { 'queue.dimension': attrs.dimension }),
+    hostedKeyMetrics.recordQueueWaitExceeded({
+      provider: attrs.provider,
+      reason: attrs.reason,
     })
   },
 
diff --git a/apps/sim/lib/monitoring/metrics.ts b/apps/sim/lib/monitoring/metrics.ts
new file mode 100644
index 00000000000..7eaad52c1b5
--- /dev/null
+++ b/apps/sim/lib/monitoring/metrics.ts
@@ -0,0 +1,146 @@
+/**
+ * Hosted-key OTel metrics.
+ *
+ * Point events (usage, cost, throttles, queue waits) are emitted as metrics —
+ * not spans — so they bypass trace sampling and survive aggregation. Reads the
+ * global MeterProvider, which the Next.js app registers in `instrumentation-node.ts`
+ * and trigger.dev registers from `trigger.config.ts`; with no provider the API
+ * returns a no-op meter, so these recorders are always safe to call.
+ *
+ * Labels stay low-cardinality (provider, tool, reason, key). `key` is the env var
+ * NAME of the chosen hosted key (e.g. `PERPLEXITY_API_KEY_2`) — never the secret —
+ * and the pool is operator-managed, so it's safe to label. Exact per-workspace/user
+ * cost lives in the `usage_log` table — never put workspace/user IDs on metric labels.
+ */
+
+import { type Counter, type Histogram, metrics } from '@opentelemetry/api'
+
+const METER_NAME = 'sim.hosted-key'
+const METER_VERSION = '1.0.0'
+
+type ThrottleReason = 'billing_actor_limit' | 'upstream_retries_exhausted'
+type QueueReason = 'actor_requests' | 'dimension' | 'queue_position'
+type FailureReason = 'rate_limited' | 'auth' | 'other'
+
+let meter: ReturnType<typeof metrics.getMeter> | undefined
+let usedCounter: Counter | undefined
+let failedCounter: Counter | undefined
+let costCounter: Counter | undefined
+let throttledCounter: Counter | undefined
+let upstreamRateLimitedCounter: Counter | undefined
+let queueWaitHistogram: Histogram | undefined
+let queueWaitExceededCounter: Counter | undefined
+let unknownModelCostCounter: Counter | undefined
+
+function getMeter() {
+  if (!meter) meter = metrics.getMeter(METER_NAME, METER_VERSION)
+  return meter
+}
+
+function getUsedCounter() {
+  if (!usedCounter) {
+    usedCounter = getMeter().createCounter('hosted_key.used', {
+      description: 'Successful tool executions backed by a Sim-hosted API key',
+    })
+  }
+  return usedCounter
+}
+
+function getFailedCounter() {
+  if (!failedCounter) {
+    failedCounter = getMeter().createCounter('hosted_key.failed', {
+      description: 'Failed tool executions backed by a Sim-hosted API key',
+    })
+  }
+  return failedCounter
+}
+
+function getCostCounter() {
+  if (!costCounter) {
+    costCounter = getMeter().createCounter('hosted_key.cost_charged', {
+      description: 'Dollar cost charged to the billing actor for hosted-key usage',
+      unit: 'USD',
+    })
+  }
+  return costCounter
+}
+
+function getThrottledCounter() {
+  if (!throttledCounter) {
+    throttledCounter = getMeter().createCounter('hosted_key.throttled', {
+      description: 'Rate-limit errors surfaced to the end user (not retried/absorbed)',
+    })
+  }
+  return throttledCounter
+}
+
+function getUpstreamRateLimitedCounter() {
+  if (!upstreamRateLimitedCounter) {
+    upstreamRateLimitedCounter = getMeter().createCounter('hosted_key.upstream_rate_limited', {
+      description: 'Upstream provider 429s absorbed via retry/backoff',
+    })
+  }
+  return upstreamRateLimitedCounter
+}
+
+function getQueueWaitHistogram() {
+  if (!queueWaitHistogram) {
+    queueWaitHistogram = getMeter().createHistogram('hosted_key.queue_wait_duration', {
+      description: 'Time a hosted-key acquisition spent waiting in the per-workspace queue/bucket',
+      unit: 'ms',
+    })
+  }
+  return queueWaitHistogram
+}
+
+function getQueueWaitExceededCounter() {
+  if (!queueWaitExceededCounter) {
+    queueWaitExceededCounter = getMeter().createCounter('hosted_key.queue_wait_exceeded', {
+      description: 'Hosted-key acquisitions that exceeded the queue wait cap and fell back to 429',
+    })
+  }
+  return queueWaitExceededCounter
+}
+
+function getUnknownModelCostCounter() {
+  if (!unknownModelCostCounter) {
+    unknownModelCostCounter = getMeter().createCounter('hosted_key.unknown_model_cost', {
+      description: 'Hosted-key cost calculations that fell back to a default for an unmapped model',
+    })
+  }
+  return unknownModelCostCounter
+}
+
+/**
+ * Recorders for hosted-key metrics. Each call lazily creates its instrument on
+ * first use and records at most one data point with the given labels.
+ */
+export const hostedKeyMetrics = {
+  recordUsed(labels: { provider: string; tool: string; key: string }) {
+    getUsedCounter().add(1, labels)
+  },
+  recordFailed(labels: { provider: string; tool: string; key: string; reason: FailureReason }) {
+    getFailedCounter().add(1, labels)
+  },
+  recordCostCharged(costUsd: number, labels: { provider: string; tool: string }) {
+    // Skip zero, negative, NaN and Infinity so one bad cost can't corrupt the running total.
+    if (Number.isFinite(costUsd) && costUsd > 0) getCostCounter().add(costUsd, labels)
+  },
+  recordThrottled(labels: { provider: string; tool: string; reason: ThrottleReason }) {
+    getThrottledCounter().add(1, labels)
+  },
+  recordUpstreamRateLimited(labels: { tool: string; key: string }) {
+    getUpstreamRateLimitedCounter().add(1, labels)
+  },
+  recordQueueWait(durationMs: number, labels: { provider: string; reason: QueueReason }) {
+    // A negative or non-finite wait is not a real duration; drop it instead of recording it.
+    if (!Number.isFinite(durationMs) || durationMs < 0) return
+    getQueueWaitHistogram().record(durationMs, labels)
+  },
+  recordQueueWaitExceeded(labels: { provider: string; reason: QueueReason }) {
+    getQueueWaitExceededCounter().add(1, labels)
+  },
+  recordUnknownModelCost(labels: { tool: string }) {
+    getUnknownModelCostCounter().add(1, labels)
+  },
+}
diff --git a/apps/sim/package.json b/apps/sim/package.json
index d11921af525..29c196333ee 100644
--- a/apps/sim/package.json
+++ b/apps/sim/package.json
@@ -70,6 +70,7 @@
     "@opentelemetry/exporter-metrics-otlp-http": "^0.217.0",
     "@opentelemetry/exporter-trace-otlp-http": "^0.217.0",
     "@opentelemetry/resources": "^2.7.0",
+    "@opentelemetry/sdk-metrics": "^2.7.0",
     "@opentelemetry/sdk-node": "^0.217.0",
     "@opentelemetry/sdk-trace-base": "^2.7.0",
     "@opentelemetry/sdk-trace-node": "^2.7.0",
diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts
index 40db9898868..0cabe09af66 100644
--- a/apps/sim/tools/index.ts
+++ b/apps/sim/tools/index.ts
@@ -22,6 +22,7 @@ import { isUserFile } from '@/lib/core/utils/user-file'
 import { isSameOrigin } from '@/lib/core/utils/validation'
 import { SIM_VIA_HEADER, serializeCallChain } from '@/lib/execution/call-chain'
 import { parseMcpToolId } from '@/lib/mcp/utils'
+import { hostedKeyMetrics } from '@/lib/monitoring/metrics'
 import { resolveWorkspaceFileReference } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
 import { assertPermissionsAllowed } from '@/ee/access-control/utils/permission-check'
 import { isCustomTool, isMcpTool } from '@/executor/constants'
@@ -334,11 +335,11 @@ async function reacquireHostedKey(
   params: Record,
   executionContext: ExecutionContext | undefined,
   requestId: string
-): Promise<boolean> {
-  if (!tool.hosting) return false
+): Promise<string | null> {
+  if (!tool.hosting) return null
   const { envKeyPrefix, apiKeyParam, byokProviderId, rateLimit } = tool.hosting
   const { workspaceId } = resolveToolScope(params, executionContext)
-  if (!workspaceId) return false
+  if (!workspaceId) return null
 
   const provider = byokProviderId || tool.id
   const acquireResult = await getHostedKeyRateLimiter().acquireKey(
@@ -353,14 +354,14 @@ async function reacquireHostedKey(
     logger.warn(
       `[${requestId}] Re-acquire of hosted key for ${tool.id} failed: ${acquireResult.error ?? 'unknown'}`
     )
-    return false
+    return null
   }
 
   params[apiKeyParam] = acquireResult.key
   logger.info(
     `[${requestId}] Re-acquired hosted key for ${tool.id} (${acquireResult.envVarName}) after upstream throttling`
   )
-  return true
+  return acquireResult.envVarName ?? 'unknown'
 }
 
 /**
@@ -383,10 +384,28 @@ function isRateLimitError(error: unknown): boolean {
   return false
 }
 
+/**
+ * Map a thrown tool error to a hosted-key failure reason for metrics. Mirrors
+ * `isRateLimitError`: some providers signal quota/rate-limit via 401/403 with a
+ * descriptive message, so those count as `rate_limited`, not `auth`.
+ */
+function classifyHostedKeyFailure(error: unknown): 'rate_limited' | 'auth' | 'other' {
+  const status = (error as { status?: number } | null)?.status
+  if (status === 429 || status === 503) return 'rate_limited'
+  if (status === 401 || status === 403) {
+    const raw = (error as { message?: unknown } | null)?.message
+    const message = typeof raw === 'string' ? raw.toLowerCase() : ''
+    if (message.includes('quota') || message.includes('rate limit')) return 'rate_limited'
+    return 'auth'
+  }
+  return 'other'
+}
+
 /** Context for retry with rate limit tracking */
 interface RetryContext {
   requestId: string
   toolId: string
+  provider: string
   envVarName: string
   executionContext?: ExecutionContext
   /**
@@ -413,8 +432,14 @@ async function executeWithRetry(
   maxRetries = 3,
   baseDelayMs = 1000
 ): Promise {
-  const { requestId, toolId, envVarName, executionContext, reacquireAfterRetriesExhausted } =
-    context
+  const {
+    requestId,
+    toolId,
+    provider,
+    envVarName,
+    executionContext,
+    reacquireAfterRetriesExhausted,
+  } = context
   let lastError: unknown
 
   for (let attempt = 0; attempt <= maxRetries; attempt++) {
@@ -445,6 +470,7 @@ async function executeWithRetry(
         PlatformEvents.hostedKeyUserThrottled({
           toolId,
           reason: 'upstream_retries_exhausted',
+          provider,
           userId: executionContext?.userId,
           workspaceId: executionContext?.workspaceId,
           workflowId: executionContext?.workflowId,
@@ -619,7 +645,8 @@ async function applyHostedKeyCostToResult(
   tool: ToolConfig,
   params: Record,
   executionContext: ExecutionContext | undefined,
-  requestId: string
+  requestId: string,
+  envVarName: string | undefined
 ): Promise {
   await reportCustomDimensionUsage(tool, params, finalResult.output, executionContext, requestId)
 
@@ -630,6 +657,12 @@ async function applyHostedKeyCostToResult(
     executionContext,
     requestId
   )
+
+  const provider = tool.hosting?.byokProviderId || tool.id
+  const key = envVarName ?? 'unknown'
+  hostedKeyMetrics.recordUsed({ provider, tool: tool.id, key })
+  hostedKeyMetrics.recordCostCharged(hostedKeyCost, { provider, tool: tool.id })
+
   if (hostedKeyCost > 0) {
     finalResult.output = {
       ...finalResult.output,
@@ -887,6 +920,9 @@ export async function executeTool(
   const startTimeISO = startTime.toISOString()
   const requestId = generateRequestId()
 
+  // Hoisted so the outer catch can attribute a thrown failure to the chosen key.
+  let hostedKeyForMetrics: { provider: string; tool: string; key: string } | undefined
+
   try {
     let tool: ToolConfig | undefined
 
@@ -1008,6 +1044,14 @@ export async function executeTool(
       requestId
     )
 
+    if (hostedKeyInfo.isUsingHostedKey) {
+      hostedKeyForMetrics = {
+        provider: tool.hosting?.byokProviderId || tool.id,
+        tool: tool.id,
+        key: hostedKeyInfo.envVarName ?? 'unknown',
+      }
+    }
+
     // If we have a credential parameter, fetch the access token
     if (contextParams.oauthCredential) {
       contextParams.credential = contextParams.oauthCredential
@@ -1152,8 +1196,11 @@ export async function executeTool(
           tool,
           contextParams,
           executionContext,
-          requestId
+          requestId,
+          hostedKeyInfo.envVarName
         )
+      } else if (hostedKeyForMetrics) {
+        hostedKeyMetrics.recordFailed({ ...hostedKeyForMetrics, reason: 'other' })
       }
 
       const strippedOutput = postProcessToolOutput(normalizedToolId, finalResult.output ?? {})
@@ -1177,16 +1224,20 @@ export async function executeTool(
         {
           requestId,
           toolId,
+          provider: tool.hosting?.byokProviderId || tool.id,
           envVarName: hostedKeyInfo.envVarName!,
           executionContext,
           reacquireAfterRetriesExhausted: async () => {
-            const reacquired = await reacquireHostedKey(
+            const reacquiredEnvVar = await reacquireHostedKey(
               tool,
               contextParams,
               executionContext,
               requestId
             )
-            if (!reacquired) return null
+            if (!reacquiredEnvVar) return null
+            // Re-point metric labels at the freshly acquired key.
+            hostedKeyInfo.envVarName = reacquiredEnvVar
+            if (hostedKeyForMetrics) hostedKeyForMetrics.key = reacquiredEnvVar
             return () => executeToolRequest(toolId, tool, contextParams, effectiveSignal)
           },
         }
@@ -1220,8 +1271,11 @@ export async function executeTool(
           tool,
           contextParams,
           executionContext,
-          requestId
+          requestId,
+          hostedKeyInfo.envVarName
         )
+      } else if (hostedKeyForMetrics) {
+        hostedKeyMetrics.recordFailed({ ...hostedKeyForMetrics, reason: 'other' })
       }
 
       const strippedOutput = postProcessToolOutput(normalizedToolId, finalResult.output ?? {})
@@ -1241,6 +1295,13 @@ export async function executeTool(
       stack: error instanceof Error ? error.stack : undefined,
     })
 
+    if (hostedKeyForMetrics) {
+      hostedKeyMetrics.recordFailed({
+        ...hostedKeyForMetrics,
+        reason: classifyHostedKeyFailure(error),
+      })
+    }
+
     // Default error handling
     let errorMessage = 'Unknown error occurred'
     let errorDetails = {}
diff --git a/bun.lock b/bun.lock
index 562fdc1411d..eb65c533e99 100644
--- a/bun.lock
+++ b/bun.lock
@@ -124,6 +124,7 @@
         "@opentelemetry/exporter-metrics-otlp-http": "^0.217.0",
         "@opentelemetry/exporter-trace-otlp-http": "^0.217.0",
         "@opentelemetry/resources": "^2.7.0",
+        "@opentelemetry/sdk-metrics": "^2.7.0",
         "@opentelemetry/sdk-node": "^0.217.0",
         "@opentelemetry/sdk-trace-base": "^2.7.0",
         "@opentelemetry/sdk-trace-node": "^2.7.0",