diff --git a/.gitignore b/.gitignore index ec4e0b2..08d9321 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ docs/build/ /docs/build-docs.js scripts/* util-lambdas/* +.claude/settings.local.json diff --git a/lib/mock-sdk.md b/lib/mock-sdk.md new file mode 100644 index 0000000..4c513d0 --- /dev/null +++ b/lib/mock-sdk.md @@ -0,0 +1,598 @@ +# RStreams SDK Mock (`mock-sdk`) + +A framework-agnostic mock for the RStreams SDK designed for unit testing. Works with Jest, Mocha, Vitest, or any test runner. + +## How It Works with Bot Handlers + +RStreams bots are typically deployed using a wrapper (e.g., `wrappers/cron.js`) that handles the Lambda lifecycle: locking, invoking your handler, checkpointing, and reporting completion. That wrapper creates its own internal SDK instance which cannot be swapped out. + +**This mock is designed to test your bot handler function directly, bypassing the wrapper.** Don't test the wrapper — it's framework plumbing. Test your `botHandler` function, which is where your business logic lives. + +The only thing you need is a way to get the mock SDK into your handler. Two options: + +### Option A: The `mockable()` Pattern + +If your codebase has a `mockable()` utility, this is the cleanest approach. 
The handler keeps its normal `(settings, context, callback)` signature and gets the SDK through a swappable getter: + +```typescript +// handler.ts +import { RStreamsSdk, RSFQueueBotInvocationEvent } from "leo-sdk"; +import { Callback, Context } from "aws-lambda"; +import { mockable } from "../../lib/mockable"; + +const leoInstance = new RStreamsSdk(); + +export const getLeo = mockable(_getLeo); +function _getLeo() { return leoInstance; } + +export const botHandler = async ( + settings: RSFQueueBotInvocationEvent, + context: Context, + callback: Callback +) => { + const leo = getLeo(); + + leo.enrich( + { + id: settings.botId, + inQueue: settings.queue, + outQueue: "enriched-orders", + transform(payload, event, done) { + done(null, { ...payload, processed: true }); + }, + }, + (err) => { + if (err) return callback(err); + callback(); + } + ); +}; + +export const handler = require("leo-sdk/wrappers/cron")(botHandler); +``` + +```typescript +// handler.test.ts +import { RStreamsSdk } from "leo-sdk"; +import { mockRStreamsSdk, createContext, createBotInvocationEvent } from "leo-sdk/lib/mock-sdk"; +import { getLeo, botHandler } from "../handler"; + +describe("my bot", () => { + let sdk; + + beforeEach(() => { + sdk = mockRStreamsSdk(new RStreamsSdk()); + getLeo.override(() => sdk); + }); + + afterEach(() => { + getLeo.clear(); + sdk.mock.reset(); + }); + + it("enriches orders", (done) => { + sdk.mock.queues["raw-orders"] = [ + { orderId: "A", subtotal: 100 }, + ]; + + const event = createBotInvocationEvent("test-bot", { + queue: "raw-orders", + botNumber: 0, + }); + + botHandler(event, createContext(), (err) => { + assert.isNull(err); + assert.equal(sdk.mock.written["enriched-orders"].payloads.length, 1); + done(); + }); + }); +}); +``` + +### Option B: Pass the SDK as the First Argument + +If you don't have `mockable()`, put the SDK as the first parameter of your handler and give the cron wrapper a thin shim: + +```typescript +// handler.ts +const leoInstance = new 
RStreamsSdk(); + +export async function botHandler( + leo: RStreamsSdk, + settings: RSFQueueBotInvocationEvent, + context: Context, +) { + await leo.enrichEvents({ ... }); +} + +// Thin shim — the wrapper sees a normal 2-arg async handler +export const handler = require("leo-sdk/wrappers/cron")( + async (event, context) => { + return botHandler(leoInstance, event, context); + } +); +``` + +```typescript +// handler.test.ts +it("enriches orders", async () => { + sdk.mock.queues["raw-orders"] = [{ orderId: "A" }]; + + const event = createBotInvocationEvent("test-bot", { + queue: "raw-orders", + botNumber: 0, + }); + + await botHandler(sdk, event, createContext()); + + assert.equal(sdk.mock.written["enriched-orders"].payloads.length, 1); +}); +``` + +The shim is what the cron wrapper sees, so its `handler.length` check works normally. Your testable `botHandler` has a clean signature that takes the SDK explicitly. + +**Key principle:** Don't test the wrapper. Test your handler. Make the SDK reachable via either a `mockable()` getter (preferred) or by accepting it as the first parameter with a shim for the wrapper. + +--- + +## Quick Start + +```typescript +import { RStreamsSdk } from "leo-sdk"; +import { mockRStreamsSdk } from "leo-sdk/lib/mock-sdk"; + +const sdk = mockRStreamsSdk(new RStreamsSdk()); + +sdk.mock.queues["my-input-queue"] = [ + { orderId: "123", amount: 10 }, + { orderId: "456", amount: 20 }, +]; + +// Call your handler with the mock SDK... +// Then assert on what was written, checkpointed, etc. +``` + +## API Reference + +### `mockRStreamsSdk(sdk: RStreamsSdk, opts?): MockRStreamsSdk` + +Takes an `RStreamsSdk` instance and returns a new object that extends it with a `.mock` control surface. The original SDK is **not** mutated. + +All real streaming utilities (`through`, `throughAsync`, `pipeline`, `pipeAsync`, `devnull`, `parse`, `stringify`, `batch`, `eventstream`, etc.) are preserved and work normally. Only bus-connected operations are intercepted. 
+ +**Options:** + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `disableS3` | `boolean` | `true` | Strip `useS3` from all write options passed to `enrich`, `offload`, `load`, and `toLeo`. Prevents any code path from attempting real S3 operations during tests. Set to `false` if you explicitly want to test S3-related code paths. | + +### `mock.queues` + +**Type:** `Record` + +Configure what events each queue returns when `sdk.read()` / `sdk.streams.fromLeo()` is called. + +Values can be: +- **Raw payloads** — auto-wrapped into `ReadEvent` objects with generated `eid`, `event`, `id`, and `timestamp` fields +- **Full `ReadEvent` objects** — passed through as-is (detected by having both `eid` and `payload` properties) + +```typescript +// Raw payloads (simplest) +sdk.mock.queues["orders"] = [ + { orderId: "A", total: 100 }, + { orderId: "B", total: 200 }, +]; + +// Full ReadEvent objects (when you need control over eid, timestamps, etc.) +sdk.mock.queues["orders"] = [ + { + id: "my-bot", + event: "orders", + eid: "z/2025/01/15/00/00/1705276800000-0000000", + timestamp: 1705276800000, + event_source_timestamp: 1705276800000, + payload: { orderId: "A", total: 100 }, + }, +]; +``` + +Queues not configured in `mock.queues` return an empty stream (zero events). + +### `mock.written` + +**Type:** `Record` + +Access events captured by `sdk.write()` / `sdk.streams.toLeo()` / `sdk.streams.load()` / `sdk.put()` / `sdk.putEvent()` / `sdk.putEvents()`. + +Auto-populated as events flow through write streams. Key is the queue name. + +```typescript +interface QueueWriteCapture { + events: Event[]; // Full event objects + payloads: T[]; // Just the payloads (convenience) +} +``` + +```typescript +// After running your pipeline... 
+const written = sdk.mock.written["output-queue"]; +assert.equal(written.events.length, 2); +assert.deepEqual(written.payloads, [ + { orderId: "A", enriched: true }, + { orderId: "B", enriched: true }, +]); +``` + +### `mock.checkpoints` + +**Type:** `MockCheckpointControl` + +```typescript +interface MockCheckpointControl { + calls: SpyCall[]; // bot.checkpoint() calls + toCheckpointCalls: SpyCall[]; // streams.toCheckpoint() creation calls +} +``` + +`toCheckpoint()` returns a real `devnull` stream so pipelines work normally, but the call is recorded. + +### `mock.bot` + +**Type:** `MockBotControl` + +Every bot/cron method is replaced with a spy that records calls and immediately invokes callbacks with success. + +```typescript +interface MockBotControl { + checkLock: SpyFn; + reportComplete: SpyFn; + createLock: SpyFn; + removeLock: SpyFn; + checkpoint: SpyFn; + getCheckpoint: SpyFn & { returnValue: Record }; +} +``` + +**Configuring `getCheckpoint` return values:** + +```typescript +sdk.mock.bot.getCheckpoint.returnValue["my-queue"] = { + checkpoint: "z/2025/01/15/00/00/1705276800000", + records: 500, +}; + +const cp = await sdk.bot.getCheckpoint("my-bot", "my-queue"); +// cp === { checkpoint: "z/2025/01/15/00/00/1705276800000", records: 500 } +``` + +### `mock.stats` + +**Type:** `Partial | null` + +Override the stats returned by the read stream's `.get()` method and the stats stream. + +```typescript +sdk.mock.stats = { + checkpoint: "z/2025/01/01/00/00/custom", + records: 999, + source_timestamp: 1700000000000, +}; +``` + +Set to `null` to use auto-computed stats (the default). + +### `mock.reset()` + +Clears all state: queue configurations, captured writes, call records, stats override, and all spy histories. + +Call this in your test `beforeEach` / `afterEach` to ensure test isolation. + +### `mock.readSpy` / `mock.writeSpy` / `mock.loadSpy` + +**Type:** `SpyFn` + +Spies on `fromLeo` / `toLeo` / `load` calls. 
Useful to assert which bot ID, queue name, or config was used. + +```typescript +sdk.streams.fromLeo("my-bot", "my-queue", { limit: 100 }); + +assert.equal(sdk.mock.readSpy.callCount, 1); +assert.equal(sdk.mock.readSpy.lastCall.args[0], "my-bot"); +assert.equal(sdk.mock.readSpy.lastCall.args[1], "my-queue"); +assert.deepEqual(sdk.mock.readSpy.lastCall.args[2], { limit: 100 }); +``` + +### `mock.s3` + +**Type:** `MockS3Control` + +Mocks for `streams.toS3` and `streams.fromS3`. + +```typescript +interface MockS3Control { + written: Record; // Captured writes, keyed by "Bucket/File" + files: Record; // Configure data for fromS3, keyed by "bucket/key" + toS3Spy: SpyFn; + fromS3Spy: SpyFn; +} +``` + +**Writing to S3 (capture):** + +`streams.toS3(Bucket, File)` returns a real writable stream that captures data to `mock.s3.written["Bucket/File"]`. + +**Reading from S3 (configure):** + +```typescript +sdk.mock.s3.files["my-bucket/path/to/file.json"] = '{"hello":"world"}'; +const stream = sdk.streams.fromS3({ bucket: "my-bucket", key: "path/to/file.json" }); +``` + +If the key is not configured, `fromS3` throws a `NoSuchKey` error (matching real S3 behavior). + +### `createContext(config?)` + +```typescript +import { createContext } from "leo-sdk/lib/mock-sdk"; +``` + +Creates a fake Lambda `Context` object with a working `getRemainingTimeInMillis()`. Useful when calling bot handlers directly in tests. + +```typescript +const context = createContext({ Timeout: 60 }); // 60 seconds +context.awsRequestId; // "requestid-mock-1234567890" +context.getRemainingTimeInMillis(); // ~60000 (counts down from creation time) +``` + +`Timeout` is in seconds (default 300). + +### `createBotInvocationEvent(botId, settings?)` + +```typescript +import { createBotInvocationEvent } from "leo-sdk/lib/mock-sdk"; +``` + +Creates a fake `BotInvocationEvent` with `__cron` pre-configured for testing (`force: true`, `ignoreLock: true`). 
+ +```typescript +const event = createBotInvocationEvent("my-bot", { + queue: "input-queue", + botNumber: 0, +}); +// event.botId === "my-bot" +// event.__cron.id === "my-bot" +// event.__cron.force === true +// event.queue === "input-queue" +``` + +### `SpyFn` Interface + +Every spy exposes: + +| Property | Type | Description | +|----------|------|-------------| +| `calls` | `SpyCall[]` | All recorded calls with `{ args, timestamp }` | +| `callCount` | `number` | Number of times called | +| `called` | `boolean` | Whether called at least once | +| `lastCall` | `SpyCall \| undefined` | Most recent call | +| `reset()` | `() => void` | Clear this spy's call history | + +--- + +## Recipes + +### Testing an Enrich Bot (Callback Style) + +This matches the most common real-world pattern: `leo.enrich(opts, callback)`. + +```typescript +// handler.ts +import { RStreamsSdk, RSFQueueBotInvocationEvent } from "leo-sdk"; +import { Callback, Context } from "aws-lambda"; + +const leoInstance = new RStreamsSdk(); +export const getLeo = () => leoInstance; // make swappable + +export const botHandler = async ( + settings: RSFQueueBotInvocationEvent, + context: Context, + callback: Callback +) => { + const leo = getLeo(); + + leo.enrich( + { + id: settings.botId, + inQueue: settings.queue, + outQueue: "enriched-items", + useS3: true, + config: { + stopTime: Date.now() + context.getRemainingTimeInMillis() * 0.8, + fast_s3_read: true, + }, + transform(payload, event, done) { + if (!payload.active) return done(); // filter out + done(null, { ...payload, enriched: true }); + }, + }, + (err) => { + if (err) return callback(err); + callback(); + } + ); +}; + +export const handler = require("leo-sdk/wrappers/cron")(botHandler); +``` + +```typescript +// handler.test.ts +import { mockRStreamsSdk, createContext, createBotInvocationEvent } from "leo-sdk/lib/mock-sdk"; +import * as mod from "../handler"; + +describe("item enrichment", () => { + let sdk; + + beforeEach(() => { + sdk = 
mockRStreamsSdk(new RStreamsSdk()); + (mod as any).getLeo = () => sdk; + }); + + it("filters inactive items and enriches active ones", (done) => { + sdk.mock.queues["raw-items"] = [ + { item_id: 1, active: true, name: "Widget" }, + { item_id: 2, active: false, name: "Discontinued" }, + { item_id: 3, active: true, name: "Gadget" }, + ]; + + const event = createBotInvocationEvent("test", { + queue: "raw-items", + botNumber: 0, + }); + + mod.botHandler(event, createContext(), (err) => { + assert.isNull(err); + const output = sdk.mock.written["enriched-items"]; + assert.equal(output.payloads.length, 2); + assert.isTrue(output.payloads[0].enriched); + assert.equal(output.payloads[0].name, "Widget"); + done(); + }); + }); +}); +``` + +### Testing an Offload Bot + +```typescript +// offload-handler.ts +export const botHandler = async (settings, context, callback) => { + const leo = getLeo(); + const db = await getDatabase(); + + leo.offload( + { + id: settings.botId, + inQueue: settings.queue, + transform(payload, event, done) { + db.upsert("items", payload).then(() => done()).catch(done); + }, + }, + callback + ); +}; +``` + +```typescript +// offload-handler.test.ts +it("offloads all events from the queue", (done) => { + sdk.mock.queues["items-to-sync"] = [ + { item_id: 1, name: "Widget" }, + { item_id: 2, name: "Gadget" }, + ]; + + const event = createBotInvocationEvent("sync-bot", { + queue: "items-to-sync", + botNumber: 0, + }); + + mod.botHandler(event, createContext(), (err) => { + assert.isNull(err); + // For offload bots, assert on side effects (DB calls, etc.) 
+ // The mock ensures no real bus reads happened + assert.equal(sdk.mock.readSpy.callCount, 1); + done(); + } + ); +}); +``` + +### Testing put / putEvent / putEvents + +```typescript +it("writes a single event", async () => { + await sdk.putEvent("my-bot", "notifications", { + userId: "u1", + message: "Hello!", + }); + + assert.equal(sdk.mock.written["notifications"].payloads.length, 1); + assert.equal(sdk.mock.written["notifications"].payloads[0].message, "Hello!"); +}); + +it("writes multiple events", async () => { + await sdk.putEvents( + [{ data: "a" }, { data: "b" }, { data: "c" }], + { botId: "batch-bot", queue: "batch-queue" } + ); + + assert.equal(sdk.mock.written["batch-queue"].payloads.length, 3); +}); +``` + +### Testing a Custom Pipeline + +```typescript +it("transforms and writes via pipeline", async () => { + sdk.mock.queues["input"] = [ + { name: "alice" }, + { name: "bob" }, + ]; + + await sdk.streams.pipeAsync( + sdk.read("my-bot", "input"), + sdk.streams.through((event, done) => { + done(null, { + id: "my-bot", + event: "output", + payload: { greeting: `Hello, ${event.payload.name}!` }, + }); + }), + sdk.write("my-bot"), + sdk.streams.devnull() + ); + + assert.deepEqual(sdk.mock.written["output"].payloads, [ + { greeting: "Hello, alice!" }, + { greeting: "Hello, bob!" }, + ]); +}); +``` + +### Asserting Checkpoint Behavior + +```typescript +it("checkpoints after processing", async () => { + sdk.mock.queues["q"] = [{ x: 1 }]; + + const readStream = sdk.read("bot", "q"); + // ... process the stream ... 
+ + readStream.checkpoint(() => {}); + assert.equal(readStream.checkpointSpy.callCount, 1); +}); +``` + +### Configuring getCheckpoint Return Values + +```typescript +it("resumes from last checkpoint", async () => { + sdk.mock.bot.getCheckpoint.returnValue["my-queue"] = { + checkpoint: "z/2025/01/15/00/00/1705276800000", + records: 1000, + }; + + const cp = await sdk.bot.getCheckpoint("my-bot", "my-queue"); + assert.equal(cp.records, 1000); +}); +``` + +--- + +## Architecture Notes + +- **No framework dependencies**: The spy system is built from scratch — no sinon, jest, or other test library required. Works identically with any test runner. +- **Real streams**: Read streams are real Node.js `Readable` streams in object mode. Write streams use real `Transform` streams (`streams.through`). Backpressure works normally. +- **Non-mutating**: `mockRStreamsSdk` creates a new object via `Object.create(sdk)`. The original SDK instance is untouched — safe for parallel test execution. +- **Prototype chain**: The mock SDK inherits from the original via prototype, so any property/method not explicitly overridden falls through to the real SDK. +- **Wrapper-agnostic**: This mock tests your handler logic directly. The cron/resource wrappers (`wrappers/cron.js`, etc.) manage Lambda lifecycle (locking, reporting) using their own internal SDK instance. Make your SDK accessor swappable (a getter function or `mockable()` wrapper) so tests can inject the mock while the handler keeps its normal `(settings, context, callback)` signature. 
diff --git a/lib/mock-sdk.ts b/lib/mock-sdk.ts new file mode 100644 index 0000000..6b7c9eb --- /dev/null +++ b/lib/mock-sdk.ts @@ -0,0 +1,855 @@ +import { Readable } from "stream"; +import stream from "stream"; +import { RStreamsSdk, ReadEvent, Event, Checkpoint, ReadableQueueStream, WritableStream, TransformStream, BotInvocationEvent } from "../index"; +import { Callback, StreamUtil, CheckpointData, WriteOptions, ReadOptions, ToCheckpointOptions, StatsStream } from "./lib"; +import { LeoCron } from "./cron"; +import { promisify } from "util"; + +// ─── Spy infrastructure (zero external dependencies) ─────────────────────── + +export interface SpyCall { + args: any[]; + timestamp: number; +} + +export interface SpyFn any = (...args: any[]) => any> { + /** The callable spy function */ + (...args: Parameters): ReturnType; + /** All recorded calls */ + calls: SpyCall[]; + /** Number of times called */ + callCount: number; + /** Whether the spy was called at least once */ + called: boolean; + /** Arguments from the last call, or undefined */ + lastCall: SpyCall | undefined; + /** Reset this spy's call history */ + reset(): void; +} + +function createSpy any>( + impl?: (...args: any[]) => any +): SpyFn { + const calls: SpyCall[] = []; + + const spy = function (...args: any[]) { + const call: SpyCall = { args: [...args], timestamp: Date.now() }; + calls.push(call); + if (impl) { + return impl(...args); + } + // Default: invoke last arg if it's a callback + const lastArg = args[args.length - 1]; + if (typeof lastArg === "function") { + lastArg(null); + } + } as SpyFn; + + spy.calls = calls; + Object.defineProperty(spy, "callCount", { get: () => calls.length }); + Object.defineProperty(spy, "called", { get: () => calls.length > 0 }); + Object.defineProperty(spy, "lastCall", { get: () => calls.length > 0 ? 
calls[calls.length - 1] : undefined }); + spy.reset = () => { calls.length = 0; }; + + return spy; +} + +// ─── Mock queue read stream ──────────────────────────────────────────────── + +class MockReadStream extends Readable implements ReadableQueueStream { + private queue: (ReadEvent)[]; + private idx: number; + private _stats: Checkpoint; + private _hasCustomStats: boolean; + private _opts: ReadOptions; + checkpointSpy: SpyFn; + + constructor( + items: (T | ReadEvent)[], + queueName: string, + eventIdFn: (ts: number, gran?: string, record?: number) => string, + stats?: Partial + ) { + super({ objectMode: true }); + const now = Date.now(); + this.idx = 0; + this._hasCustomStats = stats != null && "records" in stats; + this._opts = {} as ReadOptions; + + // Normalize items to ReadEvent shape + this.queue = items.map((item, i) => { + if (isReadEvent(item)) { + return item; + } + return { + id: "mock-bot", + event: queueName, + eid: eventIdFn(now, "full", i), + timestamp: now, + event_source_timestamp: now, + payload: item as T, + } as ReadEvent; + }); + + // Pre-compute stats + const lastItem = this.queue[this.queue.length - 1]; + this._stats = { + checkpoint: lastItem?.eid, + records: this.queue.length, + source_timestamp: this.queue[0]?.event_source_timestamp || now, + started_timestamp: this.queue[0]?.timestamp || now, + ended_timestamp: lastItem?.timestamp || now, + ...stats, + }; + + this.checkpointSpy = createSpy((...args: any[]) => { + const cb = args[args.length - 1]; + if (typeof cb === "function") cb(null); + }); + } + + _read(_size: number): void { + while (this.idx < this.queue.length) { + const item = this.queue[this.idx++]; + if (!this.push(item)) return; + } + this.push(null); + } + + get(): Checkpoint { + return { ...this._stats, records: this._hasCustomStats ? 
this._stats.records : this.idx }; + } + + getOpts(): ReadOptions { + return this._opts; + } + + checkpoint(paramsOrDone: any, done?: Callback): void { + if (typeof paramsOrDone === "function") { + this.checkpointSpy(paramsOrDone); + } else { + this.checkpointSpy(paramsOrDone, done); + } + } +} + +function isReadEvent(item: any): item is ReadEvent { + return item != null && typeof item === "object" && "eid" in item && "payload" in item; +} + +// ─── Mock write capture ──────────────────────────────────────────────────── + +export interface QueueWriteCapture { + /** All complete event objects written */ + events: Event[]; + /** Just the payloads for convenience */ + payloads: T[]; +} + +// ─── Bot spy surface ─────────────────────────────────────────────────────── + +export interface MockBotControl { + checkLock: SpyFn; + reportComplete: SpyFn; + createLock: SpyFn; + removeLock: SpyFn; + checkpoint: SpyFn; + getCheckpoint: SpyFn & { returnValue: Record }; +} + +// ─── Checkpoint spy surface ──────────────────────────────────────────────── + +export interface MockCheckpointControl { + /** Calls to bot.checkpoint */ + calls: SpyCall[]; + /** Calls recorded on the toCheckpoint stream */ + toCheckpointCalls: SpyCall[]; +} + +// ─── S3 mock capture ─────────────────────────────────────────────────────── + +export interface MockS3Control { + /** Events/data written via streams.toS3, keyed by `${Bucket}/${File}` */ + written: Record; + /** Spy on toS3 calls */ + toS3Spy: SpyFn; + /** Spy on fromS3 calls */ + fromS3Spy: SpyFn; + /** + * Configure data that fromS3 returns, keyed by `${bucket}/${key}`. + * Value should be a Buffer or string. + */ + files: Record; +} + +// ─── Main mock control ──────────────────────────────────────────────────── + +export interface MockControl { + /** + * Configure what events each queue returns when read. + * Key is queue name, value is array of payloads or ReadEvent objects. 
+ * + * @example + * mock.queues["my-queue"] = [{ orderId: "123" }, { orderId: "456" }]; + */ + queues: Record; + + /** + * Access events that were written to each queue. + * Key is queue name (auto-populated as events flow through write streams). + * Returns `{ events: [], payloads: [] }` for queues that were never written to. + */ + written: Record; + + /** + * Access checkpoint-related call records. + */ + checkpoints: MockCheckpointControl; + + /** + * Access bot/cron method call records. + */ + bot: MockBotControl; + + /** + * Override the stats returned by the read stream's `.get()` method. + * Set to null to use auto-computed stats. + */ + stats: Partial | null; + + /** + * Reset all mock state: queues config, written events, call records, stats. + */ + reset(): void; + + /** + * Access the spy on `fromLeo` / `read` calls. + * Useful to assert which bot/queue/config was used. + */ + readSpy: SpyFn; + + /** + * Access the spy on `toLeo` / `write` calls. + */ + writeSpy: SpyFn; + + /** + * Access the spy on `load` calls. + */ + loadSpy: SpyFn; + + /** + * Access S3 mock state: captured writes, file data for reads, and spies. + */ + s3: MockS3Control; +} + +export type MockRStreamsSdk = RStreamsSdk & { mock: MockControl }; + +// ─── Options ─────────────────────────────────────────────────────────────── + +export interface MockRStreamsSdkOptions { + /** + * If true, strip `useS3` from all write options passed to `enrich`, `offload`, + * `load`, and `toLeo`. This prevents any code path from attempting real S3 + * operations during tests. + * + * @default true + */ + disableS3?: boolean; +} + +// ─── Written proxy ───────────────────────────────────────────────────────── + +const EMPTY_CAPTURE: QueueWriteCapture = Object.freeze({ events: [], payloads: [] }); + +/** + * Returns a Proxy over the written record that returns { events: [], payloads: [] } + * for queues that were never written to, avoiding TypeError on assertion. 
 */
