Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/pathways/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ export class PathwaysBuilder<
private readonly schemas: Record<keyof TPathway, AnyZodObject> = {} as Record<keyof TPathway, AnyZodObject>
private readonly inputSchemas: Record<keyof TPathway, AnyZodObject> = {} as Record<keyof TPathway, AnyZodObject>
private readonly writable: Record<keyof TPathway, boolean> = {} as Record<keyof TPathway, boolean>
private readonly subscribed: Record<keyof TPathway, boolean> = {} as Record<keyof TPathway, boolean>
private readonly timeouts: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
private readonly maxRetries: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
private readonly retryDelays: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
Expand Down Expand Up @@ -819,6 +820,15 @@ export class PathwaysBuilder<
>(
contract: PathwayContract<F, E, S> & {
writable?: W
/**
* Whether the in-process pump should subscribe to and consume events for
* this pathway. Defaults to `true`. Set to `false` for analytical /
* write-only pathways: `.write()` still works, but the pump does not pull
* events from flowcore and `.handle()` will never fire. Useful when
* events are written for history/analytics but no in-service consumer
* exists, avoiding back-pressure on the shared pump queue.
*/
subscribe?: boolean
maxRetries?: number
retryDelayMs?: number
isFilePathway?: FP
Expand All @@ -833,12 +843,14 @@ export class PathwaysBuilder<
> {
const path = `${contract.flowType}/${contract.eventType}` as PathwayKey<F, E>
const writable = contract.writable ?? true
const subscribe = contract.subscribe ?? true

this.logger.debug(`Registering pathway`, {
pathway: path,
flowType: contract.flowType,
eventType: contract.eventType,
writable,
subscribe,
isFilePathway: contract.isFilePathway,
timeoutMs: contract.timeoutMs,
maxRetries: contract.maxRetries,
Expand Down Expand Up @@ -887,6 +899,7 @@ export class PathwaysBuilder<
this.inputSchemas[path] = contract.schema ?? z.object({})
}
this.writable[path] = writable
this.subscribed[path] = subscribe

// Store provisioning descriptions
if (contract.description !== undefined) {
Expand All @@ -901,6 +914,7 @@ export class PathwaysBuilder<
flowType: contract.flowType,
eventType: contract.eventType,
writable,
subscribe,
isFilePathway: contract.isFilePathway,
})

Expand Down Expand Up @@ -1450,7 +1464,7 @@ export class PathwaysBuilder<
return
}

const registrations = this.buildRegistrations()
const registrations = this.buildSubscribedRegistrations()
await this.pathwayPump.start(registrations)

this.logger.info("Pump started", {
Expand Down Expand Up @@ -1522,6 +1536,27 @@ export class PathwaysBuilder<
})
}

/**
* Returns the subset of registrations the in-process pump should subscribe to.
* Pathways registered with `subscribe: false` are excluded — they are
* write-only / analytical-only and the pump must not pull their events.
* Provisioning still uses {@link buildRegistrations} so the flow type and
* event type are created in flowcore for every registered pathway.
*/
private buildSubscribedRegistrations(): ProvisionerRegistration[] {
return Object.keys(this.pathways)
.filter((key) => this.subscribed[key as keyof TPathway] !== false)
.map((key) => {
const [flowType, eventType] = key.split("/")
return {
flowType,
eventType,
flowTypeDescription: this.flowTypeDescriptions.get(flowType),
eventTypeDescription: this.eventTypeDescriptions.get(key),
}
})
}

private async provisionSharedResources(
skipFlags: { skipDataCore?: boolean; skipFlowTypes?: boolean; skipEventTypes?: boolean } = {},
): Promise<ProvisionerRegistration[]> {
Expand Down
119 changes: 119 additions & 0 deletions tests/pathway-subscribe-flag.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts"
import { z } from "npm:zod@^3.25.63"
import { PathwaysBuilder } from "../src/pathways/builder.ts"

const baseOpts = {
baseUrl: "https://api.flowcore.io",
tenant: "test-tenant",
dataCore: "test-dc",
apiKey: "fc_testid_testsecret",
}

const flowA = "flow-a.0"
const flowB = "flow-b.0"
const eventCreated = "thing.created.0"
const pathA = `${flowA}/${eventCreated}` as const
const pathB = `${flowB}/${eventCreated}` as const

type ProvisionerRegistration = {
flowType: string
eventType: string
flowTypeDescription?: string
eventTypeDescription?: string
}

type BuilderInternals = {
subscribed: Record<string, boolean>
writable: Record<string, boolean>
writers: Record<string, unknown>
buildRegistrations(): ProvisionerRegistration[]
buildSubscribedRegistrations(): ProvisionerRegistration[]
}

// deno-lint-ignore no-explicit-any
function inspect(builder: PathwaysBuilder<any>): BuilderInternals {
return builder as unknown as BuilderInternals
}

const schema = z.object({ id: z.string() })

Deno.test({
name: "PathwaysBuilder.register — subscribe flag",
sanitizeResources: false,
sanitizeOps: false,
fn: async (t) => {
await t.step("defaults subscribe to true when omitted", () => {
const builder = new PathwaysBuilder(baseOpts).register({
flowType: flowA,
eventType: eventCreated,
schema,
})

const internals = inspect(builder)
assertEquals(internals.subscribed[pathA], true)
assertEquals(internals.writable[pathA], true)
})

await t.step("records subscribe: false when explicitly opted out", () => {
const builder = new PathwaysBuilder(baseOpts).register({
flowType: flowA,
eventType: eventCreated,
schema,
subscribe: false,
})

const internals = inspect(builder)
assertEquals(internals.subscribed[pathA], false)
})

await t.step("subscribe: false still wires write capability", () => {
const builder = new PathwaysBuilder(baseOpts).register({
flowType: flowA,
eventType: eventCreated,
schema,
subscribe: false,
})

const internals = inspect(builder)
// writable defaults to true; .write() must remain available even when
// subscribe is false — the whole point of the flag.
assertEquals(internals.writable[pathA], true)
assertEquals(typeof internals.writers[pathA], "function")
})

await t.step("buildRegistrations includes write-only pathways (provisioning)", () => {
const builder = new PathwaysBuilder(baseOpts)
.register({ flowType: flowA, eventType: eventCreated, schema })
.register({ flowType: flowB, eventType: eventCreated, schema, subscribe: false })

const all = inspect(builder).buildRegistrations()
const flowTypes = all.map((r) => r.flowType).sort()
assertEquals(flowTypes, [flowA, flowB])
})

await t.step("buildSubscribedRegistrations excludes subscribe: false pathways", () => {
const builder = new PathwaysBuilder(baseOpts)
.register({ flowType: flowA, eventType: eventCreated, schema })
.register({ flowType: flowB, eventType: eventCreated, schema, subscribe: false })

const subscribed = inspect(builder).buildSubscribedRegistrations()
const flowTypes = subscribed.map((r) => r.flowType)
assertEquals(flowTypes, [flowA])
})

await t.step("buildSubscribedRegistrations matches buildRegistrations when no opt-out", () => {
const builder = new PathwaysBuilder(baseOpts)
.register({ flowType: flowA, eventType: eventCreated, schema })
.register({ flowType: flowB, eventType: eventCreated, schema })

const internals = inspect(builder)
const all = internals.buildRegistrations()
const subscribed = internals.buildSubscribedRegistrations()
assertEquals(subscribed.length, all.length)
assertEquals(
subscribed.map((r) => r.flowType).sort(),
all.map((r) => r.flowType).sort(),
)
})
},
})
Loading