diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..167aca9 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,20 @@ +version: 2 +updates: + - package-ecosystem: 'github-actions' + directory: '/' + schedule: + interval: 'daily' + commit-message: + prefix: 'chore' + include: 'scope' + cooldown: + default-days: 5 + - package-ecosystem: 'npm' + directory: '/' + schedule: + interval: 'daily' + commit-message: + prefix: 'chore' + include: 'scope' + cooldown: + default-days: 5 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml deleted file mode 100644 index 27a0ef6..0000000 --- a/.github/workflows/ci.yml +++ /dev/null @@ -1,19 +0,0 @@ -on: [push, pull_request] -name: Build & Test -jobs: - test: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - node-version: [8.x, 10.x, 12.x, 14.x] - steps: - - uses: actions/checkout@v2 - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v1 - with: - node-version: ${{ matrix.node-version }} - - name: Build - run: npm install - - name: Test - run: npm test diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml new file mode 100644 index 0000000..7f1eef3 --- /dev/null +++ b/.github/workflows/test-and-release.yml @@ -0,0 +1,72 @@ +name: Test & Maybe Release +on: [push, pull_request] + +jobs: + test: + strategy: + fail-fast: false + matrix: + node: [lts/*, current] + os: [macos-latest, ubuntu-latest, windows-latest] + runs-on: ${{ matrix.os }} + steps: + - name: Checkout Repository + uses: actions/checkout@v6.0.2 + - name: Use Node.js ${{ matrix.node }} + uses: actions/setup-node@v6.4.0 + with: + node-version: ${{ matrix.node }} + - name: Install Dependencies + run: npm install --no-progress + - name: Check build is up to date + run: | + npm run build + git diff --exit-code || (echo "::error::Build artifacts not committed. Run 'npm run build' and commit the changes." 
&& exit 1) + - name: Lint + run: npm run lint + - name: Run node tests + run: npm run test:node + + browser-test: + runs-on: ubuntu-latest + steps: + - name: Checkout Repository + uses: actions/checkout@v6.0.2 + - name: Use Node.js lts/* + uses: actions/setup-node@v6.4.0 + with: + node-version: lts/* + - name: Install Dependencies + run: npm install --no-progress + - name: Run browser tests + run: npm run test:browser + + release: + name: Release + needs: [test, browser-test] + runs-on: ubuntu-latest + if: github.event_name == 'push' && github.ref == 'refs/heads/master' + permissions: + contents: write + issues: write + pull-requests: write + id-token: write + steps: + - name: Checkout + uses: actions/checkout@v6.0.2 + with: + fetch-depth: 0 + - name: Setup Node.js + uses: actions/setup-node@v6.4.0 + with: + node-version: lts/* + registry-url: 'https://registry.npmjs.org' + - name: Install dependencies + run: npm install --no-progress --no-package-lock --no-save + - name: Build + run: npm run build + - name: Release + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + NPM_CONFIG_PROVENANCE: true + run: npx semantic-release diff --git a/.gitignore b/.gitignore index b3f240e..97a2b2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ node_modules +package-lock.json .nyc_output -coverage/ +types/tsconfig.tsbuildinfo +.codex +.claude diff --git a/.npmignore b/.npmignore deleted file mode 100644 index ba23edb..0000000 --- a/.npmignore +++ /dev/null @@ -1,3 +0,0 @@ -test -coverage/ -.github/ diff --git a/README.md b/README.md index 1e07799..9c752a8 100644 --- a/README.md +++ b/README.md @@ -1,140 +1,371 @@ # through2 -![Build & Test](https://github.com/rvagg/through2/workflows/Build%20&%20Test/badge.svg) +[![NPM](https://nodei.co/npm/through2.svg?style=flat&data=n,v&color=blue)](https://nodei.co/npm/through2/) -[![NPM](https://nodei.co/npm/through2.png?downloads&downloadRank)](https://nodei.co/npm/through2/) +**Tiny utilities for inserting transformation logic into 
Node.js streams and Web Streams pipelines.** -**A tiny wrapper around Node.js streams.Transform (Streams2/3) to avoid explicit subclassing noise** +```bash +npm install through2 +``` + +```js +import { transform } from 'through2' -Inspired by [Dominic Tarr](https://github.com/dominictarr)'s [through](https://github.com/dominictarr/through) in that it's so much easier to make a stream out of a function than it is to set up the prototype chain properly: `through(function (chunk) { ... })`. +readableStream + .pipe(transform(async (chunk) => chunk.toString().toUpperCase())) + .pipe(writableStream) +``` + +## Contents + +* [Contents](#contents) +* [Why](#why) +* [Migrating from v4](#migrating-from-v4) +* [API](#api) + * [Transform function styles](#transform-function-styles) +* [Recipes](#recipes) + * [Map](#map) + * [Filter](#filter) + * [FlatMap](#flatmap) + * [Batch](#batch) + * [Tap](#tap) + * [Parse newline-delimited input](#parse-newline-delimited-input) +* [Node-style streams (`from 'through2'`)](#node-style-streams-from-through2) + * [`transform()`](#transform) + * [`objectTransform()`](#objecttransform) + * [`transformer()`](#transformer) + * [Composing pipelines](#composing-pipelines) + * [Default export (legacy)](#default-export-legacy) +* [Web Streams (`from 'through2/web'`)](#web-streams-from-through2web) + * [`transform()` (web)](#transform-web) + * [Implementation notes](#implementation-notes) +* [License](#license) + +## Why + +Writing a `Transform` stream usually means subclassing, wiring up `_transform`, choosing `objectMode`, and minding backpressure. 
**through2** wraps a function instead: ```js -fs.createReadStream('ex.txt') - .pipe(through2(function (chunk, enc, callback) { - for (let i = 0; i < chunk.length; i++) - if (chunk[i] == 97) - chunk[i] = 122 // swap 'a' for 'z' +// Without through2 +import { Transform } from 'node:stream' - this.push(chunk) +class Upper extends Transform { + _transform (chunk, _enc, cb) { + this.push(chunk.toString().toUpperCase()) + cb() + } +} +input.pipe(new Upper()).pipe(output) - callback() - })) - .pipe(fs.createWriteStream('out.txt')) - .on('finish', () => doSomethingSpecial()) +// With through2 +import { transform } from 'through2' + +input.pipe(transform(async (chunk) => chunk.toString().toUpperCase())).pipe(output) ``` -Or object streams: +Same idea for the modern, cross-runtime Web Streams API: ```js -const all = [] +import { transform } from 'through2/web' -fs.createReadStream('data.csv') - .pipe(csv2()) - .pipe(through2.obj(function (chunk, enc, callback) { - const data = { - name : chunk[0] - , address : chunk[3] - , phone : chunk[10] - } - this.push(data) +await response.body + .pipeThrough(transform(async (chunk) => /* ... */)) + .pipeTo(destination) +``` - callback() - })) - .on('data', (data) => { - all.push(data) - }) - .on('end', () => { - doSomethingSpecial(all) - }) +## Migrating from v4 + +v5 is a major version bump. Headline changes: + +- **ESM only.** `require('through2')` no longer works. Convert callers to `import` or pin to `through2@4`. +- **Named exports added.** `transform`, `objectTransform`, `transformer` are the preferred surface in new code. +- **Default export still works.** `through2(fn)`, `through2.obj(fn)`, and `through2.ctor(fn)` produce equivalent stream instances. Once your callers are converted to ESM imports, the call-site syntax and runtime behaviour are unchanged (modulo the `instanceof` caveat below). 
+- **Async functions and async generators are now accepted** as transform functions, in addition to the classic `(chunk, enc, cb)` callback form. See [Transform function styles](#transform-function-styles). +- **`through2/web` subpath added** for Web Streams (`TransformStream`) pipelines. +- **`readable-stream@4`** (was `@3`) is the underlying dependency. +- **`transformer` / `.ctor` returns a factory function**, not a true constructor. The returned instance is still a `Transform`, but `instanceof YourFactory` no longer holds. Use `instanceof Transform` instead. + +Mapping for legacy code: + +| v4 / legacy | v5 named export | +| --------------------- | ------------------------------ | +| `through2(fn)` | `transform(fn)` | +| `through2.obj(fn)` | `objectTransform(fn)` | +| `through2.ctor(fn)` | `transformer(fn)` | + +## API + +Two import paths, mirroring the two stream worlds: + +| Import | Stream API | Returns | Use when | +| ---------------------------- | ---------------------------- | ---------------------- | ------------------------------------------------ | +| `from 'through2'` | Node-style streams | `stream.Transform` | You're working with `Readable`/`Writable`/`Transform`-shaped streams (`.pipe(...)`) | +| `from 'through2/web'` | Web Streams (WHATWG) | `TransformStream` | You're working with `ReadableStream`/`WritableStream`-shaped streams (`.pipeThrough(...)`) | + +The two entries differ in the *stream API* they target, not the runtime they run on. Either entry can run in Node.js, browsers, Deno, Bun, or Cloudflare Workers. Pick the one that matches the streams you're piping with. + +- **`from 'through2'`** uses the Node-style streams API (`Readable`/`Writable`/`Transform`, `.pipe()`, callback-driven `_transform`). It depends on [`readable-stream`](https://github.com/nodejs/readable-stream); browser bundlers pick up that package's `browser` field automatically and ship its self-contained shim, so no Node-builtin polyfill is needed. 
+- **`from 'through2/web'`** uses the WHATWG Web Streams API (`TransformStream`, `.pipeThrough()`). Zero runtime dependencies; relies only on `TransformStream` being a global (it is in modern browsers, Node.js >= 18, Deno, Bun, Workers). + +### Transform function styles + +Every transform-creating export accepts the same three function styles, auto-dispatched by inspecting the function's kind: + +```js +// 1. Classic Node-style callback (use `this.push()` and the callback) +transform(function (chunk, encoding, callback) { + this.push(chunk) + callback() +}) + +// 2. Async function (resolved value is pushed; `undefined` skips) +transform(async (chunk) => chunk.toString().toUpperCase()) + +// 3. Async generator (1-to-many; full pipeline coroutine) +transform(async function * (source) { + for await (const chunk of source) { + yield chunk + yield chunk + } +}) ``` -Note that `through2.obj(fn)` is a convenience wrapper around `through2({ objectMode: true }, fn)`. +A flush function may be passed as the trailing argument; it follows the same dispatch rules. + +> **Note on async**: the async function form does **not** use `this.push`. To emit zero or many chunks per input, use the async generator form. To emit one chunk per input (or skip), return the value (or `undefined`). + +## Recipes -## Do you need this? +```js +import { objectTransform } from 'through2' +``` -Since Node.js introduced [Simplified Stream Construction](https://nodejs.org/api/stream.html#stream_simplified_construction), many uses of **through2** have become redundant. Consider whether you really need to use **through2** or just want to use the `'readable-stream'` package, or the core `'stream'` package (which is derived from `'readable-stream'`): +### Map + +One in, one out. Async function form; the resolved value is pushed. 
```js -const { Transform } = require('readable-stream') +objectTransform(async (item) => doSomething(item)) +``` -const transformer = new Transform({ - transform(chunk, enc, callback) { - // ... +### Filter + +Return `undefined` to drop a chunk. + +```js +objectTransform(async (item) => predicate(item) ? item : undefined) +``` + +### FlatMap + +One in, many out. Async generator form. + +```js +objectTransform(async function * (source) { + for await (const item of source) { + for (const x of expand(item)) yield x } }) ``` -## API +### Batch + +Collect a fixed-size batch, emit at size or at flush. + +```js +objectTransform(async function * (source) { + let batch = [] + for await (const item of source) { + batch.push(item) + if (batch.length >= 100) { yield batch; batch = [] } + } + if (batch.length) yield batch +}) +``` + +### Tap -through2([ options, ] [ transformFunction ] [, flushFunction ]) +Side effect, pass through unchanged. -Consult the **[stream.Transform](https://nodejs.org/docs/latest/api/stream.html#stream_class_stream_transform)** documentation for the exact rules of the `transformFunction` (i.e. `this._transform`) and the optional `flushFunction` (i.e. `this._flush`). +```js +objectTransform(async (item) => { observe(item); return item }) +``` + +### Parse newline-delimited input + +Byte chunks in, line strings out. The async generator buffers across chunk boundaries. + +```js +objectTransform(async function * (source) { + let buf = '' + for await (const chunk of source) { + buf += chunk.toString() + const lines = buf.split('\n') + buf = lines.pop() + for (const line of lines) yield line + } + if (buf) yield buf +}) +``` -### options +For a runnable end-to-end demo combining NDJSON parsing, level filtering, formatting, and a tally, see [`example-ndjson.js`](./example-ndjson.js): -The options argument is optional and is passed straight through to `stream.Transform`. 
So you can use `objectMode:true` if you are processing non-binary streams (or just use `through2.obj()`). +```bash +cat app.log | node example-ndjson.js warn +``` -The `options` argument is first, unlike standard convention, because if I'm passing in an anonymous function then I'd prefer for the options argument to not get lost at the end of the call: +## Node-style streams (`from 'through2'`) ```js -fs.createReadStream('/tmp/important.dat') - .pipe(through2({ objectMode: true, allowHalfOpen: false }, - (chunk, enc, cb) => { - cb(null, 'wut?') // note we can use the second argument on the callback - // to provide data as an alternative to this.push('wut?') +import { transform, objectTransform, transformer } from 'through2' +``` + +### `transform()` + +``` +transform([options, ][transformFn][, flushFn]) -> stream.Transform +``` + +Returns a `stream.Transform`. `options` is forwarded to the underlying `Transform` constructor. If `transformFn` is omitted, a passthrough is returned. + +```js +fs.createReadStream('in.txt') + .pipe(transform(function (chunk, _enc, cb) { + for (let i = 0; i < chunk.length; i++) { + if (chunk[i] === 97) chunk[i] = 122 // swap 'a' for 'z' } - )) - .pipe(fs.createWriteStream('/tmp/wut.txt')) + this.push(chunk) + cb() + })) + .pipe(fs.createWriteStream('out.txt')) ``` -### transformFunction +### `objectTransform()` -The `transformFunction` must have the following signature: `function (chunk, encoding, callback) {}`. A minimal implementation should call the `callback` function to indicate that the transformation is done, even if that transformation means discarding the chunk. +``` +objectTransform([options], transformFn[, flushFn]) -> stream.Transform +``` + +Like `transform`, with `objectMode: true` enabled by default. Most async/async-generator use cases want this. + +### `transformer()` + +``` +transformer([options], transformFn[, flushFn]) -> (overrideOptions?) 
-> stream.Transform +``` -To queue a new chunk, call `this.push(chunk)`—this can be called as many times as required before the `callback()` if you have multiple pieces to send on. +Returns a factory function. Calling it (with or without `new`) produces a fresh `Transform` instance with the configured behaviour. Per-call options merge on top of the configured defaults; the merged options are exposed as `this.options` inside the transform function. -Alternatively, you may use `callback(err, chunk)` as shorthand for emitting a single chunk or an error. +```js +const Counter = transformer({ objectMode: true }, function (chunk, _enc, cb) { + this.count = (this.count || 0) + 1 + this.push(chunk) + cb() +}) -If you **do not provide a `transformFunction`** then you will get a simple pass-through stream. +const a = Counter() +const b = new Counter({ highWaterMark: 32 }) // override per-call +``` -### flushFunction +### Composing pipelines -The optional `flushFunction` is provided as the last argument (2nd or 3rd, depending on whether you've supplied options) is called just prior to the stream ending. Can be used to finish up any processing that may be in progress. +For anything beyond a quick demo, prefer `node:stream/promises`'s `pipeline()` over chained `.pipe()`. It propagates errors, awaits completion, and destroys all streams on failure (chained `.pipe()` silently leaves streams hanging on error). 
```js -fs.createReadStream('/tmp/important.dat') - .pipe(through2( - (chunk, enc, cb) => cb(null, chunk), // transform is a noop - function (cb) { // flush function - this.push('tacking on an extra buffer to the end'); - cb(); +import { pipeline } from 'node:stream/promises' +import { createReadStream, createWriteStream } from 'node:fs' +import { objectTransform } from 'through2' + +await pipeline( + createReadStream('in.ndjson'), + objectTransform(async function * (source) { + let buf = '' + for await (const chunk of source) { + buf += chunk.toString() + const lines = buf.split('\n') + buf = lines.pop() + for (const line of lines) yield JSON.parse(line) } - )) - .pipe(fs.createWriteStream('/tmp/wut.txt')); + }), + objectTransform(async (record) => record.active ? record : undefined), + objectTransform(async (record) => JSON.stringify(record) + '\n'), + createWriteStream('out.ndjson') +) ``` -through2.ctor([ options, ] transformFunction[, flushFunction ]) +`pipeline()` accepts any mix of through2-built transforms and other `Readable`/`Writable`/`Transform` instances. Use it whenever the pipeline can fail or you need to know when it's done. + +A runnable, more elaborate version of this NDJSON pipeline lives in [`example-ndjson.js`](./example-ndjson.js) (parses arbitrary structured logs from stdin, filters by level, pretty-prints to stdout, summarises on stderr). + +### Default export (legacy) + +The default export is `transform` with `.obj` and `.ctor` attached for back-compatibility: -Instead of returning a `stream.Transform` instance, `through2.ctor()` returns a **constructor** for a custom Transform. This is useful when you want to use the same transform logic in multiple instances. 
+```js +import through2 from 'through2' + +through2(fn) // === transform(fn) +through2.obj(fn) // === objectTransform(fn) +through2.ctor(fn) // === transformer(fn) +``` + +## Web Streams (`from 'through2/web'`) ```js -const FToC = through2.ctor({objectMode: true}, function (record, encoding, callback) { - if (record.temp != null && record.unit == "F") { - record.temp = ( ( record.temp - 32 ) * 5 ) / 9 - record.unit = "C" +import { transform } from 'through2/web' +``` + +### `transform()` (web) + +``` +transform([transformFn][, flushFn]) -> TransformStream +``` + +Returns a `TransformStream`. The classic-style function takes `(chunk, controller)` and uses `controller.enqueue()`. Async and async-generator forms work the same as in the Node-style streams entry. + +```js +// 1-to-many fan-out (controller form, stateless) +const tagged = transform((chunk, controller) => { + controller.enqueue({ kind: 'raw', value: chunk }) + controller.enqueue({ kind: 'upper', value: chunk.toString().toUpperCase() }) +}) + +// Splitter: cross-chunk state is needed (a line can span chunks). The async +// generator form lets the buffer be a local variable; the classic controller +// form would need closure state plus a flush handler. 
+const splitter = transform(async function * (source) { + let buf = '' + for await (const chunk of source) { + buf += chunk.toString() + const lines = buf.split('\n') + buf = lines.pop() + for (const line of lines) yield line } - this.push(record) - callback() + if (buf) yield buf }) -// Create instances of FToC like so: -const converter = new FToC() -// Or: -const converter = FToC() -// Or specify/override options when you instantiate, if you prefer: -const converter = FToC({objectMode: true}) +// Async (1-to-1) with flush +const withTrailer = transform( + async (chunk) => chunk.toString().toUpperCase(), + (controller) => controller.enqueue('END') +) + +// Pipe a fetch response through a transform +await response.body + .pipeThrough(transform(async (chunk) => chunk)) + .pipeTo(destinationWritableStream) ``` +### Implementation notes + +At the time of writing, Chromium hasn't shipped the cleanup hook that `TransformStream` would use to tear down an in-flight async generator on cancel. Without a workaround, calling `reader.cancel()` on a browser-side pipeline would leave your `async function *` suspended and its `finally` block would never run. The web entry handles this for you: + +- Backpressure works in both directions through `pipeThrough`. +- Cancelling the consumer or aborting the producer cleans up your generator (its `finally` runs) and errors the other side. +- Same behaviour in browsers, Node.js, Deno, Bun, and Cloudflare Workers. + +One thing to note: `transform(asyncGenFn)` returns a `{ readable, writable }` pair rather than a `TransformStream` instance. `pipeThrough` and `pipeTo` accept it identically. + ## License -**through2** is Copyright © Rod Vagg and additional contributors and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details. +**through2** is Copyright (c) Rod Vagg and additional contributors and licensed under the MIT license. 
See the included LICENSE file for more details. diff --git a/example-ndjson.js b/example-ndjson.js new file mode 100644 index 0000000..2e7762b --- /dev/null +++ b/example-ndjson.js @@ -0,0 +1,94 @@ +// example-ndjson.js +// +// Reads NDJSON log entries from stdin, filters by minimum level, pretty-prints +// matches to stdout, writes a summary to stderr. A small demonstration of +// composing through2 transforms with `node:stream/promises` `pipeline()`. +// +// Usage: +// cat app.log | node example-ndjson.js [minLevel] +// +// Recognises common shapes for level / timestamp / logger / message fields, +// so it works with structured logs from zap (Go), pino (Node.js), bunyan, +// winston, and similar. + +import { pipeline } from 'node:stream/promises' +import { objectTransform } from './through2.js' + +const minLevel = (process.argv[2] || 'warn').toLowerCase() + +// Map common level names + pino numeric levels to a single ordinal scale. +const levelNames = { + trace: 0, + debug: 1, + info: 2, + notice: 2, + warn: 3, + warning: 3, + error: 4, + err: 4, + fatal: 5, + crit: 5, + critical: 5, + panic: 5, + dpanic: 5 +} +const pinoNumeric = (n) => + n >= 60 ? 5 : n >= 50 ? 4 : n >= 40 ? 3 : n >= 30 ? 2 : n >= 20 ? 1 : 0 +const lvlIdx = (l) => + typeof l === 'number' ? pinoNumeric(l) : levelNames[String(l).toLowerCase()] ?? 0 + +const minIdx = lvlIdx(minLevel) + +let scanned = 0 +let matched = 0 +const byLogger = new Map() + +await pipeline( + process.stdin, + + // 1. Parse NDJSON: byte chunks in, objects out. The async generator handles + // line splitting across chunk boundaries. 
+ objectTransform(async function * (source) { + let buf = '' + for await (const chunk of source) { + buf += chunk.toString() + const lines = buf.split('\n') + buf = lines.pop() + for (const line of lines) { + scanned++ + if (!line) continue + try { yield JSON.parse(line) } catch { /* skip non-JSON */ } + } + } + if (buf.trim()) try { yield JSON.parse(buf) } catch { /* skip non-JSON */ } + }), + + // 2. Filter by minimum level. Returning undefined from an async transform + // drops the chunk. + objectTransform(async (entry) => + lvlIdx(entry.level) >= minIdx ? entry : undefined + ), + + // 3. Format each match as a one-line text record; tally as we go. + objectTransform(async (entry) => { + matched++ + const logger = entry.logger || entry.name || entry.component || '-' + byLogger.set(logger, (byLogger.get(logger) || 0) + 1) + const ts = String(entry.ts || entry.time || '').slice(0, 19).padEnd(19) + const lvl = String(entry.level).padEnd(5).slice(0, 5) + const msg = entry.msg || entry.message || '' + return `${ts} ${lvl} ${logger.padEnd(20).slice(0, 20)} ${msg}\n` + }), + + process.stdout +) + +const fmt = (n) => n.toLocaleString() +process.stderr.write('\n--\n') +process.stderr.write(`${fmt(scanned)} lines scanned, ${fmt(matched)} matched (level >= ${minLevel})\n`) +if (byLogger.size > 0) { + process.stderr.write('top loggers:\n') + for (const [k, v] of [...byLogger.entries()].sort((a, b) => b[1] - a[1]).slice(0, 5)) { + process.stderr.write(` ${fmt(v).padStart(8)} ${k}\n`) + } +} diff --git a/package.json b/package.json index a5ca532..49ae363 100644 --- a/package.json +++ b/package.json @@ -1,38 +1,140 @@ { "name": "through2", "version": "4.0.2", - "description": "A tiny wrapper around Node.js streams.Transform (Streams2/3) to avoid explicit subclassing noise", - "main": "through2.js", + "description": "Tiny utilities for inserting transformation logic into Node.js stream and Web Streams pipelines", + "license": "MIT", + "type": "module", + "main": "./through2.js", 
+ "exports": { + ".": { + "types": "./types/through2.d.ts", + "import": "./through2.js" + }, + "./web": { + "types": "./types/web.d.ts", + "import": "./web.js" + } + }, + "types": "./types/through2.d.ts", "scripts": { - "test:node": "hundreds mocha test/test.js", - "test:browser": "node -e 'process.exit(process.version.startsWith(\"v8.\") ? 0 : 1)' || polendina --cleanup --runner=mocha test/test.js", - "test": "npm run lint && npm run test:node && npm run test:browser", "lint": "standard", - "coverage": "c8 --reporter=text --reporter=html mocha test/test.js && npx st -d coverage -p 8888" + "build:types": "tsc --build", + "build": "npm run build:types", + "test:node": "polendina-node bare-async test/test.js test/web.js", + "test:browser": "polendina --cleanup --runner=bare-async test/test.js test/web.js", + "test:ci": "npm run lint && npm run test:node && npm run test:browser", + "test": "npm run lint && npm run test:node" }, "repository": { "type": "git", "url": "https://github.com/rvagg/through2.git" }, + "homepage": "https://github.com/rvagg/through2", "keywords": [ "stream", - "streams2", - "through", - "transform" + "transform", + "transformstream", + "web-streams", + "async-iterable", + "pipeline" ], "author": "Rod Vagg (https://github.com/rvagg)", - "license": "MIT", "dependencies": { - "readable-stream": "3" + "readable-stream": "^4.7.0" }, "devDependencies": { - "bl": "^4.0.2", - "buffer": "^5.6.0", - "chai": "^4.2.0", - "hundreds": "~0.0.7", - "mocha": "^7.2.0", - "polendina": "^1.0.0", - "standard": "^14.3.4", - "stream-spigot": "^3.0.6" + "@semantic-release/changelog": "^6.0.3", + "@semantic-release/commit-analyzer": "^13.0.1", + "@semantic-release/git": "^10.0.1", + "@semantic-release/github": "^12.0.6", + "@semantic-release/npm": "^13.1.5", + "@semantic-release/release-notes-generator": "^14.1.0", + "@types/node": "^25.6.0", + "conventional-changelog-conventionalcommits": "^9.3.1", + "polendina": "^3.2.21", + "semantic-release": "^25.0.3", + 
"standard": "^17.1.2", + "typescript": "^6.0.3" + }, + "release": { + "branches": [ + "master" + ], + "plugins": [ + [ + "@semantic-release/commit-analyzer", + { + "preset": "conventionalcommits", + "releaseRules": [ + { + "breaking": true, + "release": "major" + }, + { + "revert": true, + "release": "patch" + }, + { + "type": "feat", + "release": "minor" + }, + { + "type": "fix", + "release": "patch" + }, + { + "type": "chore", + "release": "patch" + }, + { + "type": "docs", + "release": "patch" + }, + { + "type": "test", + "release": "patch" + }, + { + "scope": "no-release", + "release": false + } + ] + } + ], + [ + "@semantic-release/release-notes-generator", + { + "preset": "conventionalcommits", + "presetConfig": { + "types": [ + { + "type": "feat", + "section": "Features" + }, + { + "type": "fix", + "section": "Bug Fixes" + }, + { + "type": "chore", + "section": "Trivial Changes" + }, + { + "type": "docs", + "section": "Trivial Changes" + }, + { + "type": "test", + "section": "Tests" + } + ] + } + } + ], + "@semantic-release/changelog", + "@semantic-release/npm", + "@semantic-release/github", + "@semantic-release/git" + ] } } diff --git a/test/bench.js b/test/bench.js deleted file mode 100644 index 5d3d100..0000000 --- a/test/bench.js +++ /dev/null @@ -1,36 +0,0 @@ -const through2 = require('../') -const { Buffer } = require('buffer') -const bl = require('bl') -const crypto = require('crypto') -const assert = require('assert') - -function run (callback) { - const bufs = Array.apply(null, Array(10)).map(() => crypto.randomBytes(32)) - const stream = through2((chunk, env, callback) => { - callback(null, chunk.toString('hex')) - }) - - stream.pipe(bl((err, data) => { - assert(!err) - assert.strictEqual(data.toString(), Buffer.concat(bufs).toString('hex')) - callback() - })) - - bufs.forEach((b) => stream.write(b)) - stream.end() -} - -let count = 0 -const start = Date.now() - -;(function exec () { - count++ - run(() => { - if (Date.now() - start < 1000 * 10) { - 
return setImmediate(exec) - } - console.log('Ran', count, 'iterations in', Date.now() - start, 'ms') - }) -}()) - -console.log('Running for ~10s') diff --git a/test/test.js b/test/test.js index 5d3b1b2..42979a8 100644 --- a/test/test.js +++ b/test/test.js @@ -1,428 +1,404 @@ -/* eslint-env mocha */ +import { Readable } from 'readable-stream' +import through2, { + transform, + objectTransform, + transformer +} from '../through2.js' + +const decoder = new TextDecoder('ascii') +const encoder = new TextEncoder() + +const eq = (actual, expected, msg) => { + if (actual !== expected) { + throw new Error(`${msg || 'eq'}: expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`) + } +} + +const deepEq = (actual, expected, msg) => { + if (JSON.stringify(actual) !== JSON.stringify(expected)) { + throw new Error(`${msg || 'deepEq'}: expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`) + } +} -const test = it -const { assert: t } = require('chai') -const through2 = require('../') -const { Buffer } = require('buffer') -const bl = require('bl') -const spigot = require('stream-spigot') +const ok = (val, msg) => { + if (!val) throw new Error(msg || 'expected truthy') +} function randomBytes (len) { const bytes = new Uint8Array(len) - for (let i = 0; i < len; i++) { - bytes[i] = Math.floor(Math.random() * 0xff) - } + globalThis.crypto.getRandomValues(bytes) return bytes } -test('plain through', (done) => { - const th2 = through2(function (chunk, enc, callback) { - if (!this._i) { - this._i = 97 - } else { // 'a' - this._i++ - } - const b = Buffer.alloc(chunk.length) - for (let i = 0; i < chunk.length; i++) { - b[i] = this._i - } - this.push(b) - callback() +function collect (stream) { + return new Promise((resolve, reject) => { + /** @type {Uint8Array[]} */ + const chunks = [] + stream.on('data', (chunk) => chunks.push(chunk)) + stream.on('end', () => { + let total = 0 + for (const c of chunks) total += c.length + const out = new Uint8Array(total) + let 
i = 0 + for (const c of chunks) { out.set(c, i); i += c.length } + resolve(out) + }) + stream.on('error', reject) }) +} - th2.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - t.equal('aaaaaaaaaabbbbbcccccccccc', s, 'got transformed string') - done() - })) - - th2.write(randomBytes(10)) - th2.write(randomBytes(5)) - th2.write(randomBytes(10)) - th2.end() -}) - -test('pipeable through', (done) => { - const th2 = through2(function (chunk, enc, callback) { - if (!this._i) { - this._i = 97 - } else { // 'a' - this._i++ - } - const b = Buffer.alloc(chunk.length) - for (let i = 0; i < chunk.length; i++) { - b[i] = this._i - } - this.push(b) - callback() +function collectObjects (stream) { + return new Promise((resolve, reject) => { + const objs = [] + stream.on('data', (o) => objs.push(o)) + stream.on('end', () => resolve(objs)) + stream.on('error', reject) }) +} - th2.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - // bl() acts like a proper streams2 stream and passes as much as it's - // asked for, so we really only get one write with such a small amount - // of data - t.equal(s, 'aaaaaaaaaaaaaaaaaaaaaaaaa', 'got transformed string') - done() - })) - - const bufs = bl() - bufs.append(randomBytes(10)) - bufs.append(randomBytes(5)) - bufs.append(randomBytes(10)) - bufs.pipe(th2) -}) - -test('object through', (done) => { - const th2 = through2({ objectMode: true }, function (chunk, enc, callback) { - this.push({ out: chunk.in + 1 }) - callback() +export async function classicByteTransform () { + const stream = transform(function (chunk, _enc, cb) { + if (!this._i) this._i = 97 + else this._i++ + const out = new Uint8Array(chunk.length) + out.fill(this._i) + this.push(out) + cb() }) + const done = collect(stream) + stream.write(randomBytes(10)) + stream.write(randomBytes(5)) + stream.write(randomBytes(10)) + stream.end() + eq(decoder.decode(await done), 'aaaaaaaaaabbbbbcccccccccc') +} + +export async function 
classicNoopTransform () { + const stream = transform() + const done = collect(stream) + stream.end(encoder.encode('eeee')) + eq(decoder.decode(await done), 'eeee') +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 203 : -99 }, 'got transformed object') - if (++e === 3) { - done() +export async function classicFlush () { + const stream = transform( + function (chunk, _enc, cb) { + if (!this._i) this._i = 97 + else this._i++ + const out = new Uint8Array(chunk.length) + out.fill(this._i) + this.push(out) + cb() + }, + function (cb) { + this.push(encoder.encode('end')) + cb() } - }) + ) + const done = collect(stream) + stream.write(randomBytes(10)) + stream.write(randomBytes(5)) + stream.write(randomBytes(10)) + stream.end() + eq(decoder.decode(await done), 'aaaaaaaaaabbbbbccccccccccend') +} - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) +export async function classicCallbackShorthand () { + const stream = transform((chunk, _enc, cb) => cb(null, chunk)) + const done = collect(stream) + stream.end(encoder.encode('hello')) + eq(decoder.decode(await done), 'hello') +} -test('object through with through2.obj', (done) => { - const th2 = through2.obj(function (chunk, enc, callback) { - this.push({ out: chunk.in + 1 }) - callback() - }) +export async function asyncReturnedValuePushed () { + const stream = transform(async (_chunk, _enc) => encoder.encode('X')) + const done = collect(stream) + stream.write(new Uint8Array([1])) + stream.write(new Uint8Array([2])) + stream.end() + eq(decoder.decode(await done), 'XX') +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } - }) +export async function asyncUndefinedSkipsEmission () { + const stream = objectTransform(async (chunk) => chunk % 2 === 0 ? 
chunk : undefined) + const done = collectObjects(stream) + for (let i = 0; i < 6; i++) stream.write(i) + stream.end() + deepEq(await done, [0, 2, 4]) +} - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) - -test('flushing through', (done) => { - const th2 = through2(function (chunk, enc, callback) { - if (!this._i) { - this._i = 97 - } else { // 'a' - this._i++ - } - const b = Buffer.alloc(chunk.length) - for (let i = 0; i < chunk.length; i++) { - b[i] = this._i - } - this.push(b) - callback() - }, function (callback) { - this.push(Buffer.from([101, 110, 100])) - callback() +export async function asyncRejectedPropagatesError () { + const stream = transform(async () => { throw new Error('boom') }) + stream.write(new Uint8Array([1])) + await new Promise((resolve, reject) => { + stream.on('error', (err) => { + try { eq(err.message, 'boom'); resolve() } catch (e) { reject(e) } + }) }) +} - th2.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - t.equal(s, 'aaaaaaaaaabbbbbccccccccccend', 'got transformed string') - done() - })) - - th2.write(randomBytes(10)) - th2.write(randomBytes(5)) - th2.write(randomBytes(10)) - th2.end() -}) - -test('plain through ctor', (done) => { - const Th2 = through2.ctor(function (chunk, enc, callback) { - if (!this._i) { - this._i = 97 // 'a' - } else { - this._i++ - } - const b = Buffer.alloc(chunk.length) - for (let i = 0; i < chunk.length; i++) { - b[i] = this._i - } - this.push(b) - callback() +export async function asyncgenOneToOne () { + const stream = objectTransform(async function * (source) { + for await (const chunk of source) yield chunk * 2 }) + const done = collectObjects(stream) + for (let i = 1; i <= 4; i++) stream.write(i) + stream.end() + deepEq(await done, [2, 4, 6, 8]) +} - const th2 = new Th2() - - th2.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - t.equal('aaaaaaaaaabbbbbcccccccccc', s, 'got transformed string') - done() - })) - - 
th2.write(randomBytes(10)) - th2.write(randomBytes(5)) - th2.write(randomBytes(10)) - th2.end() -}) - -test('reuse through ctor', (done) => { - let uses = 0 - const Th2 = through2.ctor(function (chunk, enc, callback) { - if (!this._i) { - uses++ - this._i = 97 // 'a' - } else { - this._i++ +export async function asyncgenFanOut () { + const stream = objectTransform(async function * (source) { + for await (const chunk of source) { + yield chunk + yield chunk } - const b = Buffer.alloc(chunk.length) - for (let i = 0; i < chunk.length; i++) { - b[i] = this._i - } - this.push(b) - callback() }) + const done = collectObjects(stream) + stream.write('a') + stream.write('b') + stream.end() + deepEq(await done, ['a', 'a', 'b', 'b']) +} - const th2 = Th2() - - th2.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - t.equal('aaaaaaaaaabbbbbcccccccccc', s, 'got transformed string') - - const newInstance = Th2() - newInstance.pipe(bl((err, b) => { - t.ifError(err) - const s = b.toString('ascii') - t.equal('aaaaaaabbbbccccccc', s, 'got transformed string') - t.equal(uses, 2) - done() - })) - - newInstance.write(randomBytes(7)) - newInstance.write(randomBytes(4)) - newInstance.write(randomBytes(7)) - newInstance.end() - })) - - th2.write(randomBytes(10)) - th2.write(randomBytes(5)) - th2.write(randomBytes(10)) - th2.end() -}) - -test('object through ctor', (done) => { - const Th2 = through2.ctor({ objectMode: true }, function (chunk, enc, callback) { - this.push({ out: chunk.in + 1 }) - callback() +export async function asyncgenFilter () { + const stream = objectTransform(async function * (source) { + for await (const chunk of source) { + if (chunk % 2 === 0) yield chunk + } }) + const done = collectObjects(stream) + for (let i = 0; i < 6; i++) stream.write(i) + stream.end() + deepEq(await done, [0, 2, 4]) +} - const th2 = new Th2() +export async function asyncgenWithClassicFlush () { + const stream = objectTransform( + async function * (source) { + for await 
(const chunk of source) yield chunk + }, + function (cb) { this.push('END'); cb() } + ) + const done = collectObjects(stream) + stream.write('a') + stream.write('b') + stream.end() + deepEq(await done, ['a', 'b', 'END']) +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } +export async function asyncgenThrownPropagates () { + const stream = objectTransform(async function * () { + throw new Error('gen-boom') }) - - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) - -test('pipeable object through ctor', (done) => { - const Th2 = through2.ctor({ objectMode: true }, function (record, enc, callback) { - if (record.temp != null && record.unit === 'F') { - record.temp = ((record.temp - 32) * 5) / 9 - record.unit = 'C' - } - this.push(record) - callback() + stream.write('x') + stream.end() + await new Promise((resolve, reject) => { + stream.on('error', (err) => { + try { eq(err.message, 'gen-boom'); resolve() } catch (e) { reject(e) } + }) }) +} - const th2 = Th2() - - const expect = [-19, -40, 100, 22] - th2.on('data', (o) => { - t.deepEqual(o, { temp: expect.shift(), unit: 'C' }, 'got transformed object') - if (!expect.length) { - done() - } +export async function objectTransformPassthrough () { + const stream = objectTransform(function (chunk, _enc, cb) { + this.push({ out: chunk.in + 1 }) + cb() }) + const done = collectObjects(stream) + stream.write({ in: 1 }) + stream.write({ in: 2 }) + stream.write({ in: 3 }) + stream.end() + deepEq(await done, [{ out: 2 }, { out: 3 }, { out: 4 }]) +} - spigot({ objectMode: true }, [ +export async function objectTransformFromReadable () { + const source = Readable.from([ { temp: -2.2, unit: 'F' }, { temp: -40, unit: 'F' }, { temp: 212, unit: 'F' }, { temp: 22, unit: 'C' } - ]).pipe(th2) -}) - -test('object through ctor override', (done) => { - const Th2 = through2.ctor(function 
(chunk, enc, callback) { - this.push({ out: chunk.in + 1 }) - callback() + ]) + const conv = objectTransform(async (record) => { + if (record.unit === 'F') { + record = { temp: ((record.temp - 32) * 5) / 9, unit: 'C' } + } + return record }) + source.pipe(conv) + const out = await collectObjects(conv) + deepEq(out.map((r) => Math.round(r.temp)), [-19, -40, 100, 22]) + ok(out.every((r) => r.unit === 'C')) +} - const th2 = Th2({ objectMode: true }) - - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } +export async function transformerReusableFactory () { + const Th = transformer(function (chunk, _enc, cb) { + if (!this._i) this._i = 97 + else this._i++ + const out = new Uint8Array(chunk.length) + out.fill(this._i) + this.push(out) + cb() }) - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) + const a = new Th() + const b = Th() -test('object settings available in transform', (done) => { - const Th2 = through2.ctor({ objectMode: true, peek: true }, function (chunk, enc, callback) { - t.ok(this.options.peek, 'reading options from inside _transform') - this.push({ out: chunk.in + 1 }) - callback() - }) + const da = collect(a); const db = collect(b) + a.write(randomBytes(3)); a.write(randomBytes(2)); a.end() + b.write(randomBytes(2)); b.write(randomBytes(3)); b.end() - const th2 = Th2() + eq(decoder.decode(await da), 'aaabb') + eq(decoder.decode(await db), 'aabbb') +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 
203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } +export async function transformerOptionsMerge () { + const Th = transformer({ objectMode: true, peek: true }, function (chunk, _enc, cb) { + ok(this.options.peek, 'options visible inside transform') + this.push({ out: chunk.in + 1 }) + cb() }) + const inst = Th() + const done = collectObjects(inst) + inst.write({ in: 1 }) + inst.write({ in: 2 }) + inst.end() + deepEq(await done, [{ out: 2 }, { out: 3 }]) +} - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) - -test('object settings available in transform override', (done) => { - const Th2 = through2.ctor(function (chunk, enc, callback) { - t.ok(this.options.peek, 'reading options from inside _transform') +export async function transformerOverrideOptions () { + const Th = transformer(function (chunk, _enc, cb) { this.push({ out: chunk.in + 1 }) - callback() + cb() }) + const inst = Th({ objectMode: true }) + const done = collectObjects(inst) + inst.write({ in: 10 }) + inst.end() + deepEq(await done, [{ out: 11 }]) +} - const th2 = Th2({ objectMode: true, peek: true }) +export async function transformerAsyncSupported () { + const Th = transformer({ objectMode: true }, async (chunk) => chunk * 10) + const inst = Th() + const done = collectObjects(inst) + inst.write(1); inst.write(2); inst.end() + deepEq(await done, [10, 20]) +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 
203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } - }) +export async function legacyDefault () { + const stream = through2((chunk, _enc, cb) => cb(null, chunk)) + const done = collect(stream) + stream.end(encoder.encode('legacy')) + eq(decoder.decode(await done), 'legacy') +} - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) +export async function legacyDefaultObj () { + const stream = through2.obj(function (chunk, _enc, cb) { + this.push({ out: chunk.in + 1 }) + cb() + }) + const done = collectObjects(stream) + stream.write({ in: 1 }) + stream.end() + deepEq(await done, [{ out: 2 }]) +} -test('object override extends options', (done) => { - const Th2 = through2.ctor({ objectMode: true }, function (chunk, enc, callback) { - t.ok(this.options.peek, 'reading options from inside _transform') +export async function legacyDefaultCtor () { + const Th = through2.ctor({ objectMode: true }, function (chunk, _enc, cb) { this.push({ out: chunk.in + 1 }) - callback() + cb() }) + const inst = new Th() + const done = collectObjects(inst) + inst.write({ in: 5 }) + inst.end() + deepEq(await done, [{ out: 6 }]) +} - const th2 = Th2({ peek: true }) +export async function transformerAsyncgenSupported () { + // Each call gets its own generator pipeline. + const Doubled = transformer({ objectMode: true }, async function * (source) { + for await (const x of source) yield x * 2 + }) + const a = Doubled() + const b = Doubled() + const da = collectObjects(a); const db = collectObjects(b) + a.write(1); a.write(2); a.end() + b.write(10); b.write(20); b.end() + deepEq(await da, [2, 4]) + deepEq(await db, [20, 40]) +} - let e = 0 - th2.on('data', (o) => { - t.deepEqual(o, { out: e === 0 ? 102 : e === 1 ? 203 : -99 }, 'got transformed object') - if (++e === 3) { - done() - } +// Regression: with low readable HWM, async-gen fan-out used to deadlock +// awaiting `drain` (a writable event that never fires here). 
+export async function asyncgenDownstreamBackpressure () { + const t = objectTransform({ highWaterMark: 1 }, async function * (source) { + // eslint-disable-next-line no-unused-vars + for await (const _ of source) for (let i = 0; i < 20; i++) yield i }) + t.write('start'); t.end() + await new Promise((resolve) => setTimeout(resolve, 50)) // delay consumption + const out = await collectObjects(t) + deepEq(out, Array.from({ length: 20 }, (_, i) => i)) +} - th2.write({ in: 101 }) - th2.write({ in: 202 }) - th2.write({ in: -100 }) - th2.end() -}) - -test('ctor flush()', (done) => { - let chunkCalled = false - const th2 = through2.ctor( - (chunk, enc, callback) => { - t.equal(chunk.toString(), 'aa') - chunkCalled = true - callback() - }, function fl () { - t(chunkCalled) - done() +// Slow generator + many fast writes must propagate backpressure upstream +// (drain awaits) instead of growing an internal queue without bound. +export async function asyncgenWritableBackpressure () { + const t = objectTransform({ highWaterMark: 4 }, async function * (source) { + for await (const x of source) { + await new Promise((resolve) => setTimeout(resolve, 5)) + yield x } - )() - - th2.end('aa') -}) - -test('obj flush()', (done) => { - let chunkCalled = false - const th2 = through2.obj( - (chunk, enc, callback) => { - t.deepEqual(chunk, { a: 'a' }) - chunkCalled = true - callback() - }, function fl () { - t(chunkCalled) - done() + }) + let drainsAwaited = 0 + const writePromise = (async () => { + for (let i = 0; i < 20; i++) { + if (!t.write(i)) { + drainsAwaited++ + await new Promise((resolve) => t.once('drain', resolve)) + } } - ) - - th2.end({ a: 'a' }) -}) - -test('can be destroyed', (done) => { - const th = through2() + t.end() + })() + const out = await collectObjects(t) + await writePromise + deepEq(out, Array.from({ length: 20 }, (_, i) => i)) + ok(drainsAwaited > 0, 'expected drain awaits, got 0') +} - th.on('close', () => { - t.ok(true, 'shoud emit close') - done() +// 
Mid-stream destroy must release pending awaits so the consumer loop exits. +export async function asyncgenDestroyMidStream () { + const t = objectTransform(async function * (source) { + for await (const x of source) { yield x; yield x } }) - - th.destroy() -}) - -test('can be destroyed twice', (done) => { - const th = through2() - - th.on('close', () => { - t.ok(true, 'shoud emit close') - done() + const seen = [] + t.on('data', (d) => { + seen.push(d) + if (seen.length === 2) t.destroy() }) + t.write(1); t.write(2); t.write(3) + await new Promise((resolve) => t.once('close', resolve)) + ok(seen.length >= 2) +} - th.destroy() - th.destroy() -}) - -test('noop through', (done) => { - const th = through2() - th.pipe(bl((err, data) => { - t.ifError(err) - t.equal(data.toString(), 'eeee') - done() - })) - th.end('eeee') -}) +// Destroy must finalize the user's async generator: its `finally` block +// must run so resources (file handles, subscriptions, etc.) can be released. +export async function asyncgenDestroyRunsFinally () { + let finalized = false + const t = objectTransform(async function * (source) { + try { + for await (const x of source) yield x + } finally { + finalized = true + } + }) + t.write('a') + await new Promise((resolve) => setTimeout(resolve, 20)) + t.destroy() + await new Promise((resolve) => t.once('close', resolve)) + await new Promise((resolve) => setTimeout(resolve, 20)) // let finally settle + ok(finalized, 'generator finally must run on destroy') +} diff --git a/test/web.js b/test/web.js new file mode 100644 index 0000000..f098022 --- /dev/null +++ b/test/web.js @@ -0,0 +1,191 @@ +import { transform } from '../web.js' + +const decoder = new TextDecoder('ascii') +const encoder = new TextEncoder() + +const eq = (actual, expected, msg) => { + if (actual !== expected) { + throw new Error(`${msg || 'eq'}: expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`) + } +} + +const deepEq = (actual, expected, msg) => { + if 
(JSON.stringify(actual) !== JSON.stringify(expected)) { + throw new Error(`${msg || 'deepEq'}: expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`) + } +} + +const concat = (chunks) => { + let total = 0 + for (const c of chunks) total += c.length + const out = new Uint8Array(total) + let i = 0 + for (const c of chunks) { out.set(c, i); i += c.length } + return out +} + +const collect = async (readable) => { + const reader = readable.getReader() + const out = [] + while (true) { + const { value, done } = await reader.read() + if (done) return out + out.push(value) + } +} + +const pipeThrough = (input, ts) => { + const r = new ReadableStream({ + start (controller) { + for (const chunk of input) controller.enqueue(chunk) + controller.close() + } + }) + return collect(r.pipeThrough(ts)) +} + +export async function classicControllerEnqueue () { + const ts = transform((chunk, controller) => { + controller.enqueue(chunk) + controller.enqueue(chunk) + }) + const out = await pipeThrough([encoder.encode('a'), encoder.encode('b')], ts) + eq(decoder.decode(concat(out)), 'aabb') +} + +export async function asyncReturnedValueEnqueued () { + const ts = transform(async (chunk) => chunk % 2 === 0 ? 
chunk : undefined) + const out = await pipeThrough([0, 1, 2, 3, 4], ts) + deepEq(out, [0, 2, 4]) +} + +export async function asyncgenFanOut () { + const ts = transform(async function * (source) { + for await (const chunk of source) { + yield chunk + yield chunk + } + }) + const out = await pipeThrough(['a', 'b'], ts) + deepEq(out, ['a', 'a', 'b', 'b']) +} + +export async function asyncgenFilter () { + const ts = transform(async function * (source) { + for await (const chunk of source) if (chunk % 2 === 0) yield chunk + }) + const out = await pipeThrough([0, 1, 2, 3, 4, 5], ts) + deepEq(out, [0, 2, 4]) +} + +export async function flushClassic () { + const ts = transform( + (chunk, controller) => controller.enqueue(chunk), + (controller) => controller.enqueue('END') + ) + const out = await pipeThrough(['a', 'b'], ts) + deepEq(out, ['a', 'b', 'END']) +} + +export async function flushAsync () { + const ts = transform( + async (chunk) => chunk, + async () => 'END' + ) + const out = await pipeThrough(['x'], ts) + deepEq(out, ['x', 'END']) +} + +export async function asyncRejectedPropagates () { + const ts = transform(async () => { throw new Error('boom') }) + const r = new ReadableStream({ + start (controller) { controller.enqueue('x'); controller.close() } + }) + let err + try { await collect(r.pipeThrough(ts)) } catch (e) { err = e } + if (!err) throw new Error('expected error') + eq(err.message, 'boom') +} + +export async function asyncgenThrownPropagates () { + const ts = transform(async function * () { throw new Error('gen-boom') }) + const r = new ReadableStream({ + start (controller) { controller.enqueue('x'); controller.close() } + }) + let err + try { await collect(r.pipeThrough(ts)) } catch (e) { err = e } + if (!err) throw new Error('expected error') + eq(err.message, 'gen-boom') +} + +export async function noopDefault () { + const ts = transform() + const out = await pipeThrough(['a', 'b', 'c'], ts) + deepEq(out, ['a', 'b', 'c']) +} + +// Memory backpressure: 
a slow async generator with many fast upstream chunks +// must not let the internal queue grow unbounded. The transform method holds +// when its queue is full, propagating backpressure through pipeThrough. +export async function asyncgenWritableBackpressure () { + const ts = transform(async function * (source) { + for await (const x of source) { + await new Promise((resolve) => setTimeout(resolve, 2)) + yield x + } + }) + const N = 200 + const r = new ReadableStream({ + start (controller) { + for (let i = 0; i < N; i++) controller.enqueue(i) + controller.close() + } + }) + const out = await collect(r.pipeThrough(ts)) + deepEq(out, Array.from({ length: N }, (_, i) => i)) +} + +// Reader cancel must finalize the user's async generator: its `finally` +// block must run so resources can be released. +export async function asyncgenCancelRunsFinally () { + let finalized = false + const ts = transform(async function * (source) { + try { + for await (const x of source) yield x + } finally { + finalized = true + } + }) + const r = new ReadableStream({ + start (controller) { controller.enqueue('a') } + // intentionally never closes; reader will cancel + }) + const out = r.pipeThrough(ts) + const reader = out.getReader() + await reader.read() + await new Promise((resolve) => setTimeout(resolve, 20)) + await reader.cancel() + await new Promise((resolve) => setTimeout(resolve, 20)) + if (!finalized) throw new Error('generator finally must run on cancel') +} + +// Downstream backpressure: a fast generator emitting many chunks, with a +// slow reader, must not flood the readable buffer unboundedly. 
+export async function asyncgenReaderBackpressure () { + const ts = transform(async function * (source) { + // eslint-disable-next-line no-unused-vars + for await (const _ of source) for (let i = 0; i < 50; i++) yield i + }) + const r = new ReadableStream({ + start (controller) { controller.enqueue('go'); controller.close() } + }) + const reader = r.pipeThrough(ts).getReader() + const out = [] + while (true) { + const { value, done } = await reader.read() + if (done) break + out.push(value) + await new Promise((resolve) => setTimeout(resolve, 1)) // slow reader + } + deepEq(out, Array.from({ length: 50 }, (_, i) => i)) +} diff --git a/through2.js b/through2.js index 925da9d..22ae464 100644 --- a/through2.js +++ b/through2.js @@ -1,83 +1,293 @@ -const { Transform } = require('readable-stream') +/** + * @file Tiny utilities for inserting transformation logic into Node.js stream + * pipelines without subclassing `Transform`. Exports `transform`, + * `objectTransform`, `transformer`. See `through2/web` for the Web Streams + * variant. 
+ */ -function inherits (fn, sup) { - fn.super_ = sup - fn.prototype = Object.create(sup.prototype, { - constructor: { value: fn, enumerable: false, writable: true, configurable: true } - }) -} +import { Transform } from 'readable-stream' -// create a new export function, used by both the main export and -// the .ctor export, contains common logic for dealing with arguments -function through2 (construct) { - return (options, transform, flush) => { - if (typeof options === 'function') { - flush = transform - transform = options - options = {} - } +/** @typedef {(chunk: any, encoding: BufferEncoding, callback: (err?: Error | null, data?: any) => void) => void} ClassicTransformFn */ +/** @typedef {(chunk: any, encoding: BufferEncoding) => any | Promise} AsyncTransformFn */ +/** @typedef {(source: AsyncIterable) => AsyncGenerator} AsyncGenTransformFn */ +/** @typedef {ClassicTransformFn | AsyncTransformFn | AsyncGenTransformFn} TransformFn */ +/** @typedef {(callback: (err?: Error | null) => void) => void} ClassicFlushFn */ +/** @typedef {() => any | Promise} AsyncFlushFn */ +/** @typedef {ClassicFlushFn | AsyncFlushFn} FlushFn */ +/** @typedef {import('readable-stream').TransformOptions} TransformOptions */ - if (typeof transform !== 'function') { - // noop - transform = (chunk, enc, cb) => cb(null, chunk) - } +const AsyncFunction = async function () {}.constructor +const AsyncGeneratorFunction = async function * () {}.constructor - if (typeof flush !== 'function') { - flush = null - } +/** + * @param {Function} fn + * @returns {'async' | 'asyncgen' | 'classic'} + */ +function fnKind (fn) { + if (fn instanceof AsyncGeneratorFunction) return 'asyncgen' + if (fn instanceof AsyncFunction) return 'async' + return 'classic' +} - return construct(options, transform, flush) +/** + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transform] + * @param {FlushFn} [flush] + * @returns {{ options: TransformOptions, transform: TransformFn, flush: 
FlushFn | null }} + */ +function resolveArgs (options, transform, flush) { + if (typeof options === 'function') { + flush = /** @type {FlushFn} */ (transform) + transform = options + options = {} + } + if (typeof transform !== 'function') { + /** @type {ClassicTransformFn} */ + const passthrough = (chunk, _enc, cb) => cb(null, chunk) + transform = passthrough } + return { options: options || {}, transform, flush: typeof flush === 'function' ? flush : null } } -// main export, just make me a transform stream! -const make = through2((options, transform, flush) => { - const t2 = new Transform(options) +/** + * @param {import('readable-stream').Transform} t + * @param {TransformFn} transformFn + * @param {FlushFn | null} flushFn + */ +function setupTransform (t, transformFn, flushFn) { + const tKind = fnKind(transformFn) - t2._transform = transform - - if (flush) { - t2._flush = flush + if (tKind === 'asyncgen') { + setupAsyncGen(t, /** @type {AsyncGenTransformFn} */ (transformFn), flushFn) + return } - return t2 -}) + if (tKind === 'async') { + const fn = /** @type {AsyncTransformFn} */ (transformFn) + t._transform = function (chunk, enc, cb) { + ;(async () => fn.call(this, chunk, enc))().then( + (out) => cb(null, out), + (err) => cb(err) + ) + } + } else { + t._transform = /** @type {ClassicTransformFn} */ (transformFn) + } -// make me a reusable prototype that I can `new`, or implicitly `new` -// with a constructor call -const ctor = through2((options, transform, flush) => { - function Through2 (override) { - if (!(this instanceof Through2)) { - return new Through2(override) + if (flushFn) { + if (fnKind(flushFn) === 'async') { + const fn = /** @type {AsyncFlushFn} */ (flushFn) + t._flush = function (cb) { + ;(async () => fn.call(this))().then( + (out) => { if (out !== undefined) this.push(out); cb() }, + (err) => cb(err) + ) + } + } else { + t._flush = /** @type {ClassicFlushFn} */ (flushFn) } + } +} - this.options = Object.assign({}, options, override) +/** + * 
Drive an async-generator transform. Incoming chunks are queued and exposed + * as an async-iterable source; yielded values are pushed downstream. Honours + * backpressure on both sides: writes pause when the queue reaches the + * writable HWM; the consumer pauses when `t.push()` reports a full readable + * buffer (resumed by the next `_read()` call). + * + * @param {import('readable-stream').Transform} t + * @param {AsyncGenTransformFn} gen + * @param {FlushFn | null} flushFn + */ +function setupAsyncGen (t, gen, flushFn) { + /** @type {any[]} */ + const queue = [] + /** @type {Array<(value?: any) => void>} */ + const sourceWakeups = [] + /** @type {Array<(value?: any) => void>} */ + const writeWaiters = [] + /** @type {Array<(value?: any) => void>} */ + const readWaiters = [] + let writableEnded = false + /** @type {{ cb: Function | null }} */ + const flushState = { cb: null } - Transform.call(this, this.options) + const writableHWM = t.writableHighWaterMark ?? 16 - this._transform = transform - if (flush) { - this._flush = flush + const wakeOne = (/** @type {Array<(value?: any) => void>} */ arr) => { + const w = arr.shift() + if (w) w() + } + const wakeAll = (/** @type {Array<(value?: any) => void>} */ arr) => { + while (arr.length) /** @type {(value?: any) => void} */ (arr.shift())() + } + + const source = (async function * () { + while (true) { + while (queue.length > 0) { + yield queue.shift() + wakeOne(writeWaiters) + } + if (writableEnded) return + await new Promise((resolve) => { sourceWakeups.push(resolve) }) } + })() + + t._transform = function (chunk, _enc, cb) { + queue.push(chunk) + wakeOne(sourceWakeups) + if (queue.length >= writableHWM) writeWaiters.push(cb) + else cb() } - inherits(Through2, Transform) + t._flush = function (cb) { + flushState.cb = cb + writableEnded = true + wakeOne(sourceWakeups) + } + + // Default Transform._read drives _transform pull semantics, which we + // bypass. We use it solely as the "downstream wants more" signal. 
+ t._read = function () { wakeAll(readWaiters) } + + // On destroy/close, end the source iterable so the user's `for await` + // unwinds (running its `finally`) and our consumer loop exits. + t.once('close', () => { + writableEnded = true + wakeAll(sourceWakeups); wakeAll(readWaiters); wakeAll(writeWaiters) + }) - return Through2 -}) + ;(async () => { + try { + for await (const out of gen(source)) { + if (t.destroyed) return + if (!t.push(out)) { + await new Promise((resolve) => { readWaiters.push(resolve) }) + } + } + // The generator may return before the writable side has ended (e.g. it + // stopped reading `source` early). `_flush` has not run yet at that point, + // so `flushState.cb` is still null and nothing would ever complete the + // stream. Until `_flush` (or close) sets `writableEnded`, discard further + // input and release parked write callbacks so writers are not stalled. + // Waking `sourceWakeups` first clears any waiter left behind by an + // abandoned `source.next()`, keeping our own waiter first in line. + while (!writableEnded) { + queue.length = 0 + wakeAll(writeWaiters) + wakeAll(sourceWakeups) + await new Promise((resolve) => { sourceWakeups.push(resolve) }) + } + if (t.destroyed) return + if (flushFn) { + if (fnKind(flushFn) === 'async') { + const out = await /** @type {AsyncFlushFn} */ (flushFn).call(t) + if (out !== undefined && !t.push(out)) { + await new Promise((resolve) => { readWaiters.push(resolve) }) + } + } else { + await new Promise((resolve, reject) => { + /** @type {ClassicFlushFn} */ (flushFn).call(t, (err) => err ? reject(err) : resolve(undefined)) + }) + } + } + // `_flush` has stored its callback by now; calling it lets the stream end. + if (flushState.cb) flushState.cb() + } catch (err) { + // After `_flush` the error is reported through its callback; before that + // the stream is destroyed with the error. + if (flushState.cb) flushState.cb(/** @type {Error} */ (err)) + else t.destroy(/** @type {Error} */ (err)) + } + })() +} -const obj = through2(function (options, transform, flush) { - const t2 = new Transform(Object.assign({ objectMode: true, highWaterMark: 16 }, options)) +/** + * Build a Transform around a transformation function (classic / async / async + * generator; auto-dispatched).
+ * + * @example Classic + * transform(function (chunk, _enc, cb) { this.push(chunk); cb() }) + * @example Async + * transform(async (chunk) => chunk.toString().toUpperCase()) + * @example Async generator (1-to-many; cross-chunk state is just a local var) + * transform(async function * (source) { + * let buf = '' + * for await (const chunk of source) { + * buf += chunk.toString() + * const lines = buf.split('\n') + * buf = lines.pop() + * for (const line of lines) yield line + * } + * if (buf) yield buf + * }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {import('readable-stream').Transform} + */ +export function transform (options, transformFn, flushFn) { + const args = resolveArgs(options, transformFn, flushFn) + const t = new Transform(args.options) + setupTransform(t, args.transform, args.flush) + return t +} - t2._transform = transform +/** + * Like {@link transform}, with `objectMode: true` by default. + * + * @example Filter + * objectTransform(async (item) => predicate(item) ? item : undefined) + * @example Batch + * objectTransform(async function * (source) { + * let batch = [] + * for await (const item of source) { + * batch.push(item) + * if (batch.length >= 100) { yield batch; batch = [] } + * } + * if (batch.length) yield batch + * }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {import('readable-stream').Transform} + */ +export function objectTransform (options, transformFn, flushFn) { + const args = resolveArgs(options, transformFn, flushFn) + const t = new Transform({ objectMode: true, highWaterMark: 16, ...args.options }) + setupTransform(t, args.transform, args.flush) + return t +} - if (flush) { - t2._flush = flush +/** + * Build a reusable factory. 
The returned function works with or without + * `new`; per-call options merge over the configured defaults and are exposed + * as `this.options` inside the transform. + * + * Note: in v5 this returns a Transform instance directly, not a true + * constructor; `instanceof FactoryFn` no longer holds (`instanceof Transform` + * still does). + * + * @example + * const Counter = transformer({ objectMode: true }, function (chunk, _enc, cb) { + * this.count = (this.count || 0) + 1 + * this.push(chunk); cb() + * }) + * const a = Counter() + * const b = new Counter({ highWaterMark: 32 }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {(override?: TransformOptions) => import('readable-stream').Transform & { options: TransformOptions }} + */ +export function transformer (options, transformFn, flushFn) { + const args = resolveArgs(options, transformFn, flushFn) + return function make (override) { + const merged = { ...args.options, ...override } + const t = /** @type {import('readable-stream').Transform & { options: TransformOptions }} */ ( + new Transform(merged) + ) + t.options = merged + setupTransform(t, args.transform, args.flush) + return t } +} - return t2 -}) +/** + * Default export: `transform` with legacy `.obj` / `.ctor` for v4 + * back-compatibility. 
+ * @type {typeof transform & { obj: typeof objectTransform, ctor: typeof transformer }} + */ +const through2 = /** @type {any} */ (transform) +through2.obj = objectTransform +through2.ctor = transformer -module.exports = make -module.exports.ctor = ctor -module.exports.obj = obj +export default through2 diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..a95a447 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,25 @@ +{ + "compilerOptions": { + "allowJs": true, + "checkJs": true, + "strict": true, + "esModuleInterop": true, + "target": "ESNext", + "module": "nodenext", + "moduleResolution": "nodenext", + "skipLibCheck": true, + "types": ["node"], + "declaration": true, + "declarationMap": true, + "emitDeclarationOnly": true, + "outDir": "types" + }, + "include": [ + "through2.js", + "web.js" + ], + "exclude": [ + "node_modules", + "test" + ] +} diff --git a/types/through2.d.ts b/types/through2.d.ts new file mode 100644 index 0000000..57551eb --- /dev/null +++ b/types/through2.d.ts @@ -0,0 +1,91 @@ +/** + * Build a Transform around a transformation function (classic / async / async + * generator; auto-dispatched). 
+ * + * @example Classic + * transform(function (chunk, _enc, cb) { this.push(chunk); cb() }) + * @example Async + * transform(async (chunk) => chunk.toString().toUpperCase()) + * @example Async generator (1-to-many; cross-chunk state is just a local var) + * transform(async function * (source) { + * let buf = '' + * for await (const chunk of source) { + * buf += chunk.toString() + * const lines = buf.split('\n') + * buf = lines.pop() + * for (const line of lines) yield line + * } + * if (buf) yield buf + * }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {import('readable-stream').Transform} + */ +export function transform(options?: TransformOptions | TransformFn, transformFn?: TransformFn, flushFn?: FlushFn): import("readable-stream").Transform; +/** + * Like {@link transform}, with `objectMode: true` by default. + * + * @example Filter + * objectTransform(async (item) => predicate(item) ? item : undefined) + * @example Batch + * objectTransform(async function * (source) { + * let batch = [] + * for await (const item of source) { + * batch.push(item) + * if (batch.length >= 100) { yield batch; batch = [] } + * } + * if (batch.length) yield batch + * }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {import('readable-stream').Transform} + */ +export function objectTransform(options?: TransformOptions | TransformFn, transformFn?: TransformFn, flushFn?: FlushFn): import("readable-stream").Transform; +/** + * Build a reusable factory. The returned function works with or without + * `new`; per-call options merge over the configured defaults and are exposed + * as `this.options` inside the transform. + * + * Note: in v5 this returns a Transform instance directly, not a true + * constructor; `instanceof FactoryFn` no longer holds (`instanceof Transform` + * still does). 
+ * + * @example + * const Counter = transformer({ objectMode: true }, function (chunk, _enc, cb) { + * this.count = (this.count || 0) + 1 + * this.push(chunk); cb() + * }) + * const a = Counter() + * const b = new Counter({ highWaterMark: 32 }) + * + * @param {TransformOptions | TransformFn} [options] + * @param {TransformFn} [transformFn] + * @param {FlushFn} [flushFn] + * @returns {(override?: TransformOptions) => import('readable-stream').Transform & { options: TransformOptions }} + */ +export function transformer(options?: TransformOptions | TransformFn, transformFn?: TransformFn, flushFn?: FlushFn): (override?: TransformOptions) => import("readable-stream").Transform & { + options: TransformOptions; +}; +export default through2; +export type ClassicTransformFn = (chunk: any, encoding: BufferEncoding, callback: (err?: Error | null, data?: any) => void) => void; +export type AsyncTransformFn = (chunk: any, encoding: BufferEncoding) => any | Promise<any>; +export type AsyncGenTransformFn = (source: AsyncIterable<any>) => AsyncGenerator<any, void, void>; +export type TransformFn = ClassicTransformFn | AsyncTransformFn | AsyncGenTransformFn; +export type ClassicFlushFn = (callback: (err?: Error | null) => void) => void; +export type AsyncFlushFn = () => any | Promise<any>; +export type FlushFn = ClassicFlushFn | AsyncFlushFn; +export type TransformOptions = import("readable-stream").TransformOptions; +/** + * Default export: `transform` with legacy `.obj` / `.ctor` for v4 + * back-compatibility.
+ * @type {typeof transform & { obj: typeof objectTransform, ctor: typeof transformer }} + */ +declare const through2: typeof transform & { + obj: typeof objectTransform; + ctor: typeof transformer; +}; +//# sourceMappingURL=through2.d.ts.map \ No newline at end of file diff --git a/types/through2.d.ts.map b/types/through2.d.ts.map new file mode 100644 index 0000000..e6d61a4 --- /dev/null +++ b/types/through2.d.ts.map @@ -0,0 +1 @@ +{"version":3,"file":"through2.d.ts","sourceRoot":"","sources":["../through2.js"],"names":[],"mappings":"AA6LA;;;;;;;;;;;;;;;;;;;;;;;;GAwBG;AACH,oCALW,gBAAgB,GAAG,WAAW,gBAC9B,WAAW,YACX,OAAO,GACL,OAAO,iBAAiB,EAAE,SAAS,CAO/C;AAED;;;;;;;;;;;;;;;;;;;GAmBG;AACH,0CALW,gBAAgB,GAAG,WAAW,gBAC9B,WAAW,YACX,OAAO,GACL,OAAO,iBAAiB,EAAE,SAAS,CAO/C;AAED;;;;;;;;;;;;;;;;;;;;;GAqBG;AACH,sCALW,gBAAgB,GAAG,WAAW,gBAC9B,WAAW,YACX,OAAO,GACL,CAAC,QAAQ,CAAC,EAAE,gBAAgB,KAAK,OAAO,iBAAiB,EAAE,SAAS,GAAG;IAAE,OAAO,EAAE,gBAAgB,CAAA;CAAE,CAahH;;iCAhRa,CAAC,KAAK,EAAE,GAAG,EAAE,QAAQ,EAAE,cAAc,EAAE,QAAQ,EAAE,CAAC,GAAG,CAAC,EAAE,KAAK,GAAG,IAAI,EAAE,IAAI,CAAC,EAAE,GAAG,KAAK,IAAI,KAAK,IAAI;+BAClG,CAAC,KAAK,EAAE,GAAG,EAAE,QAAQ,EAAE,cAAc,KAAK,GAAG,GAAG,OAAO,CAAC,GAAG,CAAC;kCAC5D,CAAC,MAAM,EAAE,aAAa,CAAC,GAAG,CAAC,KAAK,cAAc,CAAC,GAAG,EAAE,IAAI,EAAE,IAAI,CAAC;0BAC/D,kBAAkB,GAAG,gBAAgB,GAAG,mBAAmB;6BAC3D,CAAC,QAAQ,EAAE,CAAC,GAAG,CAAC,EAAE,KAAK,GAAG,IAAI,KAAK,IAAI,KAAK,IAAI;2BAChD,MAAM,GAAG,GAAG,OAAO,CAAC,GAAG,CAAC;sBACxB,cAAc,GAAG,YAAY;+BAC7B,OAAO,iBAAiB,EAAE,gBAAgB;AA2QxD;;;;GAIG;AACH,wBAFU,OAAO,SAAS,GAAG;IAAE,GAAG,EAAE,OAAO,eAAe,CAAC;IAAC,IAAI,EAAE,OAAO,WAAW,CAAA;CAAE,CAEvC"} \ No newline at end of file diff --git a/types/web.d.ts b/types/web.d.ts new file mode 100644 index 0000000..8a3003d --- /dev/null +++ b/types/web.d.ts @@ -0,0 +1,42 @@ +/** + * Build a TransformStream around a transformation function (classic / + * async / async generator; auto-dispatched). 
+ * + * @example Classic controller form (multiple enqueue per chunk) + * transform((chunk, controller) => { + * controller.enqueue(chunk) + * controller.enqueue({ size: chunk.length }) + * }) + * @example Async (returned value enqueued) + * transform(async (chunk) => chunk.toString().toUpperCase()) + * @example Async generator (cross-chunk state is just a local var) + * transform(async function * (source) { + * let buf = '' + * for await (const chunk of source) { + * buf += chunk.toString() + * const lines = buf.split('\n') + * buf = lines.pop() + * for (const line of lines) yield line + * } + * if (buf) yield buf + * }) + * @example Pipe a fetch response + * await response.body.pipeThrough(transform(async (chunk) => chunk)).pipeTo(dest) + * + * @param {WebTransformFn} [transformFn] + * @param {WebFlushFn} [flushFn] + * @returns {{ readable: ReadableStream<any>, writable: WritableStream<any> }} + */ +export function transform(transformFn?: WebTransformFn, flushFn?: WebFlushFn): { + readable: ReadableStream<any>; + writable: WritableStream<any>; +}; +export default transform; +export type ClassicWebTransformFn = (chunk: any, controller: TransformStreamDefaultController<any>) => void; +export type AsyncWebTransformFn = (chunk: any) => any | Promise<any>; +export type AsyncGenWebTransformFn = (source: AsyncIterable<any>) => AsyncGenerator<any, void, void>; +export type WebTransformFn = ClassicWebTransformFn | AsyncWebTransformFn | AsyncGenWebTransformFn; +export type ClassicWebFlushFn = (controller: TransformStreamDefaultController<any>) => void; +export type AsyncWebFlushFn = () => any | Promise<any>; +export type WebFlushFn = ClassicWebFlushFn | AsyncWebFlushFn; +//# sourceMappingURL=web.d.ts.map \ No newline at end of file diff --git a/types/web.d.ts.map b/types/web.d.ts.map new file mode 100644 index 0000000..12b2589 --- /dev/null +++ b/types/web.d.ts.map @@ -0,0 +1 @@
+{"version":3,"file":"web.d.ts","sourceRoot":"","sources":["../web.js"],"names":[],"mappings":"AA6BA;;;;;;;;;;;;;;;;;;;;;;;;;;;;GA4BG;AACH,wCAJW,cAAc,YACd,UAAU,GACR;IAAE,QAAQ,EAAE,cAAc,CAAC,GAAG,CAAC,CAAC;IAAC,QAAQ,EAAE,cAAc,CAAC,GAAG,CAAC,CAAA;CAAE,CA2C5E;;oCA3Fa,CAAC,KAAK,EAAE,GAAG,EAAE,UAAU,EAAE,gCAAgC,CAAC,GAAG,CAAC,KAAK,IAAI;kCACvE,CAAC,KAAK,EAAE,GAAG,KAAK,GAAG,GAAG,OAAO,CAAC,GAAG,CAAC;qCAClC,CAAC,MAAM,EAAE,aAAa,CAAC,GAAG,CAAC,KAAK,cAAc,CAAC,GAAG,EAAE,IAAI,EAAE,IAAI,CAAC;6BAC/D,qBAAqB,GAAG,mBAAmB,GAAG,sBAAsB;gCACpE,CAAC,UAAU,EAAE,gCAAgC,CAAC,GAAG,CAAC,KAAK,IAAI;8BAC3D,MAAM,GAAG,GAAG,OAAO,CAAC,GAAG,CAAC;yBACxB,iBAAiB,GAAG,eAAe"} \ No newline at end of file diff --git a/web.js b/web.js new file mode 100644 index 0000000..1d4fe4f --- /dev/null +++ b/web.js @@ -0,0 +1,241 @@ +/** + * @file Tiny utility for inserting transformation logic into Web Streams + * pipelines. Returns `TransformStream` instances; zero runtime dependencies; + * runs anywhere `TransformStream` is a global (modern browsers, Node.js, + * Deno, Bun, Cloudflare Workers). See the root `through2` entry for the + * Node.js streams variant. 
/* (end of the @file header comment that opens on the preceding line) */

/** @typedef {(chunk: any, controller: TransformStreamDefaultController<any>) => void} ClassicWebTransformFn */
/** @typedef {(chunk: any) => any | Promise<any>} AsyncWebTransformFn */
/** @typedef {(source: AsyncIterable<any>) => AsyncGenerator<any, void, void>} AsyncGenWebTransformFn */
/** @typedef {ClassicWebTransformFn | AsyncWebTransformFn | AsyncGenWebTransformFn} WebTransformFn */
/** @typedef {(controller: TransformStreamDefaultController<any>) => void} ClassicWebFlushFn */
/** @typedef {() => any | Promise<any>} AsyncWebFlushFn */
/** @typedef {ClassicWebFlushFn | AsyncWebFlushFn} WebFlushFn */

// Constructors of the two special function kinds, used for `instanceof`
// dispatch in fnKind().
const AsyncFunction = async function () {}.constructor
const AsyncGeneratorFunction = async function * () {}.constructor

/**
 * Classify a function by how it was declared. Only native `async function`
 * and `async function *` declarations are recognised; everything else
 * (including a plain function that happens to return a promise) is 'classic'.
 *
 * @param {Function} fn
 * @returns {'async' | 'asyncgen' | 'classic'}
 */
function fnKind (fn) {
  if (fn instanceof AsyncGeneratorFunction) return 'asyncgen'
  if (fn instanceof AsyncFunction) return 'async'
  return 'classic'
}

/**
 * Build a TransformStream around a transformation function (classic /
 * async / async generator; auto-dispatched).
 *
 * With no transform function the stream is a pass-through. Async-generator
 * transforms are delegated to `asyncGenTransform` and come back as a plain
 * `{ readable, writable }` pair rather than a `TransformStream` instance.
 *
 * @example Classic controller form (multiple enqueue per chunk)
 *   transform((chunk, controller) => {
 *     controller.enqueue(chunk)
 *     controller.enqueue({ size: chunk.length })
 *   })
 * @example Async (returned value enqueued)
 *   transform(async (chunk) => chunk.toString().toUpperCase())
 * @example Async generator (cross-chunk state is just a local var)
 *   transform(async function * (source) {
 *     let buf = ''
 *     for await (const chunk of source) {
 *       buf += chunk.toString()
 *       const lines = buf.split('\n')
 *       buf = lines.pop()
 *       for (const line of lines) yield line
 *     }
 *     if (buf) yield buf
 *   })
 * @example Pipe a fetch response
 *   await response.body.pipeThrough(transform(async (chunk) => chunk)).pipeTo(dest)
 *
 * @param {WebTransformFn} [transformFn]
 * @param {WebFlushFn} [flushFn]
 * @returns {{ readable: ReadableStream<any>, writable: WritableStream<any> }}
 */
export function transform (transformFn, flushFn) {
  if (typeof transformFn !== 'function') {
    /** @type {ClassicWebTransformFn} */
    const passthrough = (chunk, controller) => controller.enqueue(chunk)
    transformFn = passthrough
  }

  const tKind = fnKind(transformFn)

  if (tKind === 'asyncgen') {
    return asyncGenTransform(/** @type {AsyncGenWebTransformFn} */ (transformFn), flushFn)
  }

  /** @type {Transformer} */
  const inner = {}

  if (tKind === 'async') {
    const fn = /** @type {AsyncWebTransformFn} */ (transformFn)
    inner.transform = async (chunk, controller) => {
      const out = await fn(chunk)
      // `undefined` means "emit nothing for this chunk".
      if (out !== undefined) controller.enqueue(out)
    }
  } else {
    const fn = /** @type {ClassicWebTransformFn} */ (transformFn)
    inner.transform = (chunk, controller) => fn(chunk, controller)
  }

  if (flushFn) {
    if (fnKind(flushFn) === 'async') {
      const fn = /** @type {AsyncWebFlushFn} */ (flushFn)
      inner.flush = async (controller) => {
        const out = await fn()
        if (out !== undefined) controller.enqueue(out)
      }
    } else {
      const fn = /** @type {ClassicWebFlushFn} */ (flushFn)
      inner.flush = (controller) => fn(controller)
    }
  }

  return new TransformStream(inner)
}

/**
 * Drive an async-generator transform as a `{ readable, writable }` pair.
 *
 * The pair shape (rather than a `TransformStream`) is required because we
 * need cancellation hooks: the `Transformer.cancel` callback is in the
 * WHATWG Streams spec but not yet implemented in major browsers as of 2026,
 * whereas `ReadableStream.cancel` and `WritableStream.abort` are universally
 * available. The pair satisfies `pipeThrough`'s structural contract.
 *
 * Backpressure follows the standard pull-based model: `pull()` enqueues one
 * yielded value per call and is invoked by the runtime exactly when the
 * readable buffer has room; `write()` returns a pending promise when the
 * internal queue is full, propagating pressure upstream.
 *
 * Termination is propagated to both sides in every case:
 *  - readable cancelled / writable aborted: the generator is finalized and
 *    the opposite side is errored with the same reason;
 *  - the generator or the flush function throws: the readable is errored,
 *    blocked writers are released and the writable is errored with the same
 *    error;
 *  - the generator returns before the writable was closed: the readable is
 *    closed (after flush) and the writable is errored with a `TypeError`,
 *    mirroring `TransformStreamDefaultController.terminate()`.
 *
 * @param {AsyncGenWebTransformFn} gen
 * @param {WebFlushFn} [flushFn]
 * @returns {{ readable: ReadableStream<any>, writable: WritableStream<any> }}
 */
function asyncGenTransform (gen, flushFn) {
  /** @type {any[]} */
  const queue = []
  /** @type {Array<(value?: any) => void>} */
  const sourceWakeups = []
  /** @type {Array<(value?: any) => void>} */
  const writeWaiters = []
  // Set by writable.close() (clean end) and by finalize() (forced end).
  let sourceEnded = false
  let flushed = false

  // Number of buffered chunks at which write() starts applying backpressure.
  const QUEUE_HWM = 16

  const wakeOne = (/** @type {Array<(value?: any) => void>} */ arr) => {
    const w = arr.shift()
    if (w) w()
  }
  const wakeAll = (/** @type {Array<(value?: any) => void>} */ arr) => {
    while (arr.length) /** @type {(value?: any) => void} */ (arr.shift())()
  }

  // The source iterable user code consumes via `for await (const x of source)`.
  const source = (async function * () {
    while (true) {
      while (queue.length > 0) {
        yield queue.shift()
        // A slot was freed: release one writer blocked on a full queue.
        wakeOne(writeWaiters)
      }
      if (sourceEnded) return
      await new Promise((resolve) => { sourceWakeups.push(resolve) })
    }
  })()

  // Build the user's iterator once; pull() drives it on demand.
  const iter = gen(source)

  // Controllers captured in start() so the two sides can cross-propagate
  // termination (matching the WHATWG TransformStream behaviour: cancel on
  // one side errors the other).
  /** @type {ReadableStreamDefaultController | null} */
  let readableController = null
  /** @type {WritableStreamDefaultController | null} */
  let writableController = null
  let terminating = false

  // Shared cleanup: end the source, finalize the user's generator (running
  // its `finally`), error the opposite side. Idempotent; never rejects.
  const finalize = async (/** @type {any} */ reason, /** @type {'readable'|'writable'} */ origin) => {
    if (terminating) return
    terminating = true
    sourceEnded = true
    wakeAll(sourceWakeups)
    wakeAll(writeWaiters)
    if (typeof iter.return === 'function') {
      try { await iter.return() } catch { /* user finally threw; ignored */ }
    }
    if (origin === 'readable' && writableController) {
      try { writableController.error(reason) } catch { /* already errored */ }
    } else if (origin === 'writable' && readableController) {
      try { readableController.error(reason) } catch { /* already errored */ }
    }
  }

  return {
    readable: new ReadableStream({
      start (controller) { readableController = controller },
      async pull (controller) {
        // True once the flush function itself closed or errored the readable
        // through the proxy, so we must not close it a second time.
        let settled = false
        try {
          const { value, done } = await iter.next()
          if (!done) {
            controller.enqueue(value)
            return
          }
          // `sourceEnded` is only set by close()/finalize(). If it is still
          // false here the generator stopped consuming before the input
          // ended, so the writable side is still open and must be shut down
          // below or writers would block forever on a queue nobody drains.
          const endedEarly = !sourceEnded
          if (!flushed) {
            flushed = true
            if (flushFn) {
              if (fnKind(flushFn) === 'async') {
                const fn = /** @type {AsyncWebFlushFn} */ (flushFn)
                const out = await fn()
                if (out !== undefined) controller.enqueue(out)
              } else {
                // Classic flush expects a TransformStreamDefaultController-ish
                // shape; synthesize one over the ReadableStream's controller.
                /** @type {any} */
                const proxy = {
                  enqueue: (/** @type {any} */ c) => controller.enqueue(c),
                  error: (/** @type {any} */ e) => {
                    settled = true
                    controller.error(e)
                  },
                  terminate: () => {
                    if (settled) return
                    settled = true
                    controller.close()
                  }
                }
                const fn = /** @type {ClassicWebFlushFn} */ (flushFn)
                // Awaited so a classic flush that returns a promise finishes
                // before the stream closes, as it would under TransformStream.
                await fn(proxy)
              }
            }
          }
          if (!settled) controller.close()
          if (endedEarly) {
            await finalize(new TypeError('Transform terminated: the generator returned before the input ended'), 'readable')
          }
        } catch (err) {
          // No-op if the readable is already closed or errored.
          controller.error(err)
          // Release blocked writers and error the writable side too, so an
          // upstream pipe fails instead of hanging.
          await finalize(err, 'readable')
        }
      },
      cancel (reason) { return finalize(reason, 'readable') }
    }),
    writable: new WritableStream({
      start (controller) { writableController = controller },
      write (chunk) {
        queue.push(chunk)
        wakeOne(sourceWakeups)
        if (queue.length >= QUEUE_HWM) {
          // Queue full: stay pending until the source consumes a chunk.
          return new Promise((resolve) => { writeWaiters.push(resolve) })
        }
        return undefined
      },
      close () {
        // Clean end: let the source iterable return so the user's `for await`
        // exits normally and pull() can run any flush logic.
        sourceEnded = true
        wakeAll(sourceWakeups)
      },
      abort (reason) { return finalize(reason, 'writable') }
    })
  }
}

export default transform