function createWrittenProxy(written: Record): Record {
	return new Proxy(written, {
		get(target, prop: string) {
			if (prop in target) {
				return target[prop];
			}
			// Return frozen empty capture for unwritten queues so
			// sdk.mock.written["q"].payloads.length doesn't throw
			return EMPTY_CAPTURE;
		},
	});
}

// ─── Main entry point ──────────────────────────────────────────────────────

/**
 * Wraps an RStreamsSdk instance with test-friendly mocks.
 *
 * All real streaming utilities (through, throughAsync, pipeline, devnull, parse,
 * stringify, batch, etc.) are preserved. Only bus-connected operations (read/write
 * queues, checkpointing, bot/cron calls) are intercepted.
 *
 * NOTE: This function mutates `sdk.configuration.validate` to skip config
 * validation. This is necessary because enrich/offload close over the
 * configuration object. All other SDK state is left untouched.
 *
 * @param sdk An instance of RStreamsSdk (e.g., `new RStreamsSdk()`)
 * @param opts Options to control mock behavior
 * @returns A MockRStreamsSdk with all original capabilities plus a `.mock` control surface
 */
export function mockRStreamsSdk(sdk: RStreamsSdk, opts?: MockRStreamsSdkOptions): MockRStreamsSdk {
	const disableS3 = opts?.disableS3 !== false; // default true
	// ── State ──────────────────────────────────────────────────────────
	const state = createMockState();

	// ── Create mock streams object ────────────────────────────────────
	// Prototype chain off the real streams object: anything not explicitly
	// overridden below (through, batch, pipe, devnull, ...) falls through
	// to the real implementation.
	const realStreams = sdk.streams;
	const mockStreams = Object.create(realStreams);

	// Override fromLeo (read from queue).
	// Signature mirrors the real fromLeo; botId/config are accepted but only
	// recorded by the spy — the data comes from state.queues[inQueue].
	const readSpy = createSpy(function (botId: string, inQueue: string, config?: ReadOptions): ReadableQueueStream {
		const queueData = state.queues[inQueue] || [];
		const eventIdFn = realStreams.eventIdFromTimestamp;
		return new MockReadStream(queueData, inQueue, eventIdFn, state.stats || undefined);
	});
	mockStreams.fromLeo = readSpy;

	// Override toLeo (write to queue).
	// Captures every event into state.written[queueName] instead of the bus.
	const writeSpy = createSpy(function (_botId: string, _config?: any): TransformStream {
		const eventIdFn = realStreams.eventIdFromTimestamp;
		let recordCounter = 0;
		const now = Date.now();

		return realStreams.through((event: any, callback: Callback) => {
			if (!event || !event.event) {
				// Command events (e.g. __cmd: "checkpoint") have no queue — pass through silently.
				// These are internal SDK plumbing, not user events.
				return callback();
			}
			const queueName = event.event;
			if (!state.written[queueName]) {
				state.written[queueName] = { events: [], payloads: [] };
			}
			// Add eid if missing. NOTE: this mutates the caller's event object,
			// matching what the real write path does when assigning eids.
			if (!event.eid) {
				event.eid = eventIdFn(now, "full", recordCounter++);
			}
			state.written[queueName].events.push(event);
			if (event.payload !== undefined) {
				state.written[queueName].payloads.push(event.payload);
			}
			callback();
		}) as any;
	});
	mockStreams.toLeo = writeSpy;

	// Override toCheckpoint — records the creation call, then discards all
	// data (devnull). Individual checkpoint() calls are tracked elsewhere.
	mockStreams.toCheckpoint = (_config?: ToCheckpointOptions) => {
		state.checkpoints.toCheckpointCalls.push({ args: [_config], timestamp: Date.now() });
		return realStreams.devnull();
	};

	// Override load — like toLeo, but also wraps raw (non-Event) payloads
	// into Event shape using the default queue/bot id.
	const loadSpy = createSpy(function (_botId?: string, _outQueue?: string, _config?: WriteOptions): WritableStream {
		const eventIdFn = realStreams.eventIdFromTimestamp;
		let recordCounter = 0;
		const now = Date.now();
		const defaultQueue = _outQueue;

		const writeStream = realStreams.through((event: any, callback: Callback) => {
			// Determine queue name from event or default; events with neither
			// are dropped silently.
			let queueName = defaultQueue;
			if (event && event.event) {
				queueName = event.event;
			}
			if (!queueName) {
				return callback();
			}

			// Wrap raw payloads into Event shape
			let normalized = event;
			if (event && !event.event) {
				normalized = {
					id: _botId || "mock-bot",
					event: queueName,
					payload: event,
					timestamp: Date.now(),
				};
			}
			if (!normalized.eid) {
				normalized.eid = eventIdFn(now, "full", recordCounter++);
			}

			if (!state.written[queueName]) {
				state.written[queueName] = { events: [], payloads: [] };
			}
			state.written[queueName].events.push(normalized);
			if (normalized.payload !== undefined) {
				state.written[queueName].payloads.push(normalized.payload);
			}
			callback();
		});

		return writeStream as any;
	});
	mockStreams.load = loadSpy;

	// Override stats — a passthrough stream whose get() serves the configured
	// state.stats override, or a minimal zero-record default.
	mockStreams.stats = (botId: string, queue: string, _opts?: any): StatsStream => {
		const passthrough = realStreams.through((data: any, callback: Callback) => {
			callback(null, data);
		}) as any as StatsStream;

		passthrough.get = () => {
			if (state.stats) {
				return state.stats as CheckpointData;
			}
			return {
				eid: realStreams.eventIdFromTimestamp(Date.now()),
				units: 0,
				source_timestamp: Date.now(),
			};
		};

		passthrough.checkpoint = createSpy((...args: any[]) => {
			state.checkpoints.calls.push({ args: [...args], timestamp: Date.now() });
			const cb = args[args.length - 1];
			if (typeof cb === "function") cb(null);
		});

		return passthrough;
	};

	// Override toS3 (capture written data).
	// The two placeholder spies here are replaced immediately below once the
	// real toS3Spy/fromS3Spy closures over s3State exist.
	const s3State: MockS3Control = {
		written: {},
		files: {},
		toS3Spy: createSpy(),
		fromS3Spy: createSpy(),
	};

	const toS3Spy = createSpy(function (Bucket: string, File: string): stream.Writable {
		const key = `${Bucket}/${File}`;
		if (!s3State.written[key]) {
			s3State.written[key] = [];
		}
		return realStreams.write((data: any, callback: Callback) => {
			s3State.written[key].push(data);
			callback();
		}) as any;
	});
	s3State.toS3Spy = toS3Spy;
	mockStreams.toS3 = toS3Spy;

	// Override fromS3 (return configured data).
	// NOTE: The real S3 SDK fails asynchronously on the stream, but this mock
	// throws synchronously for missing keys. This is intentional for test clarity —
	// a missing key in tests is a test setup error, not an async condition.
	const fromS3Spy = createSpy(function (file: { bucket: string; key: string; range?: string }): stream.Readable {
		const key = `${file.bucket}/${file.key}`;
		const data = s3State.files[key];
		if (data == null) {
			const err = new Error("The specified key does not exist.") as any;
			err.code = "NoSuchKey";
			throw err;
		}
		const buf = Buffer.isBuffer(data) ? data : Buffer.from(data);
		const readable = new Readable({
			read() {
				this.push(buf);
				this.push(null);
			},
		});
		return readable;
	});
	s3State.fromS3Spy = fromS3Spy;
	mockStreams.fromS3 = fromS3Spy;

	// ── Reimplemented enrich/offload ──────────────────────────────────
	//
	// The real enrich/offload in leo-stream.js reference a closed-over `ls`
	// variable, so Object.create overrides on mockStreams are invisible to
	// them. We reimplement the pipeline assembly here so it uses the mock's
	// fromLeo/toLeo/toCheckpoint.
	//
	// IMPORTANT: If the real enrich/offload pipeline assembly changes in
	// leo-stream.js, this reimplementation must be updated to match.
	// This is a known trade-off — the alternative (patching the closed-over
	// `ls` directly) would require mutating the original SDK's streams.

	// Mock enrich: read inQueue → optional batch → user transform → write
	// outQueue → checkpoint. Mirrors the real pipeline assembly (see note above).
	mockStreams.enrich = function (opts: any, callback: Callback) {
		const id = opts.id;
		const inQueue = opts.inQueue;
		const outQueue = opts.outQueue;
		const func = opts.transform || opts.each;
		const config = opts.config || {};
		config.start = config.start || opts.start;
		config.debug = opts.debug;

		const args: any[] = [];
		args.push(mockStreams.fromLeo(id, inQueue, config));

		if (opts.batch) {
			args.push(realStreams.batch(opts.batch));
		}

		args.push(realStreams.process(id, func, outQueue));
		args.push(mockStreams.toLeo(id, opts));
		args.push(mockStreams.toCheckpoint({
			debug: opts.debug,
			force: opts.force,
			onCheckpoint: opts.onCheckpoint,
		}));
		args.push(callback);
		return realStreams.pipe.apply(realStreams, args);
	};

	// Mock offload: read inQueue → per-event map → batch → unbatch-or-tag →
	// user handler → checkpoint. Mirrors the real offload's option
	// normalization, including its mutation of opts.size for the shorthand form.
	mockStreams.offload = function (opts: any, callback: Callback) {
		const id = opts.id;
		const inQueue = opts.inQueue || opts.queue;
		const func = opts.each || opts.transform;
		let batchConfig: any = { size: 1, map: (payload: any, meta: any, done: any) => done(null, payload) };

		// Normalize top-level batch shorthand options (matches real offload behavior)
		if (typeof opts.size != "object" && (opts.count || opts.records || opts.units || opts.time || opts.bytes)) {
			const size = {} as any;
			size.count = opts.count || opts.records || opts.units;
			size.time = opts.time;
			size.bytes = opts.size || opts.bytes;
			size.highWaterMark = opts.highWaterMark || 2;
			opts.size = size;
		}

		if (!opts.batch || typeof opts.batch === "number") {
			batchConfig.size = opts.batch || batchConfig.size;
		} else {
			batchConfig.size = opts.batch.size || ((opts.batch.count || opts.batch.bytes || opts.batch.time || opts.batch.highWaterMark) && opts.batch) || batchConfig.size;
			batchConfig.map = opts.batch.map || batchConfig.map;
		}
		if (typeof batchConfig.size !== "object") {
			batchConfig.size = { count: batchConfig.size };
		}
		batchConfig.size.highWaterMark = batchConfig.size.highWaterMark || 2;

		const batchSize = typeof batchConfig.size === "number" ? batchConfig.size : (batchConfig.size.count || batchConfig.size.records);

		return realStreams.pipe(
			mockStreams.fromLeo(id, inQueue, opts),
			realStreams.through((obj: any, done: any) => {
				// A falsy map result drops the event from the pipeline.
				batchConfig.map(obj.payload, obj, (err: any, r: any) => {
					if (err || !r) {
						done(err);
					} else {
						obj.payload = r;
						done(null, obj);
					}
				});
			}),
			realStreams.batch(batchConfig.size),
			realStreams.through({ highWaterMark: 1 }, (batch: any, done: any) => {
				batch.units = batch.payload.length;
				const last = batch.payload[batch.units - 1];
				// batchSize == 1 (loose equality, matching the real offload):
				// unwrap the single event; otherwise tag the batch with the
				// last event's position so checkpointing works.
				if (batchSize == 1) {
					done(null, last);
				} else {
					batch.event_source_timestamp = last.event_source_timestamp;
					batch.event = last.event;
					batch.eid = last.eid;
					done(null, batch);
				}
			}),
			realStreams.process(id, func, null, undefined, { highWaterMark: 1 }),
			mockStreams.toCheckpoint({
				debug: opts.debug,
				force: opts.force,
				onCheckpoint: opts.onCheckpoint,
			}),
			callback
		);
	};

	// Override cron on the streams object
	const botSpies = createBotSpies(state);
	mockStreams.cron = botSpies.cron;

	// ── Create the mock SDK ───────────────────────────────────────────
	// Prototype chain off the real sdk so un-mocked members fall through.
	const mockSdk = Object.create(sdk) as MockRStreamsSdk;
	mockSdk.streams = mockStreams;

	// Skip configuration validation in tests.
	// NOTE: This mutates the original sdk.configuration object. This is
	// necessary because enrich/offload in leo-stream.js close over the
	// configure object passed at factory time — the same object reference
	// as sdk.configuration. There is no way to intercept the validate()
	// call without patching this object directly.

	// Original validate is captured but never restored in the visible code —
	// presumably kept for a future "unmock" hook. TODO confirm before removing.
	const origValidate = sdk.configuration && (sdk.configuration as any).validate;
	if (sdk.configuration) {
		(sdk.configuration as any).validate = () => true;
	}

	// Override top-level aliases so sdk.read/write/checkpoint/load route
	// through the mocked streams.
	mockSdk.read = mockStreams.fromLeo.bind(mockStreams);
	mockSdk.write = mockStreams.toLeo.bind(mockStreams);
	mockSdk.checkpoint = mockStreams.toCheckpoint.bind(mockStreams);
	mockSdk.load = mockStreams.load.bind(mockStreams);
	mockSdk.throughAsync = realStreams.throughAsync?.bind(realStreams);

	// Override bot
	mockSdk.bot = botSpies.cron;

	// Wrap enrich/offload with disableS3 stripping if enabled.
	// Stripping useS3 keeps test pipelines in-memory instead of exercising
	// the S3 write path.
	const wrappedEnrich = disableS3
		? (enrichOpts: any, callback: Callback) => {
			delete enrichOpts.useS3;
			return mockStreams.enrich(enrichOpts, callback);
		}
		: mockStreams.enrich;
	mockSdk.enrich = wrappedEnrich;
	mockSdk.enrichEvents = promisify(wrappedEnrich) as any;

	const wrappedOffload = disableS3
		? (offloadOpts: any, callback: Callback) => {
			delete offloadOpts.useS3;
			return mockStreams.offload(offloadOpts, callback);
		}
		: mockStreams.offload;
	mockSdk.offload = wrappedOffload;
	mockSdk.offloadEvents = promisify(wrappedOffload) as any;

	// Override put/putEvent/putEvents to route through mock load.
	// These write straight into state.written — no stream is involved.
	mockSdk.put = (botId: string, outQueue: string, payload: Event | T, callback: Callback) => {
		try {
			const event: Event = isEvent(payload)
				? payload
				: { id: botId, event: outQueue, payload: payload as T } as Event;

			if (!state.written[outQueue]) {
				state.written[outQueue] = { events: [], payloads: [] };
			}
			state.written[outQueue].events.push(event);
			if (event.payload !== undefined) {
				state.written[outQueue].payloads.push(event.payload);
			}
			callback(null);
		} catch (err) {
			callback(err);
		}
	};

	// Promise wrapper over put.
	mockSdk.putEvent = (botId: string, outQueue: string, payload: Event | T): Promise => {
		return new Promise((resolve, reject) => {
			mockSdk.put(botId, outQueue, payload, (err: any) => {
				if (err) reject(err); else resolve();
			});
		});
	};

	mockSdk.putEvents = (payloads: (Event | T)[], settings?: { botId?: string; queue?: string; writeOptions?: WriteOptions }): Promise => {
		const botId = settings?.botId || "mock-bot";
		const queue = settings?.queue || "unknown-queue";
		return Promise.all(payloads.map(p => mockSdk.putEvent(botId, queue, p))).then(() => { });
	};

	// ── Attach mock control surface ───────────────────────────────────
	const writtenProxy = createWrittenProxy(state.written);

	// defineProperties (mostly non-writable) keeps test code from
	// accidentally reassigning the control surface's members.
	const mockControl: MockControl = Object.create(null);
	Object.defineProperties(mockControl, {
		queues: { value: state.queues, writable: true },
		written: { value: writtenProxy },
		checkpoints: { value: state.checkpoints },
		bot: { value: botSpies.control },
		readSpy: { value: readSpy },
		writeSpy: { value: writeSpy },
		loadSpy: { value: loadSpy },
		s3: { value: s3State },
		stats: {
			get() { return state.stats; },
			set(v: Partial | null) { state.stats = v; },
			enumerable: true,
		},
		reset: {
			value() {
				// Clear queues config
				for (const key of Object.keys(state.queues)) {
					delete state.queues[key];
				}
				// Clear written
				for (const key of Object.keys(state.written)) {
					delete state.written[key];
				}
				// Clear checkpoint records
				state.checkpoints.calls.length = 0;
				state.checkpoints.toCheckpointCalls.length = 0;
				// Reset bot spies
botSpies.control.checkLock.reset(); + botSpies.control.reportComplete.reset(); + botSpies.control.createLock.reset(); + botSpies.control.removeLock.reset(); + botSpies.control.checkpoint.reset(); + botSpies.control.getCheckpoint.reset(); + botSpies.control.getCheckpoint.returnValue = {}; + // Reset operation spies + readSpy.reset(); + writeSpy.reset(); + loadSpy.reset(); + // Clear S3 state + for (const key of Object.keys(s3State.written)) { + delete s3State.written[key]; + } + for (const key of Object.keys(s3State.files)) { + delete s3State.files[key]; + } + s3State.toS3Spy.reset(); + s3State.fromS3Spy.reset(); + // Clear stats override + state.stats = null; + }, + }, + }); + mockSdk.mock = mockControl; + + return mockSdk; +} + +// ─── Internal helpers ────────────────────────────────────────────────────── + +function createMockState() { + return { + queues: {} as Record, + written: {} as Record, + checkpoints: { + calls: [] as SpyCall[], + toCheckpointCalls: [] as SpyCall[], + }, + stats: null as Partial | null, + }; +} + +function createBotSpies(state: ReturnType) { + const checkLockSpy = createSpy(); + const reportCompleteSpy = createSpy(); + const createLockSpy = createSpy(); + const removeLockSpy = createSpy(); + + const checkpointSpy = createSpy((...args: any[]) => { + state.checkpoints.calls.push({ args: [...args], timestamp: Date.now() }); + const cb = args[args.length - 1]; + if (typeof cb === "function") cb(null); + }); + + const getCheckpointReturnValue: Record = {}; + const getCheckpointSpy = createSpy(async (botId?: string, queue?: string) => { + if (queue && getCheckpointReturnValue[queue]) { + return getCheckpointReturnValue[queue]; + } + return undefined; + }) as SpyFn & { returnValue: Record }; + getCheckpointSpy.returnValue = getCheckpointReturnValue; + + const cron: LeoCron = { + checkLock: checkLockSpy as any, + reportComplete: reportCompleteSpy as any, + createLock: createLockSpy as any, + removeLock: removeLockSpy as any, + checkpoint: 
			checkpointSpy as any,
		getCheckpoint: getCheckpointSpy as any,
		// Remaining LeoCron members are inert spies — calls are recorded but
		// return undefined. Cast via `as any` because the spies do not
		// implement the full LeoCron member signatures.
		trigger: createSpy() as any,
		schedule: createSpy(async () => { }) as any,
		update: createSpy() as any,
		subscribe: createSpy() as any,
		runAgain: createSpy() as any,
		getLastResult: createSpy() as any,
		setMessage: createSpy() as any,
		getAttachedSystemByAlias: createSpy() as any,
		getAttachedSystem: createSpy() as any,
		get: createSpy() as any,
		buildPayloads: createSpy() as any,
		shouldRun: createSpy() as any,
		hasMoreToDo: createSpy() as any,
		start: createSpy() as any,
		end: createSpy() as any,
		run: createSpy() as any,
		createBot: createSpy() as any,
	};

	// The smaller assertion surface exposed as sdk.mock.bot.
	const control: MockBotControl = {
		checkLock: checkLockSpy,
		reportComplete: reportCompleteSpy,
		createLock: createLockSpy,
		removeLock: removeLockSpy,
		checkpoint: checkpointSpy,
		getCheckpoint: getCheckpointSpy,
	};

	return { cron, control };
}

// Structural check only: anything with `id` and `event` keys is treated as a
// pre-formed Event; user payloads that happen to carry both keys will NOT be
// re-wrapped.
function isEvent(val: any): val is Event {
	return val != null && typeof val === "object" && "id" in val && "event" in val;
}

// ─── Test helpers ────────────────────────────────────────────────────────

export interface CreateContextOptions {
	/** Lambda timeout in seconds. Defaults to 300 (5 minutes). */
	Timeout?: number;
}

/**
 * Creates a fake Lambda Context object suitable for calling bot handlers in tests.
 * Provides `awsRequestId` and a working `getRemainingTimeInMillis()`.
 *
 * @param config Optional configuration. `Timeout` is in seconds (default 300).
 * @returns A partial Lambda Context object
 */
export function createContext(config?: CreateContextOptions) {
	const start = Date.now();
	const maxTime = (config?.Timeout ?? 300) * 1000;
	return {
		awsRequestId: "requestid-mock-" + Date.now().toString(),
		functionName: "mock-function",
		functionVersion: "$LATEST",
		invokedFunctionArn: "arn:aws:lambda:us-east-1:000000000000:function:mock-function",
		memoryLimitInMB: "256",
		logGroupName: "/aws/lambda/mock-function",
		logStreamName: "mock-log-stream",
		// Counts down from Timeout in real time; floors at 0 once elapsed.
		getRemainingTimeInMillis: () => {
			const elapsed = Date.now() - start;
			return elapsed < maxTime ? maxTime - elapsed : 0;
		},
		callbackWaitsForEmptyEventLoop: true,
		done: () => { },
		fail: () => { },
		succeed: () => { },
	};
}

/**
 * Creates a fake BotInvocationEvent suitable for calling bot handlers in tests.
 * Sets up `__cron` with `force: true` and `ignoreLock: true` so the handler
 * skips lock checks.
 *
 * @param botId The bot ID to use
 * @param settings Additional settings to merge into the event (e.g., queue, botNumber)
 * @returns A BotInvocationEvent-compatible object
 */
export function createBotInvocationEvent = {}>(
	botId: string,
	settings?: T
): BotInvocationEvent & T {
	return {
		...settings,
		botId,
		__cron: {
			id: botId,
			iid: "0",
			name: botId,
			ts: Date.now(),
			force: true,
			ignoreLock: true,
		},
	} as BotInvocationEvent & T;
}

export default mockRStreamsSdk;
diff --git a/test/lib.mock-sdk.utest.ts b/test/lib.mock-sdk.utest.ts
new file mode 100644
index 0000000..fe25708
--- /dev/null
+++ b/test/lib.mock-sdk.utest.ts
@@ -0,0 +1,1019 @@
import { assert } from "chai";
import { mockRStreamsSdk, MockRStreamsSdk, SpyFn, createContext, createBotInvocationEvent, MockRStreamsSdkOptions } from "../lib/mock-sdk";

// Minimal SDK construction that doesn't need AWS resources
import utilFn from "../lib/stream/leo-stream";

// Builds an RStreamsSdk-shaped object backed by the real leo-stream
// utilities but with no AWS resources, for exercising the mock in isolation.
function createMinimalSdk(): any {
	const config = { onUpdate: () => { }, resources: {}, aws: {}, validate: () => true } as any;
	const streams = utilFn(config);
	// Simulate RStreamsSdk shape with real streaming utilities
	return {
		configuration:
config, + streams, + bot: streams.cron, + read: streams.fromLeo?.bind(streams), + write: streams.toLeo?.bind(streams), + load: streams.load?.bind(streams), + enrich: streams.enrich?.bind(streams), + offload: streams.offload?.bind(streams), + checkpoint: streams.toCheckpoint?.bind(streams), + throughAsync: streams.throughAsync?.bind(streams), + aws: {}, + put: () => { }, + putEvent: async () => { }, + putEvents: async () => { }, + }; +} + +describe("lib/mock-sdk.ts", function () { + let sdk: MockRStreamsSdk; + + beforeEach(() => { + const baseSdk = createMinimalSdk(); + sdk = mockRStreamsSdk(baseSdk); + }); + + afterEach(() => { + sdk.mock.reset(); + }); + + describe("mockRStreamsSdk", () => { + it("returns an object with a mock property", () => { + assert.isObject(sdk.mock); + assert.isObject(sdk.mock.queues); + assert.isObject(sdk.mock.written); + assert.isObject(sdk.mock.checkpoints); + assert.isObject(sdk.mock.bot); + assert.isFunction(sdk.mock.reset); + }); + + it("preserves real streaming utilities", () => { + assert.isFunction(sdk.streams.through); + assert.isFunction(sdk.streams.throughAsync); + assert.isFunction(sdk.streams.pipeline); + assert.isFunction(sdk.streams.devnull); + assert.isFunction(sdk.streams.parse); + assert.isFunction(sdk.streams.stringify); + }); + }); + + describe("reading from queues", () => { + it("reads configured queue data as a stream", async () => { + sdk.mock.queues["test-queue"] = [ + { orderId: "123", amount: 10 }, + { orderId: "456", amount: 20 }, + ]; + + const events: any[] = []; + await sdk.streams.pipeAsync( + sdk.streams.fromLeo("mock-bot", "test-queue"), + sdk.streams.through((event: any, done: any) => { + events.push(event); + done(); + }), + sdk.streams.devnull() + ); + + assert.equal(events.length, 2); + assert.deepEqual(events[0].payload, { orderId: "123", amount: 10 }); + assert.deepEqual(events[1].payload, { orderId: "456", amount: 20 }); + }); + + it("auto-generates eid and event fields on read events", async () => 
{ + sdk.mock.queues["test-queue"] = [{ data: "hello" }]; + + const events: any[] = []; + await sdk.streams.pipeAsync( + sdk.streams.fromLeo("my-bot", "test-queue"), + sdk.streams.through((event: any, done: any) => { + events.push(event); + done(); + }), + sdk.streams.devnull() + ); + + assert.equal(events.length, 1); + assert.isString(events[0].eid); + assert.equal(events[0].event, "test-queue"); + assert.equal(events[0].id, "mock-bot"); + }); + + it("passes through pre-formed ReadEvent objects", async () => { + sdk.mock.queues["test-queue"] = [{ + id: "custom-bot", + event: "test-queue", + eid: "z/2025/01/01/00/00/1234567890", + timestamp: 1234567890, + event_source_timestamp: 1234567890, + payload: { key: "value" }, + }]; + + const events: any[] = []; + await sdk.streams.pipeAsync( + sdk.streams.fromLeo("my-bot", "test-queue"), + sdk.streams.through((event: any, done: any) => { + events.push(event); + done(); + }), + sdk.streams.devnull() + ); + + assert.equal(events.length, 1); + assert.equal(events[0].eid, "z/2025/01/01/00/00/1234567890"); + assert.equal(events[0].id, "custom-bot"); + assert.deepEqual(events[0].payload, { key: "value" }); + }); + + it("returns an empty stream for unconfigured queues", async () => { + const events: any[] = []; + await sdk.streams.pipeAsync( + sdk.streams.fromLeo("my-bot", "nonexistent-queue"), + sdk.streams.through((event: any, done: any) => { + events.push(event); + done(); + }), + sdk.streams.devnull() + ); + + assert.equal(events.length, 0); + }); + + it("provides stats via .get() on the read stream", () => { + sdk.mock.queues["test-queue"] = [ + { a: 1 }, + { a: 2 }, + { a: 3 }, + ]; + + const stream = sdk.streams.fromLeo("my-bot", "test-queue"); + const stats = stream.get(); + + assert.isObject(stats); + assert.isNumber(stats.source_timestamp); + assert.isNumber(stats.records); + }); + + it("records readSpy calls", () => { + sdk.mock.queues["test-queue"] = []; + sdk.streams.fromLeo("bot-1", "test-queue", { limit: 100 }); + 
+ assert.equal(sdk.mock.readSpy.callCount, 1); + assert.equal(sdk.mock.readSpy.lastCall!.args[0], "bot-1"); + assert.equal(sdk.mock.readSpy.lastCall!.args[1], "test-queue"); + }); + + it("sdk.read alias works the same as streams.fromLeo", async () => { + sdk.mock.queues["alias-queue"] = [{ x: 1 }]; + + const events: any[] = []; + await sdk.streams.pipeAsync( + sdk.read("my-bot", "alias-queue"), + sdk.streams.through((event: any, done: any) => { + events.push(event); + done(); + }), + sdk.streams.devnull() + ); + + assert.equal(events.length, 1); + assert.deepEqual(events[0].payload, { x: 1 }); + }); + }); + + describe("writing to queues", () => { + it("captures written events", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { id: "bot", event: "out-queue", payload: { item: "A" } }, + { id: "bot", event: "out-queue", payload: { item: "B" } }, + ]), + sdk.streams.toLeo("my-bot"), + sdk.streams.devnull() + ); + + assert.isDefined(sdk.mock.written["out-queue"]); + assert.equal(sdk.mock.written["out-queue"].events.length, 2); + assert.deepEqual(sdk.mock.written["out-queue"].payloads, [ + { item: "A" }, + { item: "B" }, + ]); + }); + + it("auto-generates eid on written events", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { id: "bot", event: "q", payload: { x: 1 } }, + ]), + sdk.streams.toLeo("my-bot"), + sdk.streams.devnull() + ); + + const event = sdk.mock.written["q"].events[0] as any; + assert.isString(event.eid); + assert.match(event.eid, /^z\//); + }); + + it("separates events by queue name", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { id: "bot", event: "queue-a", payload: { val: 1 } }, + { id: "bot", event: "queue-b", payload: { val: 2 } }, + { id: "bot", event: "queue-a", payload: { val: 3 } }, + ]), + sdk.streams.toLeo("my-bot"), + sdk.streams.devnull() + ); + + assert.equal(sdk.mock.written["queue-a"].events.length, 2); + 
assert.equal(sdk.mock.written["queue-b"].events.length, 1); + }); + + it("records writeSpy calls", () => { + sdk.streams.toLeo("my-bot", { useS3: false }); + assert.equal(sdk.mock.writeSpy.callCount, 1); + assert.equal(sdk.mock.writeSpy.lastCall!.args[0], "my-bot"); + }); + + it("sdk.write alias works", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { id: "bot", event: "q", payload: { y: 99 } }, + ]), + sdk.write("my-bot"), + sdk.streams.devnull() + ); + + assert.equal(sdk.mock.written["q"].payloads.length, 1); + }); + }); + + describe("load", () => { + it("captures events written via load", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { id: "bot", event: "load-queue", payload: { data: "test" } }, + ]), + sdk.streams.load("my-bot", "load-queue"), + ); + + assert.isDefined(sdk.mock.written["load-queue"]); + assert.equal(sdk.mock.written["load-queue"].payloads.length, 1); + assert.deepEqual(sdk.mock.written["load-queue"].payloads[0], { data: "test" }); + }); + + it("wraps raw payloads into Event shape", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([ + { rawData: "hello" }, + ]), + sdk.streams.load("my-bot", "raw-queue"), + ); + + const events = sdk.mock.written["raw-queue"].events; + assert.equal(events.length, 1); + assert.equal(events[0].id, "my-bot"); + assert.equal(events[0].event, "raw-queue"); + }); + + it("records loadSpy calls", () => { + sdk.streams.load("bot-x", "queue-y", { useS3: false }); + assert.equal(sdk.mock.loadSpy.callCount, 1); + }); + }); + + describe("checkpointing", () => { + it("toCheckpoint returns a working stream (devnull)", async () => { + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([{ a: 1 }, { a: 2 }]), + sdk.streams.toCheckpoint(), + ); + // No error means the stream worked + assert.isTrue(true); + }); + + it("records toCheckpoint creation calls", () => { + sdk.streams.toCheckpoint({ records: 100 }); + 
assert.equal(sdk.mock.checkpoints.toCheckpointCalls.length, 1); + }); + + it("checkpoint on read stream is a spy", async () => { + sdk.mock.queues["cp-queue"] = [{ a: 1 }]; + const readStream = sdk.streams.fromLeo("bot", "cp-queue") as any; + + let callbackCalled = false; + readStream.checkpoint(() => { callbackCalled = true; }); + + assert.isTrue(callbackCalled); + assert.equal(readStream.checkpointSpy.callCount, 1); + }); + }); + + describe("bot/cron spying", () => { + it("records checkLock calls", () => { + let called = false; + sdk.bot.checkLock({} as any, "run-1", 5000, ((err: any) => { called = true; }) as any); + + assert.isTrue(called); + assert.equal(sdk.mock.bot.checkLock.callCount, 1); + assert.equal(sdk.mock.bot.checkLock.lastCall!.args[1], "run-1"); + }); + + it("records reportComplete calls", () => { + let called = false; + sdk.bot.reportComplete({} as any, "run-1", "complete", {}, {} as any, ((err: any) => { called = true; }) as any); + + assert.isTrue(called); + assert.equal(sdk.mock.bot.reportComplete.callCount, 1); + }); + + it("records createLock calls", () => { + let called = false; + sdk.bot.createLock("my-bot", "run-1", 60000, ((err: any) => { called = true; }) as any); + + assert.isTrue(called); + assert.equal(sdk.mock.bot.createLock.callCount, 1); + }); + + it("records removeLock calls", () => { + let called = false; + sdk.bot.removeLock("my-bot", "run-1", ((err: any) => { called = true; }) as any); + + assert.isTrue(called); + assert.equal(sdk.mock.bot.removeLock.callCount, 1); + }); + + it("records checkpoint calls on bot", () => { + let called = false; + sdk.bot.checkpoint("my-bot", "queue", { eid: "z/123", source_timestamp: 100, units: 5 }, ((err: any) => { called = true; }) as any); + + assert.isTrue(called); + assert.equal(sdk.mock.bot.checkpoint.callCount, 1); + assert.equal(sdk.mock.bot.checkpoint.lastCall!.args[0], "my-bot"); + }); + + it("getCheckpoint returns configured values", async () => { + 
sdk.mock.bot.getCheckpoint.returnValue["my-queue"] = { + checkpoint: "z/2025/01/01/00/00/1234", + records: 100, + }; + + const result = await sdk.bot.getCheckpoint("my-bot", "my-queue"); + assert.deepEqual(result, { + checkpoint: "z/2025/01/01/00/00/1234", + records: 100, + }); + assert.equal(sdk.mock.bot.getCheckpoint.callCount, 1); + }); + + it("getCheckpoint returns undefined for unconfigured queue", async () => { + const result = await sdk.bot.getCheckpoint("my-bot", "other-queue"); + assert.isUndefined(result); + }); + }); + + describe("put / putEvent / putEvents", () => { + it("put captures events", (done) => { + sdk.put("bot-1", "put-queue", { message: "hello" }, (err: any) => { + assert.isNull(err); + assert.equal(sdk.mock.written["put-queue"].payloads.length, 1); + assert.deepEqual(sdk.mock.written["put-queue"].payloads[0], { message: "hello" }); + done(); + }); + }); + + it("putEvent captures events", async () => { + await sdk.putEvent("bot-1", "pe-queue", { val: 42 }); + + assert.equal(sdk.mock.written["pe-queue"].payloads.length, 1); + assert.deepEqual(sdk.mock.written["pe-queue"].payloads[0], { val: 42 }); + }); + + it("putEvents captures multiple events", async () => { + await sdk.putEvents( + [{ data: "a" }, { data: "b" }, { data: "c" }], + { botId: "bot-1", queue: "multi-queue" } + ); + + assert.equal(sdk.mock.written["multi-queue"].payloads.length, 3); + }); + }); + + describe("stats override", () => { + it("uses custom stats when configured", () => { + sdk.mock.stats = { + checkpoint: "z/2025/01/01/00/00/custom", + records: 999, + source_timestamp: 1700000000000, + }; + + sdk.mock.queues["stats-queue"] = [{ a: 1 }]; + const stream = sdk.streams.fromLeo("bot", "stats-queue"); + const stats = stream.get(); + + assert.equal(stats.checkpoint, "z/2025/01/01/00/00/custom"); + assert.equal(stats.source_timestamp, 1700000000000); + assert.equal(stats.records, 999); + }); + + it("read stream get() uses idx for records when no custom stats", async () => { + 
sdk.mock.queues["count-queue"] = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const stream = sdk.streams.fromLeo("bot", "count-queue"); + + // Before reading, records should be 0 + assert.equal(stream.get().records, 0); + + // Read all events + const events: any[] = []; + await sdk.streams.pipeAsync( + stream, + sdk.streams.through((e: any, done: any) => { events.push(e); done(); }), + sdk.streams.devnull() + ); + + // After reading, records should reflect items read + assert.equal(events.length, 3); + }); + + it("stats on stats stream uses override", () => { + sdk.mock.stats = { + checkpoint: "custom-checkpoint", + records: 42, + }; + + const statsStream = sdk.streams.stats("bot", "queue"); + const result = statsStream.get(); + + assert.equal((result as any).checkpoint, "custom-checkpoint"); + assert.equal((result as any).records, 42); + }); + }); + + describe("real streaming utilities still work", () => { + it("through transforms data", async () => { + const results: number[] = []; + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray([1, 2, 3]), + sdk.streams.through((n: number, done: any) => { + done(null, n * 2); + }), + sdk.streams.through((n: number, done: any) => { + results.push(n); + done(); + }), + sdk.streams.devnull() + ); + + assert.deepEqual(results, [2, 4, 6]); + }); + + it("pipeline combines streams", async () => { + const results: any[] = []; + await sdk.streams.pipeAsync( + sdk.streams.eventstream.readArray(["a", "b", "c"]), + sdk.streams.through((s: string, done: any) => { + done(null, s.toUpperCase()); + }), + sdk.streams.through((s: string, done: any) => { + results.push(s); + done(); + }), + sdk.streams.devnull() + ); + + assert.deepEqual(results, ["A", "B", "C"]); + }); + }); + + describe("reset", () => { + it("clears all mock state", async () => { + // Set up some state + sdk.mock.queues["q1"] = [{ a: 1 }]; + sdk.mock.stats = { records: 10 }; + sdk.mock.bot.getCheckpoint.returnValue["q1"] = { checkpoint: "z/123" }; + + await 
// Continuation of the reset test's state setup: write one event via toLeo
// so sdk.mock.written is populated before reset() is called.
sdk.streams.pipeAsync(
  sdk.streams.eventstream.readArray([
    { id: "bot", event: "out", payload: { x: 1 } },
  ]),
  sdk.streams.toLeo("bot"),
  sdk.streams.devnull()
);

// Record one bot.checkLock call so its spy counter is non-zero before reset.
sdk.bot.checkLock({} as any, "r", 1, (() => { }) as any);

// Verify state exists
assert.isNotEmpty(Object.keys(sdk.mock.written));
assert.equal(sdk.mock.bot.checkLock.callCount, 1);

// Reset
sdk.mock.reset();

// Verify all cleared — queues, captures, checkpoint records, every spy,
// and the stats override.
assert.isEmpty(Object.keys(sdk.mock.queues));
assert.isEmpty(Object.keys(sdk.mock.written));
assert.equal(sdk.mock.checkpoints.calls.length, 0);
assert.equal(sdk.mock.checkpoints.toCheckpointCalls.length, 0);
assert.equal(sdk.mock.bot.checkLock.callCount, 0);
assert.equal(sdk.mock.bot.reportComplete.callCount, 0);
assert.equal(sdk.mock.bot.checkpoint.callCount, 0);
assert.equal(sdk.mock.bot.getCheckpoint.callCount, 0);
assert.equal(sdk.mock.readSpy.callCount, 0);
assert.equal(sdk.mock.writeSpy.callCount, 0);
assert.equal(sdk.mock.loadSpy.callCount, 0);
assert.isNull(sdk.mock.stats);
});
});

