diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index bcab6d9..0a96975 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -384,6 +384,7 @@ export class PathwaysBuilder< private readonly schemas: Record = {} as Record private readonly inputSchemas: Record = {} as Record private readonly writable: Record = {} as Record + private readonly subscribed: Record = {} as Record private readonly timeouts: Record = {} as Record private readonly maxRetries: Record = {} as Record private readonly retryDelays: Record = {} as Record @@ -819,6 +820,15 @@ export class PathwaysBuilder< >( contract: PathwayContract & { writable?: W + /** + * Whether the in-process pump should subscribe to and consume events for + * this pathway. Defaults to `true`. Set to `false` for analytical / + * write-only pathways: `.write()` still works, but the pump does not pull + * events from flowcore and `.handle()` will never fire. Useful when + * events are written for history/analytics but no in-service consumer + * exists, avoiding back-pressure on the shared pump queue. + */ + subscribe?: boolean maxRetries?: number retryDelayMs?: number isFilePathway?: FP @@ -833,12 +843,14 @@ export class PathwaysBuilder< > { const path = `${contract.flowType}/${contract.eventType}` as PathwayKey const writable = contract.writable ?? true + const subscribe = contract.subscribe ?? true this.logger.debug(`Registering pathway`, { pathway: path, flowType: contract.flowType, eventType: contract.eventType, writable, + subscribe, isFilePathway: contract.isFilePathway, timeoutMs: contract.timeoutMs, maxRetries: contract.maxRetries, @@ -887,6 +899,7 @@ export class PathwaysBuilder< this.inputSchemas[path] = contract.schema ?? z.object({}) } this.writable[path] = writable + this.subscribed[path] = subscribe // Store provisioning descriptions if (contract.description !== undefined) { @@ -901,6 +914,7 @@ export class PathwaysBuilder< flowType: contract.flowType, eventType: contract.eventType, writable, + subscribe, isFilePathway: contract.isFilePathway, }) @@ -1450,7 +1464,7 @@ export class PathwaysBuilder< return } - const registrations = this.buildRegistrations() + const registrations = this.buildSubscribedRegistrations() await this.pathwayPump.start(registrations) this.logger.info("Pump started", { @@ -1522,6 +1536,27 @@ export class PathwaysBuilder< }) } + /** + * Returns the subset of registrations the in-process pump should subscribe to. + * Pathways registered with `subscribe: false` are excluded — they are + * write-only / analytical-only and the pump must not pull their events. + * Provisioning still uses {@link buildRegistrations} so the flow type and + * event type are created in flowcore for every registered pathway. + */ + private buildSubscribedRegistrations(): ProvisionerRegistration[] { + return Object.keys(this.pathways) + .filter((key) => this.subscribed[key as keyof TPathway] !== false) + .map((key) => { + const [flowType, eventType] = key.split("/") + return { + flowType, + eventType, + flowTypeDescription: this.flowTypeDescriptions.get(flowType), + eventTypeDescription: this.eventTypeDescriptions.get(key), + } + }) + } + private async provisionSharedResources( skipFlags: { skipDataCore?: boolean; skipFlowTypes?: boolean; skipEventTypes?: boolean } = {}, ): Promise { diff --git a/tests/pathway-subscribe-flag.test.ts b/tests/pathway-subscribe-flag.test.ts new file mode 100644 index 0000000..3078531 --- /dev/null +++ b/tests/pathway-subscribe-flag.test.ts @@ -0,0 +1,119 @@ +import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { z } from "npm:zod@^3.25.63" +import { PathwaysBuilder } from "../src/pathways/builder.ts" + +const baseOpts = { + baseUrl: "https://api.flowcore.io", + tenant: "test-tenant", + dataCore: "test-dc", + apiKey: "fc_testid_testsecret", +} + +const flowA = "flow-a.0" +const flowB = "flow-b.0" +const eventCreated = "thing.created.0" +const pathA = `${flowA}/${eventCreated}` as const +const pathB = `${flowB}/${eventCreated}` as const + +type ProvisionerRegistration = { + flowType: string + eventType: string + flowTypeDescription?: string + eventTypeDescription?: string +} + +type BuilderInternals = { + subscribed: Record + writable: Record + writers: Record + buildRegistrations(): ProvisionerRegistration[] + buildSubscribedRegistrations(): ProvisionerRegistration[] +} + +// deno-lint-ignore no-explicit-any +function inspect(builder: PathwaysBuilder): BuilderInternals { + return builder as unknown as BuilderInternals +} + +const schema = z.object({ id: z.string() }) + +Deno.test({ + name: "PathwaysBuilder.register — subscribe flag", + sanitizeResources: false, + sanitizeOps: false, + fn: async (t) => { + await t.step("defaults subscribe to true when omitted", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowA, + eventType: eventCreated, + schema, + }) + + const internals = inspect(builder) + assertEquals(internals.subscribed[pathA], true) + assertEquals(internals.writable[pathA], true) + }) + + await t.step("records subscribe: false when explicitly opted out", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowA, + eventType: eventCreated, + schema, + subscribe: false, + }) + + const internals = inspect(builder) + assertEquals(internals.subscribed[pathA], false) + }) + + await t.step("subscribe: false still wires write capability", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowA, + eventType: eventCreated, + schema, + subscribe: false, + }) + + const internals = inspect(builder) + // writable defaults to true; .write() must remain available even when + // subscribe is false — the whole point of the flag. + assertEquals(internals.writable[pathA], true) + assertEquals(typeof internals.writers[pathA], "function") + }) + + await t.step("buildRegistrations includes write-only pathways (provisioning)", () => { + const builder = new PathwaysBuilder(baseOpts) + .register({ flowType: flowA, eventType: eventCreated, schema }) + .register({ flowType: flowB, eventType: eventCreated, schema, subscribe: false }) + + const all = inspect(builder).buildRegistrations() + const flowTypes = all.map((r) => r.flowType).sort() + assertEquals(flowTypes, [flowA, flowB]) + }) + + await t.step("buildSubscribedRegistrations excludes subscribe: false pathways", () => { + const builder = new PathwaysBuilder(baseOpts) + .register({ flowType: flowA, eventType: eventCreated, schema }) + .register({ flowType: flowB, eventType: eventCreated, schema, subscribe: false }) + + const subscribed = inspect(builder).buildSubscribedRegistrations() + const flowTypes = subscribed.map((r) => r.flowType) + assertEquals(flowTypes, [flowA]) + }) + + await t.step("buildSubscribedRegistrations matches buildRegistrations when no opt-out", () => { + const builder = new PathwaysBuilder(baseOpts) + .register({ flowType: flowA, eventType: eventCreated, schema }) + .register({ flowType: flowB, eventType: eventCreated, schema }) + + const internals = inspect(builder) + const all = internals.buildRegistrations() + const subscribed = internals.buildSubscribedRegistrations() + assertEquals(subscribed.length, all.length) + assertEquals( + subscribed.map((r) => r.flowType).sort(), + all.map((r) => r.flowType).sort(), + ) + }) + }, +})