From e3adaefc9e8ab6549b4c12036ec0f96095e8ecfd Mon Sep 17 00:00:00 2001 From: arpit2006 Date: Wed, 3 Jun 2026 21:46:24 +0530 Subject: [PATCH] fix(core): repair malformed runStep, restore tests and config --- packages/core/src/lib/config.ts | 2 - packages/core/src/lib/runtime.test.ts | 21 +- packages/core/src/lib/runtime.ts | 322 ++++++++++++-------------- 3 files changed, 152 insertions(+), 193 deletions(-) diff --git a/packages/core/src/lib/config.ts b/packages/core/src/lib/config.ts index 200d310..28629c6 100644 --- a/packages/core/src/lib/config.ts +++ b/packages/core/src/lib/config.ts @@ -30,9 +30,7 @@ const envSchema = z.object({ API_KEY: z.string().default('pulsestack-local-api-key'), TENANT_ID: z.string().default('local'), PLUGIN_DIR: z.string().default('./plugins'), -issue-30-auth-disabled-false AUTH_DISABLED: booleanEnv.default(true), - AUTH_DISABLED: z.coerce.boolean().default(true), OTEL_TRACING_ENABLED: z.coerce.boolean().default(false), OTEL_SERVICE_NAME: z.string().default(''), OTEL_TRACES_EXPORTER: z.enum(['none', 'console']).default('none'), diff --git a/packages/core/src/lib/runtime.test.ts b/packages/core/src/lib/runtime.test.ts index f169521..05731b1 100644 --- a/packages/core/src/lib/runtime.test.ts +++ b/packages/core/src/lib/runtime.test.ts @@ -1,6 +1,11 @@ - import { describe, expect, it } from 'vitest'; -import type { EventEnvelope, WorkflowDefinition } from '@pulsestack/contracts'; +import type { + EventEnvelope, + WorkflowDefinition, + ExecutionSnapshot, + TraceSpan, +} from '@pulsestack/contracts'; +import type { PulseInfra } from './infra.js'; import { WorkflowRuntime } from './runtime.js'; class RuntimeInfraMock { @@ -43,15 +48,8 @@ describe('WorkflowRuntime', () => { ); expect(tenantEvents.length).toBeGreaterThan(0); expect(tenantEvents.every((event) => event.tenantId === workflow.tenantId)).toBe(true); - -import type { - EventEnvelope, - ExecutionSnapshot, - TraceSpan, -} from '@pulsestack/contracts'; -import { describe, expect, it } from 'vitest'; -import type { PulseInfra } from './infra.js'; -import { WorkflowRuntime } from './runtime.js'; + }); +}); function createRuntimeHarness() { const events: EventEnvelope[] = []; @@ -198,6 +196,5 @@ describe('WorkflowRuntime retry handling', () => { 'Step fetch_logs failed after 2 attempts: Simulated failure for fetch_logs on attempt 2', }, }); - }); }); diff --git a/packages/core/src/lib/runtime.ts b/packages/core/src/lib/runtime.ts index e2bceda..01c41dc 100644 --- a/packages/core/src/lib/runtime.ts +++ b/packages/core/src/lib/runtime.ts @@ -45,7 +45,7 @@ export class WorkflowRuntime { private readonly infra: PulseInfra, private readonly source = 'pulse-runtime', private readonly options: RuntimeOptions = {}, - ) {} + ) { } async execute(requestInput: ExecutionRequest) { const request = executionRequestSchema.parse(requestInput); @@ -78,39 +78,6 @@ export class WorkflowRuntime { input: request.input, }); - const state: Record = { ...request.input }; - const results: StepResult[] = []; - - for (const [index, step] of request.workflow.steps.entries()) { - const span = await this.startSpan({ - traceId, - executionId, - workflowId: request.workflow.id, - tenantId: request.workflow.tenantId, - step, - state, - }); - - const result = await this.runStep(step, state, request.workflow.tenantId, request.workflow.correlationId); - Object.assign(state, { [step.id]: result.output }); - results.push(result); - - await this.snapshot({ - id: createId('snap'), - executionId, - workflowId: request.workflow.id, - sequence: index, - state: structuredClone(state), - sideEffects: [ - { - type: step.kind, - key: step.id, - response: result.output, - }, - ], - createdAt: new Date().toISOString(), - }); - await publishEvent( this.infra, createEvent({ @@ -128,160 +95,160 @@ export class WorkflowRuntime { }), ); - const state: Record = { ...request.input }; const retryState: Record = {}; const results: StepResult[] = []; - try { - for (const [index, step] of request.workflow.steps.entries()) { - await withRuntimeSpan( - `workflow.step.${step.kind}`, - { - attributes: { - 'pulsestack.execution.id': executionId, - 'pulsestack.workflow.id': request.workflow.id, - 'pulsestack.step.id': step.id, - 'pulsestack.step.name': step.name, - 'pulsestack.step.kind': step.kind, - 'pulsestack.step.index': index, - 'pulsestack.step.depends_on': step.dependsOn.join(','), - 'pulsestack.state.keys': Object.keys(state).join(','), - 'pulsestack.step.retry.max_attempts': normalizeRetryPolicy( - step.retry, - ).maxAttempts, + try { + for (const [index, step] of request.workflow.steps.entries()) { + await withRuntimeSpan( + `workflow.step.${step.kind}`, + { + attributes: { + 'pulsestack.execution.id': executionId, + 'pulsestack.workflow.id': request.workflow.id, + 'pulsestack.step.id': step.id, + 'pulsestack.step.name': step.name, + 'pulsestack.step.kind': step.kind, + 'pulsestack.step.index': index, + 'pulsestack.step.depends_on': step.dependsOn.join(','), + 'pulsestack.state.keys': Object.keys(state).join(','), + 'pulsestack.step.retry.max_attempts': normalizeRetryPolicy( + step.retry, + ).maxAttempts, + }, }, - }, - async (otelStepSpan) => { - const span = await this.startSpan({ - traceId, - executionId, - workflowId: request.workflow.id, - parentSpanId: workflowSpanId, - step, - state, - }); - - let result: StepResult; - try { - result = await this.runStepWithRetry({ - step, - state, + async (otelStepSpan) => { + const span = await this.startSpan({ traceId, executionId, workflowId: request.workflow.id, tenantId: request.workflow.tenantId, - correlationId: request.workflow.correlationId, - spanId: span.spanId, + parentSpanId: workflowSpanId, + step, + state, }); - } catch (error) { - if (error instanceof StepRetryExhaustedError) { - retryState[error.stepId] = error.retry; - Object.assign(state, { __retry: retryState }); - otelStepSpan.setAttributes({ - 'pulsestack.step.retry.exhausted': true, - 'pulsestack.step.retry.max_attempts': - error.retry.maxAttempts, + + let result: StepResult; + try { + result = await this.runStepWithRetry({ + step, + state, + traceId, + executionId, + workflowId: request.workflow.id, + tenantId: request.workflow.tenantId, + correlationId: request.workflow.correlationId, + spanId: span.spanId, }); + } catch (error) { + if (error instanceof StepRetryExhaustedError) { + retryState[error.stepId] = error.retry; + Object.assign(state, { __retry: retryState }); + otelStepSpan.setAttributes({ + 'pulsestack.step.retry.exhausted': true, + 'pulsestack.step.retry.max_attempts': + error.retry.maxAttempts, + }); + } + await this.failSpan(span, error); + throw error; } - await this.failSpan(span, error); - throw error; - } - - retryState[step.id] = result.retry; - otelStepSpan.setAttributes({ - 'pulsestack.step.cost_usd': result.costUsd, - 'pulsestack.step.tokens': result.tokens, - 'pulsestack.step.attempts': result.attempts, - 'pulsestack.step.retry.exhausted': result.retry.exhausted, - }); - Object.assign(state, { - [step.id]: result.output, - __retry: retryState, - }); - results.push(result); - - await this.snapshot({ - id: createId('snap'), - executionId, - workflowId: request.workflow.id, - sequence: index, - state: structuredClone(state), - sideEffects: [ - { - type: step.kind, - key: step.id, - response: result.output, - }, - ], - createdAt: new Date().toISOString(), - }); - - await this.finishSpan(span, result); - }, + + retryState[step.id] = result.retry; + otelStepSpan.setAttributes({ + 'pulsestack.step.cost_usd': result.costUsd, + 'pulsestack.step.tokens': result.tokens, + 'pulsestack.step.attempts': result.attempts, + 'pulsestack.step.retry.exhausted': result.retry.exhausted, + }); + Object.assign(state, { + [step.id]: result.output, + __retry: retryState, + }); + results.push(result); + + await this.snapshot({ + id: createId('snap'), + executionId, + workflowId: request.workflow.id, + sequence: index, + state: structuredClone(state), + sideEffects: [ + { + type: step.kind, + key: step.id, + response: result.output, + }, + ], + createdAt: new Date().toISOString(), + }); + + await this.finishSpan(span, result); + }, + ); + } + } catch (error) { + const message = + error instanceof Error ? error.message : 'Workflow step failed'; + const failureOutput = { + steps: results, + finalState: state, + error: message, + }; + workflowSpan.setAttributes({ + 'pulsestack.workflow.failed': true, + 'pulsestack.workflow.error': message, + }); + await this.infra.completeExecution( + executionId, + 'failed', + failureOutput, + ); + await publishEvent( + this.infra, + createEvent({ + type: 'workflow.failed', + source: this.source, + tenantId: request.workflow.tenantId, + correlationId: request.workflow.correlationId, + workflowId: request.workflow.id, + executionId, + spanId: workflowSpanId, + payload: failureOutput, + }), ); + throw error; } - } catch (error) { - const message = - error instanceof Error ? error.message : 'Workflow step failed'; - const failureOutput = { + + const output = { steps: results, + totalCostUsd: results.reduce((sum, item) => sum + item.costUsd, 0), + totalTokens: results.reduce((sum, item) => sum + item.tokens, 0), finalState: state, - error: message, }; + workflowSpan.setAttributes({ - 'pulsestack.workflow.failed': true, - 'pulsestack.workflow.error': message, + 'pulsestack.workflow.total_cost_usd': output.totalCostUsd, + 'pulsestack.workflow.total_tokens': output.totalTokens, }); - await this.infra.completeExecution( - executionId, - 'failed', - failureOutput, - ); + await this.infra.completeExecution(executionId, 'completed', output); await publishEvent( this.infra, createEvent({ - type: 'workflow.failed', + type: 'workflow.completed', source: this.source, tenantId: request.workflow.tenantId, correlationId: request.workflow.correlationId, workflowId: request.workflow.id, executionId, spanId: workflowSpanId, - payload: failureOutput, + payload: output, }), ); - throw error; - } - - const output = { - steps: results, - totalCostUsd: results.reduce((sum, item) => sum + item.costUsd, 0), - totalTokens: results.reduce((sum, item) => sum + item.tokens, 0), - finalState: state, - }; - workflowSpan.setAttributes({ - 'pulsestack.workflow.total_cost_usd': output.totalCostUsd, - 'pulsestack.workflow.total_tokens': output.totalTokens, - }); - await this.infra.completeExecution(executionId, 'completed', output); - await publishEvent( - this.infra, - createEvent({ - type: 'workflow.completed', - source: this.source, - tenantId: request.workflow.tenantId, - correlationId: request.workflow.correlationId, - workflowId: request.workflow.id, - executionId, - spanId: workflowSpanId, - payload: output, - }), - ); - - return { executionId, traceId, output }; - }, + return { executionId, traceId, output }; + }, ); } @@ -303,6 +270,7 @@ export class WorkflowRuntime { const result = await this.runStep( args.step, args.state, + args.tenantId, args.correlationId, attempt, ); @@ -365,18 +333,14 @@ export class WorkflowRuntime { private async runStep( step: WorkflowStep, state: Record, - tenantId: string, correlationId: string, - ): Promise { - - correlationId: string, - attempt: number, + attempt = 1, ): Promise> { - const timestamp = new Date().toISOString(); - const plannedFailures = Number(step.input.failAttempts ?? 0); - if (Number.isFinite(plannedFailures) && attempt <= plannedFailures) { + const timestamp = new Date().toISOString(); + const plannedFailures = Number(step.input.failAttempts ?? 0); + if(Number.isFinite(plannedFailures) && attempt <= plannedFailures) { throw new Error(`Simulated failure for ${step.id} on attempt ${attempt}`); } if (step.kind === 'tool') { @@ -421,23 +385,23 @@ export class WorkflowRuntime { const output = step.kind === 'llm' ? { - ...base, - text: `synthetic completion for ${step.name}`, - tokens: 350 + step.name.length, - } + ...base, + text: `synthetic completion for ${step.name}`, + tokens: 350 + step.name.length, + } : step.kind === 'tool' ? { - ...base, - status: 'ok', - result: { - echoed: step.input, - checksum: `${step.id}:${Object.keys(state).length}`, - }, - } + ...base, + status: 'ok', + result: { + echoed: step.input, + checksum: `${step.id}:${Object.keys(state).length}`, + }, + } : { - ...base, - status: 'processed', - }; + ...base, + status: 'processed', + }; const tokens = step.kind === 'llm' && 'tokens' in output ? Number(output.tokens) : 0;