// SpyFn is the mock's lightweight spy record: called / callCount /
// lastCall / reset, as exercised below.
describe("SpyFn interface", () => {
  it("tracks callCount and called", () => {
    assert.isFalse(sdk.mock.readSpy.called);
    assert.equal(sdk.mock.readSpy.callCount, 0);

    sdk.mock.queues["q"] = [];
    sdk.streams.fromLeo("bot", "q");

    assert.isTrue(sdk.mock.readSpy.called);
    assert.equal(sdk.mock.readSpy.callCount, 1);
  });

  it("tracks lastCall", () => {
    assert.isUndefined(sdk.mock.readSpy.lastCall);

    sdk.mock.queues["q"] = [];
    sdk.streams.fromLeo("bot-a", "q");
    sdk.streams.fromLeo("bot-b", "q");

    // lastCall reflects the arguments of the most recent invocation.
    assert.equal(sdk.mock.readSpy.lastCall!.args[0], "bot-b");
  });

  it("spy.reset clears call history", () => {
    sdk.mock.queues["q"] = [];
    sdk.streams.fromLeo("bot", "q");
    assert.equal(sdk.mock.readSpy.callCount, 1);

    sdk.mock.readSpy.reset();
    assert.equal(sdk.mock.readSpy.callCount, 0);
    assert.isFalse(sdk.mock.readSpy.called);
  });
});

