From 6f991464e4ec6f7559b5989f636425d72a8b69ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 16:27:11 +0100 Subject: [PATCH 01/15] Workerize signing --- packages/sdk/createKarmaConfig.ts | 1 + packages/sdk/jest.config.ts | 1 + packages/sdk/rollup.config.mts | 69 +++++++++++-- .../sdk/src/_browser/createSigningWorker.ts | 11 +++ packages/sdk/src/_jest/createSigningWorker.ts | 12 +++ .../sdk/src/_karma/createSigningWorker.ts | 12 +++ .../sdk/src/_nodejs/createSigningWorker.ts | 11 +++ packages/sdk/src/identity/Identity.ts | 8 ++ packages/sdk/src/identity/KeyPairIdentity.ts | 4 +- packages/sdk/src/signature/MessageSigner.ts | 57 ++++++++++- packages/sdk/src/signature/Signing.ts | 28 ++++++ packages/sdk/src/signature/SigningWorker.ts | 18 ++++ packages/sdk/src/signature/signingUtils.ts | 97 +++++++++++++++++++ 13 files changed, 319 insertions(+), 10 deletions(-) create mode 100644 packages/sdk/src/_browser/createSigningWorker.ts create mode 100644 packages/sdk/src/_jest/createSigningWorker.ts create mode 100644 packages/sdk/src/_karma/createSigningWorker.ts create mode 100644 packages/sdk/src/_nodejs/createSigningWorker.ts create mode 100644 packages/sdk/src/signature/Signing.ts create mode 100644 packages/sdk/src/signature/SigningWorker.ts create mode 100644 packages/sdk/src/signature/signingUtils.ts diff --git a/packages/sdk/createKarmaConfig.ts b/packages/sdk/createKarmaConfig.ts index a1adab036e..e455504467 100644 --- a/packages/sdk/createKarmaConfig.ts +++ b/packages/sdk/createKarmaConfig.ts @@ -16,6 +16,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType/src/_jest/createSignatureValidationWorker.ts", + "^@/createSigningWorker$": "/src/_jest/createSigningWorker.ts", "^@/(.*)$": "/src/_nodejs/$1", }, transform: { diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index db9527580a..ddd3256066 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -29,8 +29,10 @@ const browserAliases: Alias[] = [ ] export default defineConfig([ - workerNodejs(), - workerBrowser(), + validationWorkerNodejs(), + validationWorkerBrowser(), + signingWorkerNodejs(), + signingWorkerBrowser(), nodejs(), nodejsTypes(), browser(), @@ -208,9 +210,9 @@ function umdMinified(): RollupOptions { } /** - * Worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} + * Signature validation worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} */ -function workerNodejs(): RollupOptions { +function validationWorkerNodejs(): RollupOptions { return { input: './dist/nodejs/src/signature/SignatureValidationWorker.js', context: 'globalThis', @@ -235,9 +237,9 @@ function workerNodejs(): RollupOptions { } /** - * Worker bundle for browser - ESM format for use with web-worker {type: 'module'} + * Signature validation worker bundle for browser - ESM format for use with web-worker {type: 'module'} */ -function workerBrowser(): RollupOptions { +function validationWorkerBrowser(): RollupOptions { return { input: './dist/browser/src/signature/SignatureValidationWorker.js', context: 'self', @@ -261,3 +263,58 @@ function workerBrowser(): RollupOptions { onwarn, } } + +/** + * Signing worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} + */ +function signingWorkerNodejs(): RollupOptions { + return { + input: './dist/nodejs/src/signature/SigningWorker.js', + context: 'globalThis', + output: { + format: 'es', + file: './dist/workers/SigningWorker.node.mjs', + sourcemap: true, + }, + plugins: [ + json(), + alias({ + entries: nodejsAliases, + }), + nodeResolve({ + preferBuiltins: true, + }), + cjs(), + ], + external: [/node_modules/, /@streamr\//], + onwarn, + } +} + +/** + * Signing worker bundle for browser - ESM format for use with web-worker {type: 'module'} + */ +function signingWorkerBrowser(): RollupOptions { + return { + input: './dist/browser/src/signature/SigningWorker.js', + context: 'self', + output: { + format: 'es', + file: './dist/workers/SigningWorker.browser.mjs', + sourcemap: true, + }, + plugins: [ + json(), + alias({ + entries: browserAliases, + }), + nodeResolve({ + browser: true, + preferBuiltins: false, + }), + cjs(), + ], + external: [], + onwarn, + } +} diff --git a/packages/sdk/src/_browser/createSigningWorker.ts b/packages/sdk/src/_browser/createSigningWorker.ts new file mode 100644 index 0000000000..46a0c3bb84 --- /dev/null +++ b/packages/sdk/src/_browser/createSigningWorker.ts @@ -0,0 +1,11 @@ +/** + * Browser-specific signing worker factory. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('./workers/SigningWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_jest/createSigningWorker.ts b/packages/sdk/src/_jest/createSigningWorker.ts new file mode 100644 index 0000000000..f80d03954a --- /dev/null +++ b/packages/sdk/src/_jest/createSigningWorker.ts @@ -0,0 +1,12 @@ +/** + * Jest-specific signing worker factory. + * Points to the built worker in dist/ for testing. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/SigningWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_karma/createSigningWorker.ts b/packages/sdk/src/_karma/createSigningWorker.ts new file mode 100644 index 0000000000..daae1dcbe4 --- /dev/null +++ b/packages/sdk/src/_karma/createSigningWorker.ts @@ -0,0 +1,12 @@ +/** + * Karma-specific signing worker factory. + * Points to the built worker in dist/ for browser testing. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/SigningWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_nodejs/createSigningWorker.ts b/packages/sdk/src/_nodejs/createSigningWorker.ts new file mode 100644 index 0000000000..fcecd807e8 --- /dev/null +++ b/packages/sdk/src/_nodejs/createSigningWorker.ts @@ -0,0 +1,11 @@ +/** + * Node.js-specific signing worker factory. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('./workers/SigningWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/identity/Identity.ts b/packages/sdk/src/identity/Identity.ts index 0641139415..3a737aad5a 100644 --- a/packages/sdk/src/identity/Identity.ts +++ b/packages/sdk/src/identity/Identity.ts @@ -20,4 +20,12 @@ export abstract class Identity { abstract getSignatureType(): SignatureType abstract createMessageSignature(payload: Uint8Array): Promise abstract getTransactionSigner(rpcProviderSource: RpcProviderSource): Promise + + /** + * Returns the private key if this identity supports worker-based signing. + * Returns undefined for identities that rely on external signers (e.g. browser wallets). + */ + getPrivateKey(): Promise | undefined { + return undefined + } } diff --git a/packages/sdk/src/identity/KeyPairIdentity.ts b/packages/sdk/src/identity/KeyPairIdentity.ts index db6becf71a..6915187580 100644 --- a/packages/sdk/src/identity/KeyPairIdentity.ts +++ b/packages/sdk/src/identity/KeyPairIdentity.ts @@ -30,8 +30,8 @@ export abstract class KeyPairIdentity extends Identity { return this.publicKeyString } - async getPrivateKey(): Promise { - return this.privateKey + override getPrivateKey(): Promise { + return Promise.resolve(this.privateKey) } // eslint-disable-next-line class-methods-use-this diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index 24869e6a21..1ce5d2cc79 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -4,24 +4,77 @@ import { Identity, IdentityInjectionToken } from '../identity/Identity' import { StreamMessage, StreamMessageOptions } from '../protocol/StreamMessage' import { createSignaturePayload } from './createSignaturePayload' import { SignatureType } from '@streamr/trackerless-network' +import { Signing } from './Signing' +import { toSignaturePayloadData } from './signingUtils' +import { DestroySignal } from '../DestroySignal' @scoped(Lifecycle.ContainerScoped) export class MessageSigner { private readonly identity: Identity + private signing: Signing | undefined - constructor(@inject(IdentityInjectionToken) identity: Identity) { + constructor( + @inject(IdentityInjectionToken) identity: Identity, + destroySignal?: DestroySignal + ) { this.identity = identity + destroySignal?.onDestroy.listen(() => this.destroy()) + } + + private getSigning(): Signing { + return this.signing ??= new Signing() } async createSignedMessage( opts: MarkRequired, 'messageType'>, signatureType: SignatureType ): Promise { - const signature = await this.identity.createMessageSignature(createSignaturePayload(opts)) + let signature: Uint8Array + + // Use worker-based signing if identity provides a private key and signature type supports it + // ERC-1271 signatures require on-chain verification and must be handled by the identity directly + const privateKeyPromise = this.identity.getPrivateKey() + if (privateKeyPromise !== undefined && signatureType !== SignatureType.ERC_1271) { + signature = await this.createSignatureInWorker(opts, signatureType, privateKeyPromise) + } else { + signature = await this.identity.createMessageSignature(createSignaturePayload(opts)) + } + return new StreamMessage({ ...opts, signature, signatureType }) } + + private async createSignatureInWorker( + opts: MarkRequired, 'messageType'>, + signatureType: SignatureType, + privateKeyPromise: Promise + ): Promise { + const privateKey = await privateKeyPromise + const payloadData = toSignaturePayloadData(opts) + + const result = await this.getSigning().createSignature({ + payloadData, + privateKey, + signatureType + }) + + if (result.type === 'error') { + throw new Error(`Signing failed: ${result.message}`) + } + + return result.signature + } + + /** + * Cleanup worker resources when the signer is no longer needed. + */ + destroy(): void { + if (this.signing) { + this.signing.destroy() + this.signing = undefined + } + } } diff --git a/packages/sdk/src/signature/Signing.ts b/packages/sdk/src/signature/Signing.ts new file mode 100644 index 0000000000..f69d8a352f --- /dev/null +++ b/packages/sdk/src/signature/Signing.ts @@ -0,0 +1,28 @@ +/** + * Unified signing using Web Worker. + * This offloads CPU-intensive cryptographic operations to a separate thread. + * Works in both browser and Node.js environments via platform-specific config. + */ +import { wrap, releaseProxy, type Remote } from 'comlink' +import { createSigningWorker } from '@/createSigningWorker' +import { SigningResult, SigningRequest } from './signingUtils' +import type { SigningWorkerApi } from './SigningWorker' + +export class Signing { + private worker: ReturnType + private workerApi: Remote + + constructor() { + this.worker = createSigningWorker() + this.workerApi = wrap(this.worker) + } + + async createSignature(request: SigningRequest): Promise { + return this.workerApi.createSignature(request) + } + + destroy(): void { + this.workerApi[releaseProxy]() + this.worker.terminate() + } +} diff --git a/packages/sdk/src/signature/SigningWorker.ts b/packages/sdk/src/signature/SigningWorker.ts new file mode 100644 index 0000000000..275d792a53 --- /dev/null +++ b/packages/sdk/src/signature/SigningWorker.ts @@ -0,0 +1,18 @@ +import { expose } from 'comlink' +import { + createSignatureFromData, + SigningResult, + SigningRequest, +} from './signingUtils' + +const workerApi = { + createSignature: async ( + request: SigningRequest + ): Promise => { + return createSignatureFromData(request) + }, +} + +export type SigningWorkerApi = typeof workerApi + +expose(workerApi) diff --git a/packages/sdk/src/signature/signingUtils.ts b/packages/sdk/src/signature/signingUtils.ts new file mode 100644 index 0000000000..f0547ba0fe --- /dev/null +++ b/packages/sdk/src/signature/signingUtils.ts @@ -0,0 +1,97 @@ +/** + * Core signing logic - shared between worker and main thread implementations. + * This file contains pure cryptographic signing functions without any network dependencies. + */ +import { SigningUtil } from '@streamr/utils' +import { EncryptedGroupKey, SignatureType } from '@streamr/trackerless-network' +import { IDENTITY_MAPPING } from '../identity/IdentityMapping' +import { createSignaturePayload, MessageIdLike, MessageRefLike } from './createSignaturePayload' +import { StreamMessageType } from '../protocol/StreamMessage' + +// Lookup structure SignatureType -> SigningUtil +const signingUtilBySignatureType: Record = Object.fromEntries( + IDENTITY_MAPPING.map((idMapping) => [idMapping.signatureType, SigningUtil.getInstance(idMapping.keyType)]) +) + +/** + * Result of signing + */ +export type SigningResult = + | { type: 'success', signature: Uint8Array } + | { type: 'error', message: string } + +/** + * Plain data type for message content that needs to be signed. + * This contains only primitive values and simple objects (no class instances). + */ +export interface SignaturePayloadData { + messageId: MessageIdLike + prevMsgRef?: MessageRefLike + messageType: StreamMessageType + content: Uint8Array + newGroupKey?: EncryptedGroupKey +} + +/** + * Complete signing request including private key and signature type. + */ +export interface SigningRequest { + payloadData: SignaturePayloadData + privateKey: Uint8Array + signatureType: SignatureType +} + +/** + * Create a signature for the given data. + * This is the core signing logic that can be run in a worker. + */ +export async function createSignatureFromData(request: SigningRequest): Promise { + try { + const signingUtil = signingUtilBySignatureType[request.signatureType] + if (!signingUtil) { + return { type: 'error', message: `Unsupported signatureType: "${request.signatureType}"` } + } + + const payload = createSignaturePayload({ + messageId: request.payloadData.messageId, + content: request.payloadData.content, + messageType: request.payloadData.messageType, + prevMsgRef: request.payloadData.prevMsgRef, + newGroupKey: request.payloadData.newGroupKey, + }) + + const signature = await signingUtil.createSignature(payload, request.privateKey) + return { type: 'success', signature } + } catch (err) { + return { type: 'error', message: String(err) } + } +} + +/** + * Extract plain serializable payload data from message options for worker communication. + */ +export function toSignaturePayloadData(opts: { + messageId: MessageIdLike + prevMsgRef?: MessageRefLike + messageType: StreamMessageType + content: Uint8Array + newGroupKey?: EncryptedGroupKey +}): SignaturePayloadData { + return { + messageId: { + streamId: opts.messageId.streamId, + streamPartition: opts.messageId.streamPartition, + timestamp: opts.messageId.timestamp, + sequenceNumber: opts.messageId.sequenceNumber, + publisherId: opts.messageId.publisherId, + msgChainId: opts.messageId.msgChainId, + }, + prevMsgRef: opts.prevMsgRef ? { + timestamp: opts.prevMsgRef.timestamp, + sequenceNumber: opts.prevMsgRef.sequenceNumber, + } : undefined, + messageType: opts.messageType, + content: opts.content, + newGroupKey: opts.newGroupKey, + } +} From 80e521f554b4d131ccc44794105cf66b0ca4d8ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 16:38:16 +0100 Subject: [PATCH 02/15] Consolidate worker rollup --- packages/sdk/rollup.config.mts | 93 ++++++++++------------------------ 1 file changed, 26 insertions(+), 67 deletions(-) diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index ddd3256066..c5b97c2185 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -28,11 +28,19 @@ const browserAliases: Alias[] = [ { find: 'stream', replacement: 'readable-stream' }, ] +/** + * Worker entry points - add new workers here. + * Key: output name (will become [name].node.mjs and [name].browser.mjs) + * Value: path relative to src/ (without extension) + */ +const WORKERS: Record = { + 'SignatureValidationWorker': 'signature/SignatureValidationWorker', + 'SigningWorker': 'signature/SigningWorker', +} + export default defineConfig([ - validationWorkerNodejs(), - validationWorkerBrowser(), - signingWorkerNodejs(), - signingWorkerBrowser(), + workersNodejs(), + workersBrowser(), nodejs(), nodejsTypes(), browser(), @@ -210,70 +218,18 @@ function umdMinified(): RollupOptions { } /** - * Signature validation worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} - */ -function validationWorkerNodejs(): RollupOptions { - return { - input: './dist/nodejs/src/signature/SignatureValidationWorker.js', - context: 'globalThis', - output: { - format: 'es', - file: './dist/workers/SignatureValidationWorker.node.mjs', - sourcemap: true, - }, - plugins: [ - json(), - alias({ - entries: nodejsAliases, - }), - nodeResolve({ - preferBuiltins: true, - }), - cjs(), - ], - external: [/node_modules/, /@streamr\//], - onwarn, - } -} - -/** - * Signature validation worker bundle for browser - ESM format for use with web-worker {type: 'module'} + * All worker bundles for Node.js - ESM format for use with web-worker {type: 'module'} */ -function validationWorkerBrowser(): RollupOptions { +function workersNodejs(): RollupOptions { return { - input: './dist/browser/src/signature/SignatureValidationWorker.js', - context: 'self', - output: { - format: 'es', - file: './dist/workers/SignatureValidationWorker.browser.mjs', - sourcemap: true, - }, - plugins: [ - json(), - alias({ - entries: browserAliases, - }), - nodeResolve({ - browser: true, - preferBuiltins: false, - }), - cjs(), - ], - external: [], - onwarn, - } -} - -/** - * Signing worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} - */ -function signingWorkerNodejs(): RollupOptions { - return { - input: './dist/nodejs/src/signature/SigningWorker.js', + input: Object.fromEntries( + Object.entries(WORKERS).map(([name, path]) => [name, `./dist/nodejs/src/${path}.js`]) + ), context: 'globalThis', output: { format: 'es', - file: './dist/workers/SigningWorker.node.mjs', + dir: './dist/workers', + entryFileNames: '[name].node.mjs', sourcemap: true, }, plugins: [ @@ -292,15 +248,18 @@ function signingWorkerNodejs(): RollupOptions { } /** - * Signing worker bundle for browser - ESM format for use with web-worker {type: 'module'} + * All worker bundles for browser - ESM format for use with web-worker {type: 'module'} */ -function signingWorkerBrowser(): RollupOptions { +function workersBrowser(): RollupOptions { return { - input: './dist/browser/src/signature/SigningWorker.js', + input: Object.fromEntries( + Object.entries(WORKERS).map(([name, path]) => [name, `./dist/browser/src/${path}.js`]) + ), context: 'self', output: { format: 'es', - file: './dist/workers/SigningWorker.browser.mjs', + dir: './dist/workers', + entryFileNames: '[name].browser.mjs', sourcemap: true, }, plugins: [ From 54068e53a6e86fa27c5c293e92610219c62cd644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 16:52:16 +0100 Subject: [PATCH 03/15] Make eslint happy --- packages/sdk/src/identity/Identity.ts | 1 + packages/sdk/src/signature/MessageSigner.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sdk/src/identity/Identity.ts b/packages/sdk/src/identity/Identity.ts index 3a737aad5a..fb25434ea3 100644 --- a/packages/sdk/src/identity/Identity.ts +++ b/packages/sdk/src/identity/Identity.ts @@ -25,6 +25,7 @@ export abstract class Identity { * Returns the private key if this identity supports worker-based signing. * Returns undefined for identities that rely on external signers (e.g. browser wallets). */ + // eslint-disable-next-line class-methods-use-this getPrivateKey(): Promise | undefined { return undefined } diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index 1ce5d2cc79..9aff599724 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -15,7 +15,7 @@ export class MessageSigner { constructor( @inject(IdentityInjectionToken) identity: Identity, - destroySignal?: DestroySignal + destroySignal?: DestroySignal ) { this.identity = identity destroySignal?.onDestroy.listen(() => this.destroy()) From 70c5fff11b7dd9baa781ce8e8e91fd10a3a65965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 16:53:36 +0100 Subject: [PATCH 04/15] Stick to mjs --- packages/sdk/rollup.config.mts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index c5b97c2185..750f2b18a7 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -230,6 +230,7 @@ function workersNodejs(): RollupOptions { format: 'es', dir: './dist/workers', entryFileNames: '[name].node.mjs', + chunkFileNames: '[name]-[hash].node.mjs', sourcemap: true, }, plugins: [ @@ -260,6 +261,7 @@ function workersBrowser(): RollupOptions { format: 'es', dir: './dist/workers', entryFileNames: '[name].browser.mjs', + chunkFileNames: '[name]-[hash].browser.mjs', sourcemap: true, }, plugins: [ From afb7cabe883563f977af13c44a9d08b5c6db1e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 18:26:28 +0100 Subject: [PATCH 05/15] Simplify signing worker types --- packages/sdk/src/signature/MessageSigner.ts | 4 +- .../src/signature/createSignaturePayload.ts | 9 ++- packages/sdk/src/signature/signingUtils.ts | 57 ++----------------- 3 files changed, 12 insertions(+), 58 deletions(-) diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index 9aff599724..e7fa15430f 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -5,7 +5,6 @@ import { StreamMessage, StreamMessageOptions } from '../protocol/StreamMessage' import { createSignaturePayload } from './createSignaturePayload' import { SignatureType } from '@streamr/trackerless-network' import { Signing } from './Signing' -import { toSignaturePayloadData } from './signingUtils' import { DestroySignal } from '../DestroySignal' @scoped(Lifecycle.ContainerScoped) @@ -53,10 +52,9 @@ export class MessageSigner { privateKeyPromise: Promise ): Promise { const privateKey = await privateKeyPromise - const payloadData = toSignaturePayloadData(opts) const result = await this.getSigning().createSignature({ - payloadData, + payloadInput: opts, privateKey, signatureType }) diff --git a/packages/sdk/src/signature/createSignaturePayload.ts b/packages/sdk/src/signature/createSignaturePayload.ts index 0162139680..5720c3584e 100644 --- a/packages/sdk/src/signature/createSignaturePayload.ts +++ b/packages/sdk/src/signature/createSignaturePayload.ts @@ -26,13 +26,18 @@ export interface MessageRefLike { sequenceNumber: number } -export const createSignaturePayload = (opts: { +/** + * Input data for creating a signature payload. + */ +export interface SignaturePayloadInput { messageId: MessageIdLike content: Uint8Array messageType: StreamMessageType prevMsgRef?: MessageRefLike newGroupKey?: EncryptedGroupKey -}): Uint8Array | never => { +} + +export const createSignaturePayload = (opts: SignaturePayloadInput): Uint8Array | never => { const header = Buffer.concat([ Buffer.from(`${opts.messageId.streamId}${opts.messageId.streamPartition}${opts.messageId.timestamp}` + `${opts.messageId.sequenceNumber}${opts.messageId.publisherId}${opts.messageId.msgChainId}`), diff --git a/packages/sdk/src/signature/signingUtils.ts b/packages/sdk/src/signature/signingUtils.ts index f0547ba0fe..9bc6e32a56 100644 --- a/packages/sdk/src/signature/signingUtils.ts +++ b/packages/sdk/src/signature/signingUtils.ts @@ -3,10 +3,9 @@ * This file contains pure cryptographic signing functions without any network dependencies. */ import { SigningUtil } from '@streamr/utils' -import { EncryptedGroupKey, SignatureType } from '@streamr/trackerless-network' +import { SignatureType } from '@streamr/trackerless-network' import { IDENTITY_MAPPING } from '../identity/IdentityMapping' -import { createSignaturePayload, MessageIdLike, MessageRefLike } from './createSignaturePayload' -import { StreamMessageType } from '../protocol/StreamMessage' +import { createSignaturePayload, SignaturePayloadInput } from './createSignaturePayload' // Lookup structure SignatureType -> SigningUtil const signingUtilBySignatureType: Record = Object.fromEntries( @@ -20,23 +19,11 @@ export type SigningResult = | { type: 'success', signature: Uint8Array } | { type: 'error', message: string } -/** - * Plain data type for message content that needs to be signed. - * This contains only primitive values and simple objects (no class instances). - */ -export interface SignaturePayloadData { - messageId: MessageIdLike - prevMsgRef?: MessageRefLike - messageType: StreamMessageType - content: Uint8Array - newGroupKey?: EncryptedGroupKey -} - /** * Complete signing request including private key and signature type. */ export interface SigningRequest { - payloadData: SignaturePayloadData + payloadInput: SignaturePayloadInput privateKey: Uint8Array signatureType: SignatureType } @@ -52,46 +39,10 @@ export async function createSignatureFromData(request: SigningRequest): Promise< return { type: 'error', message: `Unsupported signatureType: "${request.signatureType}"` } } - const payload = createSignaturePayload({ - messageId: request.payloadData.messageId, - content: request.payloadData.content, - messageType: request.payloadData.messageType, - prevMsgRef: request.payloadData.prevMsgRef, - newGroupKey: request.payloadData.newGroupKey, - }) - + const payload = createSignaturePayload(request.payloadInput) const signature = await signingUtil.createSignature(payload, request.privateKey) return { type: 'success', signature } } catch (err) { return { type: 'error', message: String(err) } } } - -/** - * Extract plain serializable payload data from message options for worker communication. - */ -export function toSignaturePayloadData(opts: { - messageId: MessageIdLike - prevMsgRef?: MessageRefLike - messageType: StreamMessageType - content: Uint8Array - newGroupKey?: EncryptedGroupKey -}): SignaturePayloadData { - return { - messageId: { - streamId: opts.messageId.streamId, - streamPartition: opts.messageId.streamPartition, - timestamp: opts.messageId.timestamp, - sequenceNumber: opts.messageId.sequenceNumber, - publisherId: opts.messageId.publisherId, - msgChainId: opts.messageId.msgChainId, - }, - prevMsgRef: opts.prevMsgRef ? { - timestamp: opts.prevMsgRef.timestamp, - sequenceNumber: opts.prevMsgRef.sequenceNumber, - } : undefined, - messageType: opts.messageType, - content: opts.content, - newGroupKey: opts.newGroupKey, - } -} From 5f158ab5143297bfbf6391d625a5ce9e8fb6f8a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 18:34:32 +0100 Subject: [PATCH 06/15] Make Identity#getPrivateKey() synchronous --- packages/sdk/src/identity/Identity.ts | 2 +- packages/sdk/src/identity/KeyPairIdentity.ts | 4 ++-- packages/sdk/src/signature/MessageSigner.ts | 10 ++++------ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/sdk/src/identity/Identity.ts b/packages/sdk/src/identity/Identity.ts index fb25434ea3..0ea5a7be22 100644 --- a/packages/sdk/src/identity/Identity.ts +++ b/packages/sdk/src/identity/Identity.ts @@ -26,7 +26,7 @@ export abstract class Identity { * Returns undefined for identities that rely on external signers (e.g. browser wallets). */ // eslint-disable-next-line class-methods-use-this - getPrivateKey(): Promise | undefined { + getPrivateKey(): Uint8Array | undefined { return undefined } } diff --git a/packages/sdk/src/identity/KeyPairIdentity.ts b/packages/sdk/src/identity/KeyPairIdentity.ts index 6915187580..08ca87217f 100644 --- a/packages/sdk/src/identity/KeyPairIdentity.ts +++ b/packages/sdk/src/identity/KeyPairIdentity.ts @@ -30,8 +30,8 @@ export abstract class KeyPairIdentity extends Identity { return this.publicKeyString } - override getPrivateKey(): Promise { - return Promise.resolve(this.privateKey) + override getPrivateKey(): Uint8Array { + return this.privateKey } // eslint-disable-next-line class-methods-use-this diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index e7fa15430f..6b8b1bb92f 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -32,9 +32,9 @@ export class MessageSigner { // Use worker-based signing if identity provides a private key and signature type supports it // ERC-1271 signatures require on-chain verification and must be handled by the identity directly - const privateKeyPromise = this.identity.getPrivateKey() - if (privateKeyPromise !== undefined && signatureType !== SignatureType.ERC_1271) { - signature = await this.createSignatureInWorker(opts, signatureType, privateKeyPromise) + const privateKey = this.identity.getPrivateKey() + if (privateKey !== undefined && signatureType !== SignatureType.ERC_1271) { + signature = await this.createSignatureInWorker(opts, signatureType, privateKey) } else { signature = await this.identity.createMessageSignature(createSignaturePayload(opts)) } @@ -49,10 +49,8 @@ export class MessageSigner { private async createSignatureInWorker( opts: MarkRequired, 'messageType'>, signatureType: SignatureType, - privateKeyPromise: Promise + privateKey: Uint8Array ): Promise { - const privateKey = await privateKeyPromise - const result = await this.getSigning().createSignature({ payloadInput: opts, privateKey, From 384b718e00ed30958c0d22259bbbb8cbbd3cedd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 18:58:57 +0100 Subject: [PATCH 07/15] Reuse MessageSigner instance in test to avoid worker startup overhead --- .../unit/waitForAssignmentsToPropagate.test.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts index 014f9bed64..33d5981fce 100644 --- a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts +++ b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts @@ -1,4 +1,4 @@ -import { StreamID, toStreamID, toStreamPartID, utf8ToBinary, wait } from '@streamr/utils' +import { StreamID, toStreamID, toStreamPartID, UserID, utf8ToBinary, wait } from '@streamr/utils' import range from 'lodash/range' import shuffle from 'lodash/shuffle' import { MessageSigner } from '../../src/signature/MessageSigner' @@ -7,7 +7,6 @@ import { waitForAssignmentsToPropagate } from '../../src/utils/waitForAssignment import { createRandomIdentity, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' -import { Identity } from '../../src/identity/Identity' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' const RACE_TIMEOUT_IN_MS = 20 @@ -22,11 +21,12 @@ describe(waitForAssignmentsToPropagate, () => { let messageStream: MessageStream let propagatePromiseState: 'rejected' | 'resolved' | 'pending' let propagatePromise: Promise - let identity: Identity + let messageSigner: MessageSigner + let publisherId: UserID async function makeMsg(ts: number, content: unknown): Promise { - return new MessageSigner(identity).createSignedMessage({ - messageId: new MessageID(toStreamID('assignmentStreamId'), 0, ts, 0, await identity.getUserId(), 'msgChain'), + return messageSigner.createSignedMessage({ + messageId: new MessageID(toStreamID('assignmentStreamId'), 0, ts, 0, publisherId, 'msgChain'), messageType: StreamMessageType.MESSAGE, content: utf8ToBinary(JSON.stringify(content)), contentType: ContentType.JSON, @@ -46,7 +46,13 @@ describe(waitForAssignmentsToPropagate, () => { } beforeAll(async () => { - identity = await createRandomIdentity() + const identity = await createRandomIdentity() + messageSigner = new MessageSigner(identity) + publisherId = await identity.getUserId() + }) + + afterAll(() => { + messageSigner.destroy() }) beforeEach(() => { From 4145e1eb178f5f5641a9ff2a804c77cbc7587f58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 19:14:37 +0100 Subject: [PATCH 08/15] Inline browser worker dependencies to avoid chunk copying issues --- packages/sdk/rollup.config.mts | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index 750f2b18a7..35efbadc6e 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -40,7 +40,7 @@ const WORKERS: Record = { export default defineConfig([ workersNodejs(), - workersBrowser(), + ...workersBrowser(), nodejs(), nodejsTypes(), browser(), @@ -250,18 +250,17 @@ function workersNodejs(): RollupOptions { /** * All worker bundles for browser - ESM format for use with web-worker {type: 'module'} + * Each worker is built as a self-contained bundle with all dependencies inlined. + * This avoids issues with webpack/karma not copying associated chunk files. */ -function workersBrowser(): RollupOptions { - return { - input: Object.fromEntries( - Object.entries(WORKERS).map(([name, path]) => [name, `./dist/browser/src/${path}.js`]) - ), +function workersBrowser(): RollupOptions[] { + return Object.entries(WORKERS).map(([name, path]) => ({ + input: `./dist/browser/src/${path}.js`, context: 'self', output: { format: 'es', - dir: './dist/workers', - entryFileNames: '[name].browser.mjs', - chunkFileNames: '[name]-[hash].browser.mjs', + file: `./dist/workers/${name}.browser.mjs`, + inlineDynamicImports: true, sourcemap: true, }, plugins: [ @@ -277,5 +276,5 @@ function workersBrowser(): RollupOptions { ], external: [], onwarn, - } + })) } From 90f46f35164e10f927d7db8d724a16e068794160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 19:54:15 +0100 Subject: [PATCH 09/15] Extract SigningService from MessageSigner for clearer separation of concerns - SigningService: Container-scoped service that owns the signing worker, provides sign() method, and handles worker lifecycle (lazy init, cleanup) - MessageSigner: Now injects SigningService instead of managing worker directly This decouples the worker management from message signing logic. Each StreamrClient gets its own SigningService instance, with the worker destroyed automatically when the client is destroyed. Test utilities updated with createMessageSigner() helper that uses a mock SigningService (no worker) for faster, simpler tests. --- packages/sdk/src/signature/MessageSigner.ts | 25 ++-------- packages/sdk/src/signature/Signing.ts | 28 ----------- packages/sdk/src/signature/SigningService.ts | 46 +++++++++++++++++++ .../end-to-end/publish-subscribe-raw.test.ts | 5 +- packages/sdk/test/integration/Resends.test.ts | 5 +- .../sdk/test/integration/Subscriber2.test.ts | 4 +- .../sdk/test/integration/gap-fill.test.ts | 5 +- .../integration/parallel-key-exchange.test.ts | 5 +- .../test/integration/waitForStorage.test.ts | 4 +- packages/sdk/test/test-utils/utils.ts | 23 +++++++++- packages/sdk/test/unit/MessageFactory.test.ts | 5 +- packages/sdk/test/unit/MessageStream.test.ts | 4 +- packages/sdk/test/unit/PushPipeline.test.ts | 4 +- .../sdk/test/unit/messagePipeline.test.ts | 5 +- .../sdk/test/unit/resendSubscription.test.ts | 5 +- .../test/unit/validateStreamMessage2.test.ts | 9 ++-- .../waitForAssignmentsToPropagate.test.ts | 8 +--- 17 files changed, 101 insertions(+), 89 deletions(-) delete mode 100644 packages/sdk/src/signature/Signing.ts create mode 100644 packages/sdk/src/signature/SigningService.ts diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index 6b8b1bb92f..bf46e07382 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -4,24 +4,19 @@ import { Identity, IdentityInjectionToken } from '../identity/Identity' import { StreamMessage, StreamMessageOptions } from '../protocol/StreamMessage' import { createSignaturePayload } from './createSignaturePayload' import { SignatureType } from '@streamr/trackerless-network' -import { Signing } from './Signing' -import { DestroySignal } from '../DestroySignal' +import { SigningService } from './SigningService' @scoped(Lifecycle.ContainerScoped) export class MessageSigner { private readonly identity: Identity - private signing: Signing | undefined + private readonly signingService: SigningService constructor( @inject(IdentityInjectionToken) identity: Identity, - destroySignal?: DestroySignal + signingService: SigningService ) { this.identity = identity - destroySignal?.onDestroy.listen(() => this.destroy()) - } - - private getSigning(): Signing { - return this.signing ??= new Signing() + this.signingService = signingService } async createSignedMessage( @@ -51,7 +46,7 @@ export class MessageSigner { signatureType: SignatureType, privateKey: Uint8Array ): Promise { - const result = await this.getSigning().createSignature({ + const result = await this.signingService.sign({ payloadInput: opts, privateKey, signatureType @@ -63,14 +58,4 @@ export class MessageSigner { return result.signature } - - /** - * Cleanup worker resources when the signer is no longer needed. - */ - destroy(): void { - if (this.signing) { - this.signing.destroy() - this.signing = undefined - } - } } diff --git a/packages/sdk/src/signature/Signing.ts b/packages/sdk/src/signature/Signing.ts deleted file mode 100644 index f69d8a352f..0000000000 --- a/packages/sdk/src/signature/Signing.ts +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Unified signing using Web Worker. - * This offloads CPU-intensive cryptographic operations to a separate thread. - * Works in both browser and Node.js environments via platform-specific config. - */ -import { wrap, releaseProxy, type Remote } from 'comlink' -import { createSigningWorker } from '@/createSigningWorker' -import { SigningResult, SigningRequest } from './signingUtils' -import type { SigningWorkerApi } from './SigningWorker' - -export class Signing { - private worker: ReturnType - private workerApi: Remote - - constructor() { - this.worker = createSigningWorker() - this.workerApi = wrap(this.worker) - } - - async createSignature(request: SigningRequest): Promise { - return this.workerApi.createSignature(request) - } - - destroy(): void { - this.workerApi[releaseProxy]() - this.worker.terminate() - } -} diff --git a/packages/sdk/src/signature/SigningService.ts b/packages/sdk/src/signature/SigningService.ts new file mode 100644 index 0000000000..755f7c1fe1 --- /dev/null +++ b/packages/sdk/src/signature/SigningService.ts @@ -0,0 +1,46 @@ +/** + * Singleton signing service using Web Worker. + * This offloads CPU-intensive cryptographic operations to a separate thread. + * Works in both browser and Node.js environments via platform-specific config. + * + * The worker is lazily initialized on first use and shared across all MessageSigner instances. + */ +import { wrap, releaseProxy, type Remote } from 'comlink' +import { Lifecycle, scoped } from 'tsyringe' +import { createSigningWorker } from '@/createSigningWorker' +import { SigningResult, SigningRequest } from './signingUtils' +import type { SigningWorkerApi } from './SigningWorker' +import { DestroySignal } from '../DestroySignal' + +@scoped(Lifecycle.ContainerScoped) +export class SigningService { + private worker: ReturnType | undefined + private workerApi: Remote | undefined + + constructor(destroySignal: DestroySignal) { + destroySignal.onDestroy.listen(() => this.destroy()) + } + + private getWorkerApi(): Remote { + if (this.workerApi === undefined) { + this.worker = createSigningWorker() + this.workerApi = wrap(this.worker) + } + return this.workerApi + } + + async sign(request: SigningRequest): Promise { + return this.getWorkerApi().createSignature(request) + } + + destroy(): void { + if (this.workerApi !== undefined) { + this.workerApi[releaseProxy]() + this.workerApi = undefined + } + if (this.worker !== undefined) { + this.worker.terminate() + this.worker = undefined + } + } +} diff --git a/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts b/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts index ee4a796d43..e05a4f5c9f 100644 --- a/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts +++ b/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts @@ -1,8 +1,7 @@ import { StreamID, toUserId } from '@streamr/utils' -import { createTestClient, createTestStream } from '../test-utils/utils' +import { createMessageSigner, createTestClient, createTestStream } from '../test-utils/utils' import { nextValue } from '../../src/utils/iterators' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' -import { MessageSigner } from '../../src/signature/MessageSigner' import { Wallet } from 'ethers' import { MessageID } from '../../src/protocol/MessageID' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -29,7 +28,7 @@ describe('publish-subscribe-raw', () => { }) async function createTestMessage() { - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey)) + const messageSigner = createMessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey)) return await messageSigner.createSignedMessage({ messageId: new MessageID(streamId, 0, 123456789, 0, toUserId(publisherWallet.address), 'mock-msgChainId'), content: new Uint8Array([1, 2, 3]), diff --git a/packages/sdk/test/integration/Resends.test.ts b/packages/sdk/test/integration/Resends.test.ts index b6ebd3b5de..301433a375 100644 --- a/packages/sdk/test/integration/Resends.test.ts +++ b/packages/sdk/test/integration/Resends.test.ts @@ -6,10 +6,9 @@ import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/encryption/GroupKey' import { StreamPermission } from '../../src/permission' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -45,7 +44,7 @@ describe('Resends', () => { streamRegistry: createStreamRegistry(), groupKeyQueue: await createGroupKeyQueue(identity, groupKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) // store the encryption key publisher's local group key store diff --git a/packages/sdk/test/integration/Subscriber2.test.ts b/packages/sdk/test/integration/Subscriber2.test.ts index c4468181c5..c81899996d 100644 --- a/packages/sdk/test/integration/Subscriber2.test.ts +++ b/packages/sdk/test/integration/Subscriber2.test.ts @@ -10,7 +10,7 @@ import { MessageSigner } from '../../src/signature/MessageSigner' import { Subscription } from '../../src/subscribe/Subscription' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' import { getPublishTestStreamMessages } from '../test-utils/publish' -import { createTestStream } from '../test-utils/utils' +import { createMessageSigner, createTestStream } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -69,7 +69,7 @@ describe('Subscriber', () => { } }) const publisherIdentity = EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey) - messageSigner = new MessageSigner(publisherIdentity) + messageSigner = createMessageSigner(publisherIdentity) }) afterAll(async () => { diff --git a/packages/sdk/test/integration/gap-fill.test.ts b/packages/sdk/test/integration/gap-fill.test.ts index 8162ac09f9..d577ce120b 100644 --- a/packages/sdk/test/integration/gap-fill.test.ts +++ b/packages/sdk/test/integration/gap-fill.test.ts @@ -4,10 +4,9 @@ import { Wallet } from 'ethers' import { mock } from 'jest-mock-extended' import { GroupKey } from '../../src/encryption/GroupKey' import { StreamMessage } from '../../src/protocol/StreamMessage' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' import { Stream } from './../../src/Stream' import { MessageFactory } from './../../src/publish/MessageFactory' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -45,7 +44,7 @@ describe('gap fill', () => { streamRegistry: createStreamRegistry(), groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) }) diff --git a/packages/sdk/test/integration/parallel-key-exchange.test.ts b/packages/sdk/test/integration/parallel-key-exchange.test.ts index 08853bd974..ff6f872ed7 100644 --- a/packages/sdk/test/integration/parallel-key-exchange.test.ts +++ b/packages/sdk/test/integration/parallel-key-exchange.test.ts @@ -9,9 +9,8 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { StreamPermission } from '../../src/permission' import { StreamMessageType } from '../../src/protocol/StreamMessage' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { FakeEnvironment } from './../test-utils/fake/FakeEnvironment' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -73,7 +72,7 @@ describe('parallel key exchange', () => { }), groupKeyQueue: await createGroupKeyQueue(identity, publisher.groupKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) { diff --git a/packages/sdk/test/integration/waitForStorage.test.ts b/packages/sdk/test/integration/waitForStorage.test.ts index a84ea00b16..a729fe05c5 100644 --- a/packages/sdk/test/integration/waitForStorage.test.ts +++ b/packages/sdk/test/integration/waitForStorage.test.ts @@ -6,7 +6,7 @@ import { StreamPermission } from '../../src/permission' import { MessageSigner } from '../../src/signature/MessageSigner' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' import { FakeStorageNode } from '../test-utils/fake/FakeStorageNode' -import { MOCK_CONTENT, createRandomIdentity, createRelativeTestStreamId } from '../test-utils/utils' +import { MOCK_CONTENT, createMessageSigner, createRandomIdentity, createRelativeTestStreamId } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessageType } from './../../src/protocol/StreamMessage' import { randomUserId } from '@streamr/test-utils' @@ -23,7 +23,7 @@ describe('waitForStorage', () => { let environment: FakeEnvironment beforeEach(async () => { - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) environment = new FakeEnvironment() client = environment.createClient() stream = await client.createStream({ diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index b97bf96e77..8ea62c0fc2 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -49,6 +49,8 @@ import { StreamMessage } from '../../src/protocol/StreamMessage' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory } from '../../src/publish/MessageFactory' import { MessageSigner } from '../../src/signature/MessageSigner' +import { SigningService } from '../../src/signature/SigningService' +import { createSignatureFromData } from '../../src/signature/signingUtils' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { LoggerFactory } from '../../src/utils/LoggerFactory' import { counterId } from '../../src/utils/utils' @@ -59,6 +61,25 @@ import { StreamIDBuilder } from '../../src/StreamIDBuilder' const logger = new Logger('sdk-test-utils') +/** + * Creates a mock SigningService that performs signing synchronously on the main thread. + * Use this in tests instead of the real SigningService which spawns a worker. + */ +export function createMockSigningService(): SigningService { + return { + sign: createSignatureFromData, + destroy: () => {} + } as unknown as SigningService +} + +/** + * Creates a MessageSigner for testing purposes. + * Uses a mock SigningService that doesn't spawn a worker. + */ +export function createMessageSigner(identity: Identity): MessageSigner { + return new MessageSigner(identity, createMockSigningService()) +} + export function mockLoggerFactory(clientId?: string): LoggerFactory { return new LoggerFactory({ id: clientId ?? counterId('TestCtx'), @@ -152,7 +173,7 @@ export const createMockMessage = async ( }), groupKeyQueue: await createGroupKeyQueue(identity, opts.encryptionKey, opts.nextEncryptionKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity) + messageSigner: createMessageSigner(identity) }) const DEFAULT_CONTENT = {} const plainContent = opts.content ?? DEFAULT_CONTENT diff --git a/packages/sdk/test/unit/MessageFactory.test.ts b/packages/sdk/test/unit/MessageFactory.test.ts index 03d09f31d3..76d0edb8bb 100644 --- a/packages/sdk/test/unit/MessageFactory.test.ts +++ b/packages/sdk/test/unit/MessageFactory.test.ts @@ -9,9 +9,8 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory, MessageFactoryOptions } from '../../src/publish/MessageFactory' import { PublishMetadata } from '../../src/publish/Publisher' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { EncryptionType, SignatureType, ContentType } from '@streamr/trackerless-network' @@ -60,7 +59,7 @@ describe('MessageFactory', () => { }), groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock(), new DestroySignal()), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: { validation: { permissions: true, diff --git a/packages/sdk/test/unit/MessageStream.test.ts b/packages/sdk/test/unit/MessageStream.test.ts index 7022601f62..ac6c540838 100644 --- a/packages/sdk/test/unit/MessageStream.test.ts +++ b/packages/sdk/test/unit/MessageStream.test.ts @@ -3,7 +3,7 @@ import omit from 'lodash/omit' import { MessageSigner } from '../../src/signature/MessageSigner' import { MessageStream } from '../../src/subscribe/MessageStream' import { Msg } from '../test-utils/publish' -import { createRandomIdentity, waitForCalls } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity, waitForCalls } from '../test-utils/utils' import { convertStreamMessageToMessage } from './../../src/Message' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessageType } from './../../src/protocol/StreamMessage' @@ -28,7 +28,7 @@ describe('MessageStream', () => { } beforeEach(async () => { - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) }) it('onMessage', async () => { diff --git a/packages/sdk/test/unit/PushPipeline.test.ts b/packages/sdk/test/unit/PushPipeline.test.ts index 5246f0402f..b0fcf20a7d 100644 --- a/packages/sdk/test/unit/PushPipeline.test.ts +++ b/packages/sdk/test/unit/PushPipeline.test.ts @@ -6,7 +6,7 @@ import { PushPipeline } from '../../src/utils/PushPipeline' import { counterId, instanceId } from '../../src/utils/utils' import { LeaksDetector } from '../test-utils/LeaksDetector' import { Msg } from '../test-utils/publish' -import { createRandomIdentity } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -31,7 +31,7 @@ describe('PushPipeline', () => { beforeEach(async () => { leaksDetector = new LeaksDetector() - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) }) afterEach(async () => { diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index c267813b6f..4f9d00f773 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -11,11 +11,10 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' import { StreamrClientEventEmitter } from '../../src/events' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { createMessagePipeline } from '../../src/subscribe/messagePipeline' import { PushPipeline } from '../../src/utils/PushPipeline' -import { mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EncryptionType, ContentType, SignatureType } from '@streamr/trackerless-network' @@ -40,7 +39,7 @@ describe('messagePipeline', () => { contentType?: ContentType } = {}): Promise => { const [streamId, partition] = StreamPartIDUtils.getStreamIDAndPartition(streamPartId) - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisher.privateKey)) + const messageSigner = createMessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisher.privateKey)) return messageSigner.createSignedMessage({ messageId: new MessageID( streamId, diff --git a/packages/sdk/test/unit/resendSubscription.test.ts b/packages/sdk/test/unit/resendSubscription.test.ts index 40b8caae6a..c716c48a96 100644 --- a/packages/sdk/test/unit/resendSubscription.test.ts +++ b/packages/sdk/test/unit/resendSubscription.test.ts @@ -6,13 +6,12 @@ import isEqual from 'lodash/isEqual' import last from 'lodash/last' import { Message } from '../../src/Message' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { ResendRangeOptions } from '../../src/subscribe/Resends' import { Subscription, SubscriptionEvents } from '../../src/subscribe/Subscription' import { initResendSubscription } from '../../src/subscribe/resendSubscription' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createGroupKeyQueue, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' import { StreamMessage } from './../../src/protocol/StreamMessage' import { createStrictConfig } from '../../src/Config' @@ -61,7 +60,7 @@ describe('resend subscription', () => { }), groupKeyQueue: await createGroupKeyQueue(identity), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig(), }) }) diff --git a/packages/sdk/test/unit/validateStreamMessage2.test.ts b/packages/sdk/test/unit/validateStreamMessage2.test.ts index 9010e93cba..3dc617dc35 100644 --- a/packages/sdk/test/unit/validateStreamMessage2.test.ts +++ b/packages/sdk/test/unit/validateStreamMessage2.test.ts @@ -4,10 +4,9 @@ import { mock } from 'jest-mock-extended' import { Identity } from '../../src/identity/Identity' import { StreamMetadata } from '../../src/StreamMetadata' import { ERC1271ContractFacade } from '../../src/contracts/ERC1271ContractFacade' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { validateStreamMessage } from '../../src/utils/validateStreamMessage' -import { MOCK_CONTENT, createRandomIdentity } from '../test-utils/utils' +import { MOCK_CONTENT, createMessageSigner, createRandomIdentity } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { MessageRef } from './../../src/protocol/MessageRef' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' @@ -20,7 +19,7 @@ const groupKeyRequestToStreamMessage = async ( prevMsgRef: MessageRef | undefined, identity: Identity ): Promise => { - const messageSigner = new MessageSigner(identity) + const messageSigner = createMessageSigner(identity) return messageSigner.createSignedMessage({ messageId, prevMsgRef, @@ -37,7 +36,7 @@ const groupKeyResponseToStreamMessage = async ( prevMsgRef: MessageRef | undefined, identity: Identity ): Promise => { - const messageSigner = new MessageSigner(identity) + const messageSigner = createMessageSigner(identity) return messageSigner.createSignedMessage({ messageId, prevMsgRef, @@ -100,7 +99,7 @@ describe('Validator2', () => { return userId === subscriber && streamId === 'streamId' } - const publisherSigner = new MessageSigner(publisherIdentity) + const publisherSigner = createMessageSigner(publisherIdentity) msg = await publisherSigner.createSignedMessage({ messageId: new MessageID(toStreamID('streamId'), 0, 0, 0, publisher, 'msgChainId'), diff --git a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts index 33d5981fce..e268bf0e83 100644 --- a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts +++ b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts @@ -4,7 +4,7 @@ import shuffle from 'lodash/shuffle' import { MessageSigner } from '../../src/signature/MessageSigner' import { MessageStream } from '../../src/subscribe/MessageStream' import { waitForAssignmentsToPropagate } from '../../src/utils/waitForAssignmentsToPropagate' -import { createRandomIdentity, mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -47,14 +47,10 @@ describe(waitForAssignmentsToPropagate, () => { beforeAll(async () => { const identity = await createRandomIdentity() - messageSigner = new MessageSigner(identity) + messageSigner = createMessageSigner(identity) publisherId = await identity.getUserId() }) - afterAll(() => { - messageSigner.destroy() - }) - beforeEach(() => { messageStream = new MessageStream() propagatePromiseState = 'pending' From 902eb49bd5c4c98cb262e10921f2cd4d92a47037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 20:08:06 +0100 Subject: [PATCH 10/15] Add unit tests for SigningService with actual worker --- packages/sdk/test/unit/SigningService.test.ts | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 packages/sdk/test/unit/SigningService.test.ts diff --git a/packages/sdk/test/unit/SigningService.test.ts b/packages/sdk/test/unit/SigningService.test.ts new file mode 100644 index 0000000000..cbbf334445 --- /dev/null +++ b/packages/sdk/test/unit/SigningService.test.ts @@ -0,0 +1,190 @@ +import { toStreamID, utf8ToBinary } from '@streamr/utils' +import { SignatureType } from '@streamr/trackerless-network' +import { SigningService } from '../../src/signature/SigningService' +import { SigningRequest } from '../../src/signature/signingUtils' +import { StreamMessageType } from '../../src/protocol/StreamMessage' +import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' +import { DestroySignal } from '../../src/DestroySignal' +import { createSignaturePayload } from '../../src/signature/createSignaturePayload' +import { EcdsaSecp256k1Evm } from '@streamr/utils' + +describe('SigningService', () => { + + let signingService: SigningService + let destroySignal: DestroySignal + + beforeEach(() => { + destroySignal = new DestroySignal() + signingService = new SigningService(destroySignal) + }) + + afterEach(() => { + signingService.destroy() + }) + + it('signs a message using the worker and produces a valid signature', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const payloadInput = { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + } + + const request: SigningRequest = { + payloadInput, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + const result = await signingService.sign(request) + + if (result.type !== 'success') { + throw new Error(`Expected success but got error: ${result.message}`) + } + expect(result.signature).toBeInstanceOf(Uint8Array) + expect(result.signature.length).toBeGreaterThan(0) + + // Verify the signature is valid by checking it against the payload + const payload = createSignaturePayload(payloadInput) + const signingUtil = new EcdsaSecp256k1Evm() + const isValid = await signingUtil.verifySignature(await identity.getUserIdRaw(), payload, result.signature) + expect(isValid).toBe(true) + }) + + it('can sign multiple messages sequentially', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const signatures: Uint8Array[] = [] + + for (let i = 0; i < 3; i++) { + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now() + i, + sequenceNumber: i, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ index: i })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + const result = await signingService.sign(request) + if (result.type !== 'success') { + throw new Error(`Expected success but got error: ${result.message}`) + } + signatures.push(result.signature) + } + + expect(signatures).toHaveLength(3) + // All signatures should be different (different payloads) + expect(new Set(signatures.map(s => Buffer.from(s).toString('hex'))).size).toBe(3) + }) + + it('returns error for unsupported signature type', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: 999 as SignatureType // Invalid signature type + } + + const result = await signingService.sign(request) + + if (result.type !== 'error') { + throw new Error('Expected error but got success') + } + expect(result.message).toContain('Unsupported signatureType') + }) + + it('cleans up worker on destroy', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + // First sign to ensure worker is created + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + await signingService.sign(request) + + // Destroy should not throw + expect(() => signingService.destroy()).not.toThrow() + + // Calling destroy again should be safe (idempotent) + expect(() => signingService.destroy()).not.toThrow() + }) + + it('cleans up via DestroySignal', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + await signingService.sign(request) + + // Trigger destroy via signal - should not throw + await destroySignal.destroy() + }) +}) From 618b89e483fb9f793f00dceaecdd57e120149b9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 20:29:40 +0100 Subject: [PATCH 11/15] Eslint --- packages/sdk/test/unit/SigningService.test.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/sdk/test/unit/SigningService.test.ts b/packages/sdk/test/unit/SigningService.test.ts index cbbf334445..efa36b5a8e 100644 --- a/packages/sdk/test/unit/SigningService.test.ts +++ b/packages/sdk/test/unit/SigningService.test.ts @@ -1,4 +1,4 @@ -import { toStreamID, utf8ToBinary } from '@streamr/utils' +import { toStreamID, utf8ToBinary, EcdsaSecp256k1Evm } from '@streamr/utils' import { SignatureType } from '@streamr/trackerless-network' import { SigningService } from '../../src/signature/SigningService' import { SigningRequest } from '../../src/signature/signingUtils' @@ -6,7 +6,6 @@ import { StreamMessageType } from '../../src/protocol/StreamMessage' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { DestroySignal } from '../../src/DestroySignal' import { createSignaturePayload } from '../../src/signature/createSignaturePayload' -import { EcdsaSecp256k1Evm } from '@streamr/utils' describe('SigningService', () => { @@ -95,7 +94,7 @@ describe('SigningService', () => { expect(signatures).toHaveLength(3) // All signatures should be different (different payloads) - expect(new Set(signatures.map(s => Buffer.from(s).toString('hex'))).size).toBe(3) + expect(new Set(signatures.map((s) => Buffer.from(s).toString('hex'))).size).toBe(3) }) it('returns error for unsupported signature type', async () => { From ae85fd79a4141c76bef7bae0d103f410f417860a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 20:54:06 +0100 Subject: [PATCH 12/15] Export DestroySignal and SigningService from SDK for external usage --- packages/cli-tools/test/stream-publish.test.ts | 15 +++++++++++++-- packages/sdk/src/exports.ts | 2 ++ .../sequential-resend-subscribe.test.ts | 2 +- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 1182e48713..bd74ef12ef 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -5,10 +5,12 @@ import { EthereumKeyPairIdentity, MessageID, MessageSigner, + SigningService, SignatureType, StreamMessageType, StreamPermission, - StreamrClient + StreamrClient, + DestroySignal } from '@streamr/sdk' import { createTestPrivateKey } from '@streamr/test-utils' import { binaryToHex, keyToArrayIndex, StreamID, toLengthPrefixedFrame, toUserId, UserID } from '@streamr/utils' @@ -24,6 +26,15 @@ describe('stream-publish', () => { let publisherPrivateKey: string let subscriberPrivateKey: string let streamCreatorPrivateKey: string + let signingService: SigningService + + beforeAll(() => { + signingService = new SigningService(new DestroySignal()) + }) + + afterAll(() => { + signingService.destroy() + }) function createSubscriber(): StreamrClient { return createTestClient(subscriberPrivateKey) @@ -39,7 +50,7 @@ describe('stream-publish', () => { } async function createTestMessage(streamId: StreamID, partition: number, privateKey: string, content: Uint8Array, timestamp: number) { - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey)) + const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey), signingService) return await messageSigner.createSignedMessage({ messageId: new MessageID(streamId, partition, timestamp, 0, toUserId(new Wallet(privateKey).address), 'mock-msgChainId'), content, diff --git a/packages/sdk/src/exports.ts b/packages/sdk/src/exports.ts index 2a9549451e..0988cb88ce 100644 --- a/packages/sdk/src/exports.ts +++ b/packages/sdk/src/exports.ts @@ -48,6 +48,7 @@ export { ConfigInjectionToken, } from './ConfigTypes' +export { DestroySignal } from './DestroySignal' export { DEFAULT_KEY_TYPE } from './identity/IdentityMapping' export { GroupKey as EncryptionKey } from './encryption/GroupKey' export type { UpdateEncryptionKeyOptions } from './encryption/LocalGroupKeyStore' @@ -60,6 +61,7 @@ export { EthereumProviderIdentity } from './identity/EthereumProviderIdentity' export { MLDSAKeyPairIdentity } from './identity/MLDSAKeyPairIdentity' export { ECDSAKeyPairIdentity } from './identity/ECDSAKeyPairIdentity' export { MessageSigner } from './signature/MessageSigner' +export { SigningService } from './signature/SigningService' export { RpcProviderSource } from './RpcProviderSource' export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './protocol/oldStreamMessageBinaryUtils' diff --git a/packages/sdk/test/integration/sequential-resend-subscribe.test.ts b/packages/sdk/test/integration/sequential-resend-subscribe.test.ts index 20153852fc..6b3aa08c5c 100644 --- a/packages/sdk/test/integration/sequential-resend-subscribe.test.ts +++ b/packages/sdk/test/integration/sequential-resend-subscribe.test.ts @@ -75,7 +75,7 @@ describe('sequential resend subscribe', () => { const expectedMessageCount = published.length + 1 // the realtime message which we publish next const receivedMsgsPromise = collect(sub, expectedMessageCount) - await until(() => onResent.mock.calls.length > 0) + await until(() => onResent.mock.calls.length > 0, 15_000) const streamMessage = await publisher.publish(stream.id, Msg(), { // should be realtime timestamp: id }) From 718b098f4da87bea3eaff61246a5ba48869b7faf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 21:08:54 +0100 Subject: [PATCH 13/15] Register SigningService in FakeEnvironment dependency container This makes the fake env create clients that share the same signing worker. --- packages/sdk/test/test-utils/fake/FakeEnvironment.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts index 206c5117df..3d530952c2 100644 --- a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts +++ b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts @@ -21,6 +21,7 @@ import { FakeStorageNodeRegistry } from './FakeStorageNodeRegistry' import { FakeStreamRegistry } from './FakeStreamRegistry' import { FakeStreamStorageRegistry } from './FakeStreamStorageRegistry' import { DestroySignal } from '../../../src/DestroySignal' +import { SigningService } from '../../../src/signature/SigningService' const DEFAULT_CLIENT_OPTIONS: StreamrClientConfig = { encryption: { @@ -34,13 +35,16 @@ export class FakeEnvironment { private chain: FakeChain private logger: FakeLogger private dependencyContainer: DependencyContainer - private destroySignal = new DestroySignal() + private destroySignal: DestroySignal + private signingService: SigningService constructor() { this.network = new FakeNetwork() this.chain = new FakeChain() this.logger = new FakeLogger() this.dependencyContainer = container.createChildContainer() + this.destroySignal = new DestroySignal() + this.signingService = new SigningService(this.destroySignal) const loggerFactory = { createLogger: () => this.logger } @@ -53,6 +57,7 @@ export class FakeEnvironment { this.dependencyContainer.register(StreamStorageRegistry, FakeStreamStorageRegistry as any) this.dependencyContainer.register(StorageNodeRegistry, FakeStorageNodeRegistry as any) this.dependencyContainer.register(OperatorRegistry, FakeOperatorRegistry as any) + this.dependencyContainer.register(SigningService, { useValue: this.signingService }) } createClient(opts?: StreamrClientConfig): StreamrClient { From 3e84fc6d3beacf150a8ac80f9510a71bd1408ac6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 21:21:55 +0100 Subject: [PATCH 14/15] Increase test timeout for CLI tools Firing up them SigningService instances takes a while in tests. --- packages/cli-tools/jest.config.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cli-tools/jest.config.ts b/packages/cli-tools/jest.config.ts index 1c1d1df1b3..b0f4aa7f87 100644 --- a/packages/cli-tools/jest.config.ts +++ b/packages/cli-tools/jest.config.ts @@ -3,6 +3,7 @@ import defaultConfig from '../../jest.config' const config: Config.InitialOptions = { ...defaultConfig, + testTimeout: 15_000, setupFilesAfterEnv: [ ...defaultConfig.setupFilesAfterEnv, '@streamr/test-utils/setupCustomMatchers' From 320947619948c2534d1c28f555c1a9e228211c17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mariusz=20Roli=C5=84ski?= Date: Thu, 29 Jan 2026 21:53:11 +0100 Subject: [PATCH 15/15] Use transferable objects for signing worker communication Transfer signature buffer back to main thread instead of copying, improving performance for large messages. --- packages/sdk/src/signature/SigningWorker.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/sdk/src/signature/SigningWorker.ts b/packages/sdk/src/signature/SigningWorker.ts index 275d792a53..da56432a49 100644 --- a/packages/sdk/src/signature/SigningWorker.ts +++ b/packages/sdk/src/signature/SigningWorker.ts @@ -1,4 +1,4 @@ -import { expose } from 'comlink' +import { expose, transfer } from 'comlink' import { createSignatureFromData, SigningResult, @@ -9,7 +9,11 @@ const workerApi = { createSignature: async ( request: SigningRequest ): Promise => { - return createSignatureFromData(request) + const result = await createSignatureFromData(request) + if (result.type === 'success') { + return transfer(result, [result.signature.buffer]) + } + return result }, }