diff --git a/packages/cli-tools/jest.config.ts b/packages/cli-tools/jest.config.ts index 1c1d1df1b3..b0f4aa7f87 100644 --- a/packages/cli-tools/jest.config.ts +++ b/packages/cli-tools/jest.config.ts @@ -3,6 +3,7 @@ import defaultConfig from '../../jest.config' const config: Config.InitialOptions = { ...defaultConfig, + testTimeout: 15_000, setupFilesAfterEnv: [ ...defaultConfig.setupFilesAfterEnv, '@streamr/test-utils/setupCustomMatchers' diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 1182e48713..bd74ef12ef 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -5,10 +5,12 @@ import { EthereumKeyPairIdentity, MessageID, MessageSigner, + SigningService, SignatureType, StreamMessageType, StreamPermission, - StreamrClient + StreamrClient, + DestroySignal } from '@streamr/sdk' import { createTestPrivateKey } from '@streamr/test-utils' import { binaryToHex, keyToArrayIndex, StreamID, toLengthPrefixedFrame, toUserId, UserID } from '@streamr/utils' @@ -24,6 +26,15 @@ describe('stream-publish', () => { let publisherPrivateKey: string let subscriberPrivateKey: string let streamCreatorPrivateKey: string + let signingService: SigningService + + beforeAll(() => { + signingService = new SigningService(new DestroySignal()) + }) + + afterAll(() => { + signingService.destroy() + }) function createSubscriber(): StreamrClient { return createTestClient(subscriberPrivateKey) @@ -39,7 +50,7 @@ describe('stream-publish', () => { } async function createTestMessage(streamId: StreamID, partition: number, privateKey: string, content: Uint8Array, timestamp: number) { - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey)) + const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey), signingService) return await messageSigner.createSignedMessage({ messageId: new MessageID(streamId, partition, timestamp, 0, toUserId(new Wallet(privateKey).address), 'mock-msgChainId'), content, diff --git a/packages/sdk/createKarmaConfig.ts b/packages/sdk/createKarmaConfig.ts index a1adab036e..e455504467 100644 --- a/packages/sdk/createKarmaConfig.ts +++ b/packages/sdk/createKarmaConfig.ts @@ -16,6 +16,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType/src/_jest/createSignatureValidationWorker.ts", + "^@/createSigningWorker$": "/src/_jest/createSigningWorker.ts", "^@/(.*)$": "/src/_nodejs/$1", }, transform: { diff --git a/packages/sdk/rollup.config.mts b/packages/sdk/rollup.config.mts index db9527580a..35efbadc6e 100644 --- a/packages/sdk/rollup.config.mts +++ b/packages/sdk/rollup.config.mts @@ -28,9 +28,19 @@ const browserAliases: Alias[] = [ { find: 'stream', replacement: 'readable-stream' }, ] +/** + * Worker entry points - add new workers here. + * Key: output name (will become [name].node.mjs and [name].browser.mjs) + * Value: path relative to src/ (without extension) + */ +const WORKERS: Record = { + 'SignatureValidationWorker': 'signature/SignatureValidationWorker', + 'SigningWorker': 'signature/SigningWorker', +} + export default defineConfig([ - workerNodejs(), - workerBrowser(), + workersNodejs(), + ...workersBrowser(), nodejs(), nodejsTypes(), browser(), @@ -208,15 +218,19 @@ function umdMinified(): RollupOptions { } /** - * Worker bundle for Node.js - ESM format for use with web-worker {type: 'module'} + * All worker bundles for Node.js - ESM format for use with web-worker {type: 'module'} */ -function workerNodejs(): RollupOptions { +function workersNodejs(): RollupOptions { return { - input: './dist/nodejs/src/signature/SignatureValidationWorker.js', + input: Object.fromEntries( + Object.entries(WORKERS).map(([name, path]) => [name, `./dist/nodejs/src/${path}.js`]) + ), context: 'globalThis', output: { format: 'es', - file: './dist/workers/SignatureValidationWorker.node.mjs', + dir: './dist/workers', + entryFileNames: '[name].node.mjs', + chunkFileNames: '[name]-[hash].node.mjs', sourcemap: true, }, plugins: [ @@ -235,15 +249,18 @@ function workerNodejs(): RollupOptions { } /** - * Worker bundle for browser - ESM format for use with web-worker {type: 'module'} + * All worker bundles for browser - ESM format for use with web-worker {type: 'module'} + * Each worker is built as a self-contained bundle with all dependencies inlined. + * This avoids issues with webpack/karma not copying associated chunk files. */ -function workerBrowser(): RollupOptions { - return { - input: './dist/browser/src/signature/SignatureValidationWorker.js', +function workersBrowser(): RollupOptions[] { + return Object.entries(WORKERS).map(([name, path]) => ({ + input: `./dist/browser/src/${path}.js`, context: 'self', output: { format: 'es', - file: './dist/workers/SignatureValidationWorker.browser.mjs', + file: `./dist/workers/${name}.browser.mjs`, + inlineDynamicImports: true, sourcemap: true, }, plugins: [ @@ -259,5 +276,5 @@ function workerBrowser(): RollupOptions { ], external: [], onwarn, - } + })) } diff --git a/packages/sdk/src/_browser/createSigningWorker.ts b/packages/sdk/src/_browser/createSigningWorker.ts new file mode 100644 index 0000000000..46a0c3bb84 --- /dev/null +++ b/packages/sdk/src/_browser/createSigningWorker.ts @@ -0,0 +1,11 @@ +/** + * Browser-specific signing worker factory. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('./workers/SigningWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_jest/createSigningWorker.ts b/packages/sdk/src/_jest/createSigningWorker.ts new file mode 100644 index 0000000000..f80d03954a --- /dev/null +++ b/packages/sdk/src/_jest/createSigningWorker.ts @@ -0,0 +1,12 @@ +/** + * Jest-specific signing worker factory. + * Points to the built worker in dist/ for testing. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/SigningWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_karma/createSigningWorker.ts b/packages/sdk/src/_karma/createSigningWorker.ts new file mode 100644 index 0000000000..daae1dcbe4 --- /dev/null +++ b/packages/sdk/src/_karma/createSigningWorker.ts @@ -0,0 +1,12 @@ +/** + * Karma-specific signing worker factory. + * Points to the built worker in dist/ for browser testing. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('../../dist/workers/SigningWorker.browser.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/_nodejs/createSigningWorker.ts b/packages/sdk/src/_nodejs/createSigningWorker.ts new file mode 100644 index 0000000000..fcecd807e8 --- /dev/null +++ b/packages/sdk/src/_nodejs/createSigningWorker.ts @@ -0,0 +1,11 @@ +/** + * Node.js-specific signing worker factory. + */ +import Worker from 'web-worker' + +export function createSigningWorker(): InstanceType { + return new Worker( + new URL('./workers/SigningWorker.node.mjs', import.meta.url), + { type: 'module' } + ) +} diff --git a/packages/sdk/src/exports.ts b/packages/sdk/src/exports.ts index 2a9549451e..0988cb88ce 100644 --- a/packages/sdk/src/exports.ts +++ b/packages/sdk/src/exports.ts @@ -48,6 +48,7 @@ export { ConfigInjectionToken, } from './ConfigTypes' +export { DestroySignal } from './DestroySignal' export { DEFAULT_KEY_TYPE } from './identity/IdentityMapping' export { GroupKey as EncryptionKey } from './encryption/GroupKey' export type { UpdateEncryptionKeyOptions } from './encryption/LocalGroupKeyStore' @@ -60,6 +61,7 @@ export { EthereumProviderIdentity } from './identity/EthereumProviderIdentity' export { MLDSAKeyPairIdentity } from './identity/MLDSAKeyPairIdentity' export { ECDSAKeyPairIdentity } from './identity/ECDSAKeyPairIdentity' export { MessageSigner } from './signature/MessageSigner' +export { SigningService } from './signature/SigningService' export { RpcProviderSource } from './RpcProviderSource' export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './protocol/oldStreamMessageBinaryUtils' diff --git a/packages/sdk/src/identity/Identity.ts b/packages/sdk/src/identity/Identity.ts index 0641139415..0ea5a7be22 100644 --- a/packages/sdk/src/identity/Identity.ts +++ b/packages/sdk/src/identity/Identity.ts @@ -20,4 +20,13 @@ export abstract class Identity { abstract getSignatureType(): SignatureType abstract createMessageSignature(payload: Uint8Array): Promise abstract getTransactionSigner(rpcProviderSource: RpcProviderSource): Promise + + /** + * Returns the private key if this identity supports worker-based signing. + * Returns undefined for identities that rely on external signers (e.g. browser wallets). + */ + // eslint-disable-next-line class-methods-use-this + getPrivateKey(): Uint8Array | undefined { + return undefined + } } diff --git a/packages/sdk/src/identity/KeyPairIdentity.ts b/packages/sdk/src/identity/KeyPairIdentity.ts index db6becf71a..08ca87217f 100644 --- a/packages/sdk/src/identity/KeyPairIdentity.ts +++ b/packages/sdk/src/identity/KeyPairIdentity.ts @@ -30,7 +30,7 @@ export abstract class KeyPairIdentity extends Identity { return this.publicKeyString } - async getPrivateKey(): Promise { + override getPrivateKey(): Uint8Array { return this.privateKey } diff --git a/packages/sdk/src/signature/MessageSigner.ts b/packages/sdk/src/signature/MessageSigner.ts index 24869e6a21..bf46e07382 100644 --- a/packages/sdk/src/signature/MessageSigner.ts +++ b/packages/sdk/src/signature/MessageSigner.ts @@ -4,24 +4,58 @@ import { Identity, IdentityInjectionToken } from '../identity/Identity' import { StreamMessage, StreamMessageOptions } from '../protocol/StreamMessage' import { createSignaturePayload } from './createSignaturePayload' import { SignatureType } from '@streamr/trackerless-network' +import { SigningService } from './SigningService' @scoped(Lifecycle.ContainerScoped) export class MessageSigner { private readonly identity: Identity + private readonly signingService: SigningService - constructor(@inject(IdentityInjectionToken) identity: Identity) { + constructor( + @inject(IdentityInjectionToken) identity: Identity, + signingService: SigningService + ) { this.identity = identity + this.signingService = signingService } async createSignedMessage( opts: MarkRequired, 'messageType'>, signatureType: SignatureType ): Promise { - const signature = await this.identity.createMessageSignature(createSignaturePayload(opts)) + let signature: Uint8Array + + // Use worker-based signing if identity provides a private key and signature type supports it + // ERC-1271 signatures require on-chain verification and must be handled by the identity directly + const privateKey = this.identity.getPrivateKey() + if (privateKey !== undefined && signatureType !== SignatureType.ERC_1271) { + signature = await this.createSignatureInWorker(opts, signatureType, privateKey) + } else { + signature = await this.identity.createMessageSignature(createSignaturePayload(opts)) + } + return new StreamMessage({ ...opts, signature, signatureType }) } + + private async createSignatureInWorker( + opts: MarkRequired, 'messageType'>, + signatureType: SignatureType, + privateKey: Uint8Array + ): Promise { + const result = await this.signingService.sign({ + payloadInput: opts, + privateKey, + signatureType + }) + + if (result.type === 'error') { + throw new Error(`Signing failed: ${result.message}`) + } + + return result.signature + } } diff --git a/packages/sdk/src/signature/SigningService.ts b/packages/sdk/src/signature/SigningService.ts new file mode 100644 index 0000000000..755f7c1fe1 --- /dev/null +++ b/packages/sdk/src/signature/SigningService.ts @@ -0,0 +1,46 @@ +/** + * Singleton signing service using Web Worker. + * This offloads CPU-intensive cryptographic operations to a separate thread. + * Works in both browser and Node.js environments via platform-specific config. + * + * The worker is lazily initialized on first use and shared across all MessageSigner instances. + */ +import { wrap, releaseProxy, type Remote } from 'comlink' +import { Lifecycle, scoped } from 'tsyringe' +import { createSigningWorker } from '@/createSigningWorker' +import { SigningResult, SigningRequest } from './signingUtils' +import type { SigningWorkerApi } from './SigningWorker' +import { DestroySignal } from '../DestroySignal' + +@scoped(Lifecycle.ContainerScoped) +export class SigningService { + private worker: ReturnType | undefined + private workerApi: Remote | undefined + + constructor(destroySignal: DestroySignal) { + destroySignal.onDestroy.listen(() => this.destroy()) + } + + private getWorkerApi(): Remote { + if (this.workerApi === undefined) { + this.worker = createSigningWorker() + this.workerApi = wrap(this.worker) + } + return this.workerApi + } + + async sign(request: SigningRequest): Promise { + return this.getWorkerApi().createSignature(request) + } + + destroy(): void { + if (this.workerApi !== undefined) { + this.workerApi[releaseProxy]() + this.workerApi = undefined + } + if (this.worker !== undefined) { + this.worker.terminate() + this.worker = undefined + } + } +} diff --git a/packages/sdk/src/signature/SigningWorker.ts b/packages/sdk/src/signature/SigningWorker.ts new file mode 100644 index 0000000000..da56432a49 --- /dev/null +++ b/packages/sdk/src/signature/SigningWorker.ts @@ -0,0 +1,22 @@ +import { expose, transfer } from 'comlink' +import { + createSignatureFromData, + SigningResult, + SigningRequest, +} from './signingUtils' + +const workerApi = { + createSignature: async ( + request: SigningRequest + ): Promise => { + const result = await createSignatureFromData(request) + if (result.type === 'success') { + return transfer(result, [result.signature.buffer]) + } + return result + }, +} + +export type SigningWorkerApi = typeof workerApi + +expose(workerApi) diff --git a/packages/sdk/src/signature/createSignaturePayload.ts b/packages/sdk/src/signature/createSignaturePayload.ts index 0162139680..5720c3584e 100644 --- a/packages/sdk/src/signature/createSignaturePayload.ts +++ b/packages/sdk/src/signature/createSignaturePayload.ts @@ -26,13 +26,18 @@ export interface MessageRefLike { sequenceNumber: number } -export const createSignaturePayload = (opts: { +/** + * Input data for creating a signature payload. + */ +export interface SignaturePayloadInput { messageId: MessageIdLike content: Uint8Array messageType: StreamMessageType prevMsgRef?: MessageRefLike newGroupKey?: EncryptedGroupKey -}): Uint8Array | never => { +} + +export const createSignaturePayload = (opts: SignaturePayloadInput): Uint8Array | never => { const header = Buffer.concat([ Buffer.from(`${opts.messageId.streamId}${opts.messageId.streamPartition}${opts.messageId.timestamp}` + `${opts.messageId.sequenceNumber}${opts.messageId.publisherId}${opts.messageId.msgChainId}`), diff --git a/packages/sdk/src/signature/signingUtils.ts b/packages/sdk/src/signature/signingUtils.ts new file mode 100644 index 0000000000..9bc6e32a56 --- /dev/null +++ b/packages/sdk/src/signature/signingUtils.ts @@ -0,0 +1,48 @@ +/** + * Core signing logic - shared between worker and main thread implementations. + * This file contains pure cryptographic signing functions without any network dependencies. + */ +import { SigningUtil } from '@streamr/utils' +import { SignatureType } from '@streamr/trackerless-network' +import { IDENTITY_MAPPING } from '../identity/IdentityMapping' +import { createSignaturePayload, SignaturePayloadInput } from './createSignaturePayload' + +// Lookup structure SignatureType -> SigningUtil +const signingUtilBySignatureType: Record = Object.fromEntries( + IDENTITY_MAPPING.map((idMapping) => [idMapping.signatureType, SigningUtil.getInstance(idMapping.keyType)]) +) + +/** + * Result of signing + */ +export type SigningResult = + | { type: 'success', signature: Uint8Array } + | { type: 'error', message: string } + +/** + * Complete signing request including private key and signature type. + */ +export interface SigningRequest { + payloadInput: SignaturePayloadInput + privateKey: Uint8Array + signatureType: SignatureType +} + +/** + * Create a signature for the given data. + * This is the core signing logic that can be run in a worker. + */ +export async function createSignatureFromData(request: SigningRequest): Promise { + try { + const signingUtil = signingUtilBySignatureType[request.signatureType] + if (!signingUtil) { + return { type: 'error', message: `Unsupported signatureType: "${request.signatureType}"` } + } + + const payload = createSignaturePayload(request.payloadInput) + const signature = await signingUtil.createSignature(payload, request.privateKey) + return { type: 'success', signature } + } catch (err) { + return { type: 'error', message: String(err) } + } +} diff --git a/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts b/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts index ee4a796d43..e05a4f5c9f 100644 --- a/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts +++ b/packages/sdk/test/end-to-end/publish-subscribe-raw.test.ts @@ -1,8 +1,7 @@ import { StreamID, toUserId } from '@streamr/utils' -import { createTestClient, createTestStream } from '../test-utils/utils' +import { createMessageSigner, createTestClient, createTestStream } from '../test-utils/utils' import { nextValue } from '../../src/utils/iterators' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' -import { MessageSigner } from '../../src/signature/MessageSigner' import { Wallet } from 'ethers' import { MessageID } from '../../src/protocol/MessageID' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -29,7 +28,7 @@ describe('publish-subscribe-raw', () => { }) async function createTestMessage() { - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey)) + const messageSigner = createMessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey)) return await messageSigner.createSignedMessage({ messageId: new MessageID(streamId, 0, 123456789, 0, toUserId(publisherWallet.address), 'mock-msgChainId'), content: new Uint8Array([1, 2, 3]), diff --git a/packages/sdk/test/integration/Resends.test.ts b/packages/sdk/test/integration/Resends.test.ts index b6ebd3b5de..301433a375 100644 --- a/packages/sdk/test/integration/Resends.test.ts +++ b/packages/sdk/test/integration/Resends.test.ts @@ -6,10 +6,9 @@ import { StreamrClient } from '../../src/StreamrClient' import { GroupKey } from '../../src/encryption/GroupKey' import { StreamPermission } from '../../src/permission' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -45,7 +44,7 @@ describe('Resends', () => { streamRegistry: createStreamRegistry(), groupKeyQueue: await createGroupKeyQueue(identity, groupKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) // store the encryption key publisher's local group key store diff --git a/packages/sdk/test/integration/Subscriber2.test.ts b/packages/sdk/test/integration/Subscriber2.test.ts index c4468181c5..c81899996d 100644 --- a/packages/sdk/test/integration/Subscriber2.test.ts +++ b/packages/sdk/test/integration/Subscriber2.test.ts @@ -10,7 +10,7 @@ import { MessageSigner } from '../../src/signature/MessageSigner' import { Subscription } from '../../src/subscribe/Subscription' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' import { getPublishTestStreamMessages } from '../test-utils/publish' -import { createTestStream } from '../test-utils/utils' +import { createMessageSigner, createTestStream } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -69,7 +69,7 @@ describe('Subscriber', () => { } }) const publisherIdentity = EthereumKeyPairIdentity.fromPrivateKey(publisherWallet.privateKey) - messageSigner = new MessageSigner(publisherIdentity) + messageSigner = createMessageSigner(publisherIdentity) }) afterAll(async () => { diff --git a/packages/sdk/test/integration/gap-fill.test.ts b/packages/sdk/test/integration/gap-fill.test.ts index 8162ac09f9..d577ce120b 100644 --- a/packages/sdk/test/integration/gap-fill.test.ts +++ b/packages/sdk/test/integration/gap-fill.test.ts @@ -4,10 +4,9 @@ import { Wallet } from 'ethers' import { mock } from 'jest-mock-extended' import { GroupKey } from '../../src/encryption/GroupKey' import { StreamMessage } from '../../src/protocol/StreamMessage' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' -import { createGroupKeyQueue, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry, createTestStream, startFailingStorageNode } from '../test-utils/utils' import { Stream } from './../../src/Stream' import { MessageFactory } from './../../src/publish/MessageFactory' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' @@ -45,7 +44,7 @@ describe('gap fill', () => { streamRegistry: createStreamRegistry(), groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) }) diff --git a/packages/sdk/test/integration/parallel-key-exchange.test.ts b/packages/sdk/test/integration/parallel-key-exchange.test.ts index 08853bd974..ff6f872ed7 100644 --- a/packages/sdk/test/integration/parallel-key-exchange.test.ts +++ b/packages/sdk/test/integration/parallel-key-exchange.test.ts @@ -9,9 +9,8 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { StreamPermission } from '../../src/permission' import { StreamMessageType } from '../../src/protocol/StreamMessage' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { FakeEnvironment } from './../test-utils/fake/FakeEnvironment' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { createStrictConfig } from '../../src/Config' @@ -73,7 +72,7 @@ describe('parallel key exchange', () => { }), groupKeyQueue: await createGroupKeyQueue(identity, publisher.groupKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig() }) for (let i = 0; i < MESSAGE_COUNT_PER_PUBLISHER; i++) { diff --git a/packages/sdk/test/integration/sequential-resend-subscribe.test.ts b/packages/sdk/test/integration/sequential-resend-subscribe.test.ts index 20153852fc..6b3aa08c5c 100644 --- a/packages/sdk/test/integration/sequential-resend-subscribe.test.ts +++ b/packages/sdk/test/integration/sequential-resend-subscribe.test.ts @@ -75,7 +75,7 @@ describe('sequential resend subscribe', () => { const expectedMessageCount = published.length + 1 // the realtime message which we publish next const receivedMsgsPromise = collect(sub, expectedMessageCount) - await until(() => onResent.mock.calls.length > 0) + await until(() => onResent.mock.calls.length > 0, 15_000) const streamMessage = await publisher.publish(stream.id, Msg(), { // should be realtime timestamp: id }) diff --git a/packages/sdk/test/integration/waitForStorage.test.ts b/packages/sdk/test/integration/waitForStorage.test.ts index a84ea00b16..a729fe05c5 100644 --- a/packages/sdk/test/integration/waitForStorage.test.ts +++ b/packages/sdk/test/integration/waitForStorage.test.ts @@ -6,7 +6,7 @@ import { StreamPermission } from '../../src/permission' import { MessageSigner } from '../../src/signature/MessageSigner' import { FakeEnvironment } from '../test-utils/fake/FakeEnvironment' import { FakeStorageNode } from '../test-utils/fake/FakeStorageNode' -import { MOCK_CONTENT, createRandomIdentity, createRelativeTestStreamId } from '../test-utils/utils' +import { MOCK_CONTENT, createMessageSigner, createRandomIdentity, createRelativeTestStreamId } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessageType } from './../../src/protocol/StreamMessage' import { randomUserId } from '@streamr/test-utils' @@ -23,7 +23,7 @@ describe('waitForStorage', () => { let environment: FakeEnvironment beforeEach(async () => { - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) environment = new FakeEnvironment() client = environment.createClient() stream = await client.createStream({ diff --git a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts index 206c5117df..3d530952c2 100644 --- a/packages/sdk/test/test-utils/fake/FakeEnvironment.ts +++ b/packages/sdk/test/test-utils/fake/FakeEnvironment.ts @@ -21,6 +21,7 @@ import { FakeStorageNodeRegistry } from './FakeStorageNodeRegistry' import { FakeStreamRegistry } from './FakeStreamRegistry' import { FakeStreamStorageRegistry } from './FakeStreamStorageRegistry' import { DestroySignal } from '../../../src/DestroySignal' +import { SigningService } from '../../../src/signature/SigningService' const DEFAULT_CLIENT_OPTIONS: StreamrClientConfig = { encryption: { @@ -34,13 +35,16 @@ export class FakeEnvironment { private chain: FakeChain private logger: FakeLogger private dependencyContainer: DependencyContainer - private destroySignal = new DestroySignal() + private destroySignal: DestroySignal + private signingService: SigningService constructor() { this.network = new FakeNetwork() this.chain = new FakeChain() this.logger = new FakeLogger() this.dependencyContainer = container.createChildContainer() + this.destroySignal = new DestroySignal() + this.signingService = new SigningService(this.destroySignal) const loggerFactory = { createLogger: () => this.logger } @@ -53,6 +57,7 @@ export class FakeEnvironment { this.dependencyContainer.register(StreamStorageRegistry, FakeStreamStorageRegistry as any) this.dependencyContainer.register(StorageNodeRegistry, FakeStorageNodeRegistry as any) this.dependencyContainer.register(OperatorRegistry, FakeOperatorRegistry as any) + this.dependencyContainer.register(SigningService, { useValue: this.signingService }) } createClient(opts?: StreamrClientConfig): StreamrClient { diff --git a/packages/sdk/test/test-utils/utils.ts b/packages/sdk/test/test-utils/utils.ts index b97bf96e77..8ea62c0fc2 100644 --- a/packages/sdk/test/test-utils/utils.ts +++ b/packages/sdk/test/test-utils/utils.ts @@ -49,6 +49,8 @@ import { StreamMessage } from '../../src/protocol/StreamMessage' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory } from '../../src/publish/MessageFactory' import { MessageSigner } from '../../src/signature/MessageSigner' +import { SigningService } from '../../src/signature/SigningService' +import { createSignatureFromData } from '../../src/signature/signingUtils' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { LoggerFactory } from '../../src/utils/LoggerFactory' import { counterId } from '../../src/utils/utils' @@ -59,6 +61,25 @@ import { StreamIDBuilder } from '../../src/StreamIDBuilder' const logger = new Logger('sdk-test-utils') +/** + * Creates a mock SigningService that performs signing synchronously on the main thread. + * Use this in tests instead of the real SigningService which spawns a worker. + */ +export function createMockSigningService(): SigningService { + return { + sign: createSignatureFromData, + destroy: () => {} + } as unknown as SigningService +} + +/** + * Creates a MessageSigner for testing purposes. + * Uses a mock SigningService that doesn't spawn a worker. + */ +export function createMessageSigner(identity: Identity): MessageSigner { + return new MessageSigner(identity, createMockSigningService()) +} + export function mockLoggerFactory(clientId?: string): LoggerFactory { return new LoggerFactory({ id: clientId ?? counterId('TestCtx'), @@ -152,7 +173,7 @@ export const createMockMessage = async ( }), groupKeyQueue: await createGroupKeyQueue(identity, opts.encryptionKey, opts.nextEncryptionKey), signatureValidator: mock(), - messageSigner: new MessageSigner(identity) + messageSigner: createMessageSigner(identity) }) const DEFAULT_CONTENT = {} const plainContent = opts.content ?? DEFAULT_CONTENT diff --git a/packages/sdk/test/unit/MessageFactory.test.ts b/packages/sdk/test/unit/MessageFactory.test.ts index 03d09f31d3..76d0edb8bb 100644 --- a/packages/sdk/test/unit/MessageFactory.test.ts +++ b/packages/sdk/test/unit/MessageFactory.test.ts @@ -9,9 +9,8 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyQueue } from '../../src/publish/GroupKeyQueue' import { MessageFactory, MessageFactoryOptions } from '../../src/publish/MessageFactory' import { PublishMetadata } from '../../src/publish/Publisher' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' -import { createGroupKeyQueue, createStreamRegistry } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createStreamRegistry } from '../test-utils/utils' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { EncryptionType, SignatureType, ContentType } from '@streamr/trackerless-network' @@ -60,7 +59,7 @@ describe('MessageFactory', () => { }), groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock(), new DestroySignal()), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: { validation: { permissions: true, diff --git a/packages/sdk/test/unit/MessageStream.test.ts b/packages/sdk/test/unit/MessageStream.test.ts index 7022601f62..ac6c540838 100644 --- a/packages/sdk/test/unit/MessageStream.test.ts +++ b/packages/sdk/test/unit/MessageStream.test.ts @@ -3,7 +3,7 @@ import omit from 'lodash/omit' import { MessageSigner } from '../../src/signature/MessageSigner' import { MessageStream } from '../../src/subscribe/MessageStream' import { Msg } from '../test-utils/publish' -import { createRandomIdentity, waitForCalls } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity, waitForCalls } from '../test-utils/utils' import { convertStreamMessageToMessage } from './../../src/Message' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessageType } from './../../src/protocol/StreamMessage' @@ -28,7 +28,7 @@ describe('MessageStream', () => { } beforeEach(async () => { - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) }) it('onMessage', async () => { diff --git a/packages/sdk/test/unit/PushPipeline.test.ts b/packages/sdk/test/unit/PushPipeline.test.ts index 5246f0402f..b0fcf20a7d 100644 --- a/packages/sdk/test/unit/PushPipeline.test.ts +++ b/packages/sdk/test/unit/PushPipeline.test.ts @@ -6,7 +6,7 @@ import { PushPipeline } from '../../src/utils/PushPipeline' import { counterId, instanceId } from '../../src/utils/utils' import { LeaksDetector } from '../test-utils/LeaksDetector' import { Msg } from '../test-utils/publish' -import { createRandomIdentity } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' @@ -31,7 +31,7 @@ describe('PushPipeline', () => { beforeEach(async () => { leaksDetector = new LeaksDetector() - messageSigner = new MessageSigner(await createRandomIdentity()) + messageSigner = createMessageSigner(await createRandomIdentity()) }) afterEach(async () => { diff --git a/packages/sdk/test/unit/SigningService.test.ts b/packages/sdk/test/unit/SigningService.test.ts new file mode 100644 index 0000000000..efa36b5a8e --- /dev/null +++ b/packages/sdk/test/unit/SigningService.test.ts @@ -0,0 +1,189 @@ +import { toStreamID, utf8ToBinary, EcdsaSecp256k1Evm } from '@streamr/utils' +import { SignatureType } from '@streamr/trackerless-network' +import { SigningService } from '../../src/signature/SigningService' +import { SigningRequest } from '../../src/signature/signingUtils' +import { StreamMessageType } from '../../src/protocol/StreamMessage' +import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' +import { DestroySignal } from '../../src/DestroySignal' +import { createSignaturePayload } from '../../src/signature/createSignaturePayload' + +describe('SigningService', () => { + + let signingService: SigningService + let destroySignal: DestroySignal + + beforeEach(() => { + destroySignal = new DestroySignal() + signingService = new SigningService(destroySignal) + }) + + afterEach(() => { + signingService.destroy() + }) + + it('signs a message using the worker and produces a valid signature', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const payloadInput = { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + } + + const request: SigningRequest = { + payloadInput, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + const result = await signingService.sign(request) + + if (result.type !== 'success') { + throw new Error(`Expected success but got error: ${result.message}`) + } + expect(result.signature).toBeInstanceOf(Uint8Array) + expect(result.signature.length).toBeGreaterThan(0) + + // Verify the signature is valid by checking it against the payload + const payload = createSignaturePayload(payloadInput) + const signingUtil = new EcdsaSecp256k1Evm() + const isValid = await signingUtil.verifySignature(await identity.getUserIdRaw(), payload, result.signature) + expect(isValid).toBe(true) + }) + + it('can sign multiple messages sequentially', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const signatures: Uint8Array[] = [] + + for (let i = 0; i < 3; i++) { + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now() + i, + sequenceNumber: i, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ index: i })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + const result = await signingService.sign(request) + if (result.type !== 'success') { + throw new Error(`Expected success but got error: ${result.message}`) + } + signatures.push(result.signature) + } + + expect(signatures).toHaveLength(3) + // All signatures should be different (different payloads) + expect(new Set(signatures.map((s) => Buffer.from(s).toString('hex'))).size).toBe(3) + }) + + it('returns error for unsupported signature type', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: 999 as SignatureType // Invalid signature type + } + + const result = await signingService.sign(request) + + if (result.type !== 'error') { + throw new Error('Expected error but got success') + } + expect(result.message).toContain('Unsupported signatureType') + }) + + it('cleans up worker on destroy', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + // First sign to ensure worker is created + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + await signingService.sign(request) + + // Destroy should not throw + expect(() => signingService.destroy()).not.toThrow() + + // Calling destroy again should be safe (idempotent) + expect(() => signingService.destroy()).not.toThrow() + }) + + it('cleans up via DestroySignal', async () => { + const identity = EthereumKeyPairIdentity.generate() + const privateKey = identity.getPrivateKey() + const publisherId = await identity.getUserId() + + const request: SigningRequest = { + payloadInput: { + messageId: { + streamId: toStreamID('test-stream'), + streamPartition: 0, + timestamp: Date.now(), + sequenceNumber: 0, + publisherId, + msgChainId: 'test-chain' + }, + content: utf8ToBinary(JSON.stringify({ hello: 'world' })), + messageType: StreamMessageType.MESSAGE + }, + privateKey, + signatureType: SignatureType.ECDSA_SECP256K1_EVM + } + + await signingService.sign(request) + + // Trigger destroy via signal - should not throw + await destroySignal.destroy() + }) +}) diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index c267813b6f..4f9d00f773 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -11,11 +11,10 @@ import { GroupKey } from '../../src/encryption/GroupKey' import { GroupKeyManager } from '../../src/encryption/GroupKeyManager' import { SubscriberKeyExchange } from '../../src/encryption/SubscriberKeyExchange' import { StreamrClientEventEmitter } from '../../src/events' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { createMessagePipeline } from '../../src/subscribe/messagePipeline' import { PushPipeline } from '../../src/utils/PushPipeline' -import { mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { EncryptionType, ContentType, SignatureType } from '@streamr/trackerless-network' @@ -40,7 +39,7 @@ describe('messagePipeline', () => { contentType?: ContentType } = {}): Promise => { const [streamId, partition] = StreamPartIDUtils.getStreamIDAndPartition(streamPartId) - const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisher.privateKey)) + const messageSigner = createMessageSigner(EthereumKeyPairIdentity.fromPrivateKey(publisher.privateKey)) return messageSigner.createSignedMessage({ messageId: new MessageID( streamId, diff --git a/packages/sdk/test/unit/resendSubscription.test.ts b/packages/sdk/test/unit/resendSubscription.test.ts index 40b8caae6a..c716c48a96 100644 --- a/packages/sdk/test/unit/resendSubscription.test.ts +++ b/packages/sdk/test/unit/resendSubscription.test.ts @@ -6,13 +6,12 @@ import isEqual from 'lodash/isEqual' import last from 'lodash/last' import { Message } from '../../src/Message' import { MessageFactory } from '../../src/publish/MessageFactory' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { ResendRangeOptions } from '../../src/subscribe/Resends' import { Subscription, SubscriptionEvents } from '../../src/subscribe/Subscription' import { initResendSubscription } from '../../src/subscribe/resendSubscription' import { PushPipeline } from '../../src/utils/PushPipeline' -import { createGroupKeyQueue, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' +import { createGroupKeyQueue, createMessageSigner, createRandomIdentity, createStreamRegistry, mockLoggerFactory } from '../test-utils/utils' import { StreamMessage } from './../../src/protocol/StreamMessage' import { createStrictConfig } from '../../src/Config' @@ -61,7 +60,7 @@ describe('resend subscription', () => { }), groupKeyQueue: await createGroupKeyQueue(identity), signatureValidator: mock(), - messageSigner: new MessageSigner(identity), + messageSigner: createMessageSigner(identity), config: createStrictConfig(), }) }) diff --git a/packages/sdk/test/unit/validateStreamMessage2.test.ts b/packages/sdk/test/unit/validateStreamMessage2.test.ts index 9010e93cba..3dc617dc35 100644 --- a/packages/sdk/test/unit/validateStreamMessage2.test.ts +++ b/packages/sdk/test/unit/validateStreamMessage2.test.ts @@ -4,10 +4,9 @@ import { mock } from 'jest-mock-extended' import { Identity } from '../../src/identity/Identity' import { StreamMetadata } from '../../src/StreamMetadata' import { ERC1271ContractFacade } from '../../src/contracts/ERC1271ContractFacade' -import { MessageSigner } from '../../src/signature/MessageSigner' import { SignatureValidator } from '../../src/signature/SignatureValidator' import { validateStreamMessage } from '../../src/utils/validateStreamMessage' -import { MOCK_CONTENT, createRandomIdentity } from '../test-utils/utils' +import { MOCK_CONTENT, createMessageSigner, createRandomIdentity } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { MessageRef } from './../../src/protocol/MessageRef' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' @@ -20,7 +19,7 @@ const groupKeyRequestToStreamMessage = async ( prevMsgRef: MessageRef | undefined, identity: Identity ): Promise => { - const messageSigner = new MessageSigner(identity) + const messageSigner = createMessageSigner(identity) return messageSigner.createSignedMessage({ messageId, prevMsgRef, @@ -37,7 +36,7 @@ const groupKeyResponseToStreamMessage = async ( prevMsgRef: MessageRef | undefined, identity: Identity ): Promise => { - const messageSigner = new MessageSigner(identity) + const messageSigner = createMessageSigner(identity) return messageSigner.createSignedMessage({ messageId, prevMsgRef, @@ -100,7 +99,7 @@ describe('Validator2', () => { return userId === subscriber && streamId === 'streamId' } - const publisherSigner = new MessageSigner(publisherIdentity) + const publisherSigner = createMessageSigner(publisherIdentity) msg = await publisherSigner.createSignedMessage({ messageId: new MessageID(toStreamID('streamId'), 0, 0, 0, publisher, 'msgChainId'), diff --git a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts index 014f9bed64..e268bf0e83 100644 --- a/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts +++ b/packages/sdk/test/unit/waitForAssignmentsToPropagate.test.ts @@ -1,13 +1,12 @@ -import { StreamID, toStreamID, toStreamPartID, utf8ToBinary, wait } from '@streamr/utils' +import { StreamID, toStreamID, toStreamPartID, UserID, utf8ToBinary, wait } from '@streamr/utils' import range from 'lodash/range' import shuffle from 'lodash/shuffle' import { MessageSigner } from '../../src/signature/MessageSigner' import { MessageStream } from '../../src/subscribe/MessageStream' import { waitForAssignmentsToPropagate } from '../../src/utils/waitForAssignmentsToPropagate' -import { createRandomIdentity, mockLoggerFactory } from '../test-utils/utils' +import { createMessageSigner, createRandomIdentity, mockLoggerFactory } from '../test-utils/utils' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' -import { Identity } from '../../src/identity/Identity' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' const RACE_TIMEOUT_IN_MS = 20 @@ -22,11 +21,12 @@ describe(waitForAssignmentsToPropagate, () => { let messageStream: MessageStream let propagatePromiseState: 'rejected' | 'resolved' | 'pending' let propagatePromise: Promise - let identity: Identity + let messageSigner: MessageSigner + let publisherId: UserID async function makeMsg(ts: number, content: unknown): Promise { - return new MessageSigner(identity).createSignedMessage({ - messageId: new MessageID(toStreamID('assignmentStreamId'), 0, ts, 0, await identity.getUserId(), 'msgChain'), + return messageSigner.createSignedMessage({ + messageId: new MessageID(toStreamID('assignmentStreamId'), 0, ts, 0, publisherId, 'msgChain'), messageType: StreamMessageType.MESSAGE, content: utf8ToBinary(JSON.stringify(content)), contentType: ContentType.JSON, @@ -46,7 +46,9 @@ describe(waitForAssignmentsToPropagate, () => { } beforeAll(async () => { - identity = await createRandomIdentity() + const identity = await createRandomIdentity() + messageSigner = createMessageSigner(identity) + publisherId = await identity.getUserId() }) beforeEach(() => {