// S3 helpers: toS3 captures written chunks under a "bucket/key" entry;
// fromS3 serves string or Buffer fixtures from sdk.mock.s3.files.
describe("S3 mocking", () => {
  it("toS3 captures written data", async () => {
    const writeStream = sdk.streams.toS3("my-bucket", "my-file.gz");
    await new Promise((resolve, reject) => {
      writeStream.write("chunk1", (err) => {
        if (err) return reject(err);
        writeStream.write("chunk2", (err) => {
          if (err) return reject(err);
          writeStream.end(resolve);
        });
      });
    });

    // Captured writes are keyed by "bucket/key".
    const key = "my-bucket/my-file.gz";
    assert.isDefined(sdk.mock.s3.written[key]);
    assert.equal(sdk.mock.s3.written[key].length, 2);
    assert.equal(sdk.mock.s3.written[key][0], "chunk1");
    assert.equal(sdk.mock.s3.written[key][1], "chunk2");
  });

  it("toS3 records spy calls", () => {
    sdk.streams.toS3("bucket-a", "file-b.txt");
    assert.equal(sdk.mock.s3.toS3Spy.callCount, 1);
    assert.equal(sdk.mock.s3.toS3Spy.lastCall!.args[0], "bucket-a");
    assert.equal(sdk.mock.s3.toS3Spy.lastCall!.args[1], "file-b.txt");
  });

  it("fromS3 returns configured file data", async () => {
    sdk.mock.s3.files["my-bucket/path/to/file.json"] = '{"hello":"world"}';

    const readStream = sdk.streams.fromS3({
      bucket: "my-bucket",
      key: "path/to/file.json",
    });

    const chunks: Buffer[] = [];
    await new Promise((resolve, reject) => {
      readStream.on("data", (chunk) => chunks.push(chunk));
      readStream.on("end", resolve);
      readStream.on("error", reject);
    });

    const result = Buffer.concat(chunks).toString();
    assert.equal(result, '{"hello":"world"}');
  });

  it("fromS3 returns Buffer data", async () => {
    // Binary fixtures should round-trip untouched (no string coercion).
    const buf = Buffer.from([0x01, 0x02, 0x03]);
    sdk.mock.s3.files["bucket/binary.dat"] = buf;

    const readStream = sdk.streams.fromS3({
      bucket: "bucket",
      key: "binary.dat",
    });

    const chunks: Buffer[] = [];
    await new Promise((resolve, reject) => {
      readStream.on("data", (chunk) => chunks.push(chunk));
      readStream.on("end", resolve);
      readStream.on("error", reject);
    });

    assert.deepEqual(Buffer.concat(chunks), buf);
  });

  it("fromS3 throws NoSuchKey for unconfigured files", () => {
    assert.throws(() => {
// Continuation: reading a key with no configured fixture should throw
// with an S3-style "NoSuchKey" message.
sdk.streams.fromS3({ bucket: "b", key: "missing.txt" });
}, /specified key does not exist/);
});

it("fromS3 records spy calls", () => {
  sdk.mock.s3.files["b/k"] = "data";
  sdk.streams.fromS3({ bucket: "b", key: "k" });
  assert.equal(sdk.mock.s3.fromS3Spy.callCount, 1);
});

it("reset clears S3 state", () => {
  sdk.mock.s3.files["b/k"] = "data";
  sdk.mock.s3.written["b/f"] = ["chunk"];
  sdk.streams.toS3("x", "y");

  sdk.mock.reset();

  // reset() wipes S3 fixtures, captured writes, and both S3 spies too.
  assert.isEmpty(Object.keys(sdk.mock.s3.files));
  assert.isEmpty(Object.keys(sdk.mock.s3.written));
  assert.equal(sdk.mock.s3.toS3Spy.callCount, 0);
  assert.equal(sdk.mock.s3.fromS3Spy.callCount, 0);
});
});

// createContext builds a fake AWS Lambda Context suitable for invoking
// bot handlers directly in tests.
describe("createContext", () => {
  it("returns a context with awsRequestId", () => {
    const ctx = createContext();
    assert.isString(ctx.awsRequestId);
    assert.match(ctx.awsRequestId, /^requestid-mock-/);
  });

  it("getRemainingTimeInMillis returns positive time", () => {
    // Timeout is given in seconds, mirroring the Lambda configuration shape.
    const ctx = createContext({ Timeout: 10 });
    const remaining = ctx.getRemainingTimeInMillis();
    assert.isAbove(remaining, 0);
    assert.isAtMost(remaining, 10000);
  });

  it("defaults to 300 second timeout", () => {
    const ctx = createContext();
    const remaining = ctx.getRemainingTimeInMillis();
    // A little time may elapse between creation and the call, hence the range.
    assert.isAbove(remaining, 299000);
    assert.isAtMost(remaining, 300000);
  });

  it("has standard Lambda context fields", () => {
    const ctx = createContext();
    assert.isString(ctx.functionName);
    assert.isString(ctx.functionVersion);
    assert.isString(ctx.invokedFunctionArn);
    assert.isString(ctx.memoryLimitInMB);
    assert.isFunction(ctx.getRemainingTimeInMillis);
  });
});

// createBotInvocationEvent builds the invocation event a cron wrapper
// would normally pass to a bot handler.
describe("createBotInvocationEvent", () => {
  it("creates an event with botId and __cron", () => {
    const event = createBotInvocationEvent("my-bot");
    assert.equal(event.botId, "my-bot");
    assert.isObject(event.__cron);
    assert.equal(event.__cron.id, "my-bot");
    assert.equal(event.__cron.name, "my-bot");
// Continuation: __cron carries force/ignoreLock flags and a numeric timestamp.
assert.isTrue(event.__cron.force);
assert.isTrue(event.__cron.ignoreLock);
assert.isNumber(event.__cron.ts);
});

it("merges additional settings", () => {
  const event = createBotInvocationEvent("my-bot", {
    queue: "input-queue",
    botNumber: 2,
    source: "upstream",
  });

  assert.equal(event.botId, "my-bot");
  assert.equal((event as any).queue, "input-queue");
  assert.equal((event as any).botNumber, 2);
  assert.equal((event as any).source, "upstream");
});

it("settings do not override botId or __cron", () => {
  const event = createBotInvocationEvent("correct-bot", {
    botId: "wrong-bot",
  } as any);

  // botId from the first arg wins because it's spread after settings
  assert.equal(event.botId, "correct-bot");
});
});

describe("configuration.validate", () => {
  it("is overridden to return true", () => {
    // The mock stubs configuration.validate so tests need no real AWS config.
    assert.isFunction((sdk.configuration as any).validate);
    assert.isTrue((sdk.configuration as any).validate());
  });
});

// Full enrich pipeline against the mock: read from a seeded queue, run the
// user transform, and capture output in sdk.mock.written.
describe("enrich end-to-end", () => {
  it("reads from mock queue, transforms, writes to output queue", (done) => {
    sdk.mock.queues["raw-items"] = [
      { name: "Widget", price: 10 },
      { name: "Gadget", price: 20 },
    ];

    sdk.enrich({
      id: "enricher-bot",
      inQueue: "raw-items",
      outQueue: "enriched-items",
      transform(payload: any, event: any, cb: any) {
        cb(null, { ...payload, taxed_price: payload.price * 1.1 });
      },
    }, (err: any) => {
      assert.isNotOk(err);
      const output = sdk.mock.written["enriched-items"];
      assert.equal(output.payloads.length, 2);
      // closeTo guards against floating-point noise in price * 1.1.
      assert.closeTo(output.payloads[0].taxed_price, 11, 0.01);
      assert.closeTo(output.payloads[1].taxed_price, 22, 0.01);
      assert.equal(output.payloads[0].name, "Widget");
      done();
    });
  });

  it("can filter events by returning nothing from transform", (done) => {
    sdk.mock.queues["input"] = [
      { active: true, name: "A" },
      { active: false, name: "B" },
      { active: true, name: "C" },
    ];

    sdk.enrich({
      id: "filter-bot",
// Continuation of the filter test's enrich options from the previous line.
inQueue: "input",
outQueue: "output",
transform(payload: any, event: any, cb: any) {
  // Calling cb() with no value drops the event from the output queue.
  if (!payload.active) return cb();
  cb(null, payload);
},
}, (err: any) => {
  assert.isNotOk(err);
  const output = sdk.mock.written["output"];
  assert.equal(output.payloads.length, 2);
  assert.equal(output.payloads[0].name, "A");
  assert.equal(output.payloads[1].name, "C");
  done();
});
});

it("enrichEvents (promisified) works", async () => {
  sdk.mock.queues["in"] = [{ x: 1 }, { x: 2 }];

  await sdk.enrichEvents({
    id: "bot",
    inQueue: "in",
    outQueue: "out",
    transform(payload: any, event: any, cb: any) {
      cb(null, { ...payload, doubled: payload.x * 2 });
    },
  });

  assert.equal(sdk.mock.written["out"].payloads.length, 2);
  assert.equal(sdk.mock.written["out"].payloads[0].doubled, 2);
});
});

// offload reads and processes events without producing an output queue.
describe("offload end-to-end", () => {
  it("reads from mock queue and processes events", (done) => {
    sdk.mock.queues["work-queue"] = [
      { task: "build" },
      { task: "deploy" },
    ];

    const processed: any[] = [];
    sdk.offload({
      id: "offload-bot",
      inQueue: "work-queue",
      transform(payload: any, event: any, cb: any) {
        processed.push(payload);
        cb();
      },
    }, (err: any) => {
      assert.isNotOk(err);
      assert.equal(processed.length, 2);
      assert.equal(processed[0].task, "build");
      assert.equal(processed[1].task, "deploy");
      done();
    });
  });

  it("offloadEvents (promisified) works", async () => {
    sdk.mock.queues["oq"] = [{ v: 10 }, { v: 20 }];

    const results: number[] = [];
    await sdk.offloadEvents({
      id: "bot",
      inQueue: "oq",
      transform(payload: any, event: any, cb: any) {
        results.push(payload.v);
        cb();
      },
    });

    assert.deepEqual(results, [10, 20]);
  });
});

// disableS3 (on by default) strips useS3 from bot options so no real S3
// uploads are attempted during tests.
describe("disableS3 option", () => {
  it("is enabled by default — strips useS3 from enrich opts", (done) => {
    sdk.mock.queues["in-q"] = [{ a: 1 }];

    const enrichOpts: any = {
      id: "bot",
      inQueue: "in-q",
      outQueue: "out-q",
      useS3: true,
// Continuation of the enrich options opened on the previous line.
transform(payload: any, event: any, cb: any) { cb(null, payload); },
};

sdk.enrich(enrichOpts, (err: any) => {
  // The mock removes useS3 from the caller's options object in place.
  assert.notProperty(enrichOpts, "useS3");
  done(err);
});
});

it("strips useS3 from offload opts", (done) => {
  sdk.mock.queues["in-q"] = [{ a: 1 }];

  const offloadOpts: any = {
    id: "bot",
    inQueue: "in-q",
    useS3: true,
    transform(payload: any, event: any, cb: any) { cb(); },
  };

  sdk.offload(offloadOpts, (err: any) => {
    assert.notProperty(offloadOpts, "useS3");
    done(err);
  });
});

it("can be explicitly disabled to preserve useS3", (done) => {
  // Wrap a fresh SDK with disableS3: false so the useS3 flag survives.
  const baseSdk = createMinimalSdk();
  const sdkWithS3 = mockRStreamsSdk(baseSdk, { disableS3: false });
  sdkWithS3.mock.queues["in-q"] = [{ a: 1 }];

  const enrichOpts: any = {
    id: "bot",
    inQueue: "in-q",
    outQueue: "out-q",
    useS3: true,
    transform(payload: any, event: any, cb: any) { cb(null, payload); },
  };

  sdkWithS3.enrich(enrichOpts, (err: any) => {
    assert.isTrue(enrichOpts.useS3);
    done(err);
  });
});
});

// Reading an unwritten queue from sdk.mock.written yields an empty capture
// rather than undefined, so assertions never need existence checks.
describe("written proxy for unwritten queues", () => {
  it("returns empty capture for never-written queue", () => {
    const capture = sdk.mock.written["nonexistent-queue"];
    assert.isArray(capture.events);
    assert.isArray(capture.payloads);
    assert.equal(capture.events.length, 0);
    assert.equal(capture.payloads.length, 0);
  });

  it("returns real capture for written queue", async () => {
    await sdk.putEvent("bot", "real-queue", { x: 1 });
    const capture = sdk.mock.written["real-queue"];
    assert.equal(capture.payloads.length, 1);
  });
});

describe("load edge cases", () => {
  it("silently skips events with no determinable queue", async () => {
    await sdk.streams.pipeAsync(
      sdk.streams.eventstream.readArray([
        { rawData: "no queue info" },
      ]),
      // No outQueue provided, event has no .event field
      sdk.streams.load("my-bot"),
    );

    // Nothing should have been written
    assert.isEmpty(Object.keys(sdk.mock.written));
// Close the "silently skips…" test, the "load edge cases" describe, and the
// enclosing top-level unit-test describe.
});
});
});

// ─── Integration test using real RStreamsSdk ───────────────────────────────
// Validates the mock against the actual SDK class shape, not a hand-built fake.

describe("lib/mock-sdk.ts (integration with real RStreamsSdk)", function () {
  let sdk: MockRStreamsSdk;

  before(() => {
    // Construct a real SDK with dummy config to skip AWS resource discovery
    const RealSdk = require("../index");
    const realSdk = new RealSdk({
      Region: "us-east-1",
      LeoStream: "mock-stream",
      LeoCron: "mock-cron",
      LeoEvent: "mock-event",
      LeoS3: "mock-s3",
      LeoKinesisStream: "mock-kinesis",
      LeoFirehoseStream: "mock-firehose",
      LeoSettings: "mock-settings",
    });
    sdk = mockRStreamsSdk(realSdk);
  });

  after(() => {
    sdk.mock.reset();
  });

  // Shape check: every public surface the unit tests rely on must exist on
  // the wrapped real SDK instance.
  it("mock wraps real SDK and has expected shape", () => {
    assert.isObject(sdk.mock);
    assert.isFunction(sdk.read);
    assert.isFunction(sdk.write);
    assert.isFunction(sdk.enrich);
    assert.isFunction(sdk.offload);
    assert.isFunction(sdk.enrichEvents);
    assert.isFunction(sdk.offloadEvents);
    assert.isFunction(sdk.load);
    assert.isFunction(sdk.checkpoint);
    assert.isFunction(sdk.put);
    assert.isFunction(sdk.putEvent);
    assert.isFunction(sdk.putEvents);
    assert.isObject(sdk.bot);
    assert.isObject(sdk.streams);
    assert.isObject(sdk.configuration);
  });

  it("read returns mock data, not real DynamoDB", async () => {
    sdk.mock.queues["integration-queue"] = [
      { hello: "world" },
    ];

    const events: any[] = [];
    await sdk.streams.pipeAsync(
      sdk.read("test-bot", "integration-queue"),
      sdk.streams.through((event: any, done: any) => {
        events.push(event);
        done();
      }),
      sdk.streams.devnull()
    );

    assert.equal(events.length, 1);
    assert.deepEqual(events[0].payload, { hello: "world" });
  });

  it("enrich pipeline works end-to-end against real SDK", (done) => {
    sdk.mock.queues["real-in"] = [
      { val: 1 },
      { val: 2 },
    ];

    sdk.enrich({
      id: "integration-bot",
      inQueue:
// Continuation: value for the inQueue property opened on the previous line.
"real-in",
outQueue: "real-out",
transform(payload: any, event: any, cb: any) {
  cb(null, { ...payload, doubled: payload.val * 2 });
},
}, (err: any) => {
  assert.isNotOk(err);
  assert.equal(sdk.mock.written["real-out"].payloads.length, 2);
  assert.equal(sdk.mock.written["real-out"].payloads[0].doubled, 2);
  assert.equal(sdk.mock.written["real-out"].payloads[1].doubled, 4);
  done();
});
});

// Generic stream helpers on the wrapped real SDK should still be the real
// implementations, same as in the unit-test suite above.
it("real streaming utilities are preserved", async () => {
  const results: string[] = [];
  await sdk.streams.pipeAsync(
    sdk.streams.eventstream.readArray(["a", "b"]),
    sdk.streams.through((s: string, done: any) => {
      done(null, s.toUpperCase());
    }),
    sdk.streams.through((s: string, done: any) => {
      results.push(s);
      done();
    }),
    sdk.streams.devnull()
  );
  assert.deepEqual(results, ["A", "B"]);
});
});