Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/cli-tools/jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import defaultConfig from '../../jest.config'

const config: Config.InitialOptions = {
...defaultConfig,
testTimeout: 15_000,
setupFilesAfterEnv: [
...defaultConfig.setupFilesAfterEnv,
'@streamr/test-utils/setupCustomMatchers'
Expand Down
15 changes: 13 additions & 2 deletions packages/cli-tools/test/stream-publish.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import {
EthereumKeyPairIdentity,
MessageID,
MessageSigner,
SigningService,
SignatureType,
StreamMessageType,
StreamPermission,
StreamrClient
StreamrClient,
DestroySignal
} from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { binaryToHex, keyToArrayIndex, StreamID, toLengthPrefixedFrame, toUserId, UserID } from '@streamr/utils'
Expand All @@ -24,6 +26,15 @@ describe('stream-publish', () => {
let publisherPrivateKey: string
let subscriberPrivateKey: string
let streamCreatorPrivateKey: string
let signingService: SigningService

beforeAll(() => {
signingService = new SigningService(new DestroySignal())
})

afterAll(() => {
signingService.destroy()
})

function createSubscriber(): StreamrClient {
return createTestClient(subscriberPrivateKey)
Expand All @@ -39,7 +50,7 @@ describe('stream-publish', () => {
}

async function createTestMessage(streamId: StreamID, partition: number, privateKey: string, content: Uint8Array, timestamp: number) {
const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey))
const messageSigner = new MessageSigner(EthereumKeyPairIdentity.fromPrivateKey(privateKey), signingService)
return await messageSigner.createSignedMessage({
messageId: new MessageID(streamId, partition, timestamp, 0, toUserId(new Wallet(privateKey).address), 'mock-msgChainId'),
content,
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/createKarmaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export function createKarmaConfig(testPaths: string[]): ReturnType<typeof create
),
'@streamr/dht': resolve(__dirname, '../dht/dist/exports-browser.cjs'),
"@/createSignatureValidationWorker": resolve(__dirname, 'src/_karma/createSignatureValidationWorker.ts'),
"@/createSigningWorker": resolve(__dirname, 'src/_karma/createSigningWorker.ts'),
'@': resolve(__dirname, 'src/_browser'),
},
fallback: {
Expand Down
1 change: 1 addition & 0 deletions packages/sdk/jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const config: Config.InitialOptions = {
],
moduleNameMapper: {
"^@/createSignatureValidationWorker$": "<rootDir>/src/_jest/createSignatureValidationWorker.ts",
"^@/createSigningWorker$": "<rootDir>/src/_jest/createSigningWorker.ts",
"^@/(.*)$": "<rootDir>/src/_nodejs/$1",
},
transform: {
Expand Down
41 changes: 29 additions & 12 deletions packages/sdk/rollup.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,19 @@ const browserAliases: Alias[] = [
{ find: 'stream', replacement: 'readable-stream' },
]

/**
* Worker entry points - add new workers here.
* Key: output name (will become [name].node.mjs and [name].browser.mjs)
* Value: path relative to src/ (without extension)
*/
const WORKERS: Record<string, string> = {
'SignatureValidationWorker': 'signature/SignatureValidationWorker',
'SigningWorker': 'signature/SigningWorker',
}

export default defineConfig([
workerNodejs(),
workerBrowser(),
workersNodejs(),
...workersBrowser(),
nodejs(),
nodejsTypes(),
browser(),
Expand Down Expand Up @@ -208,15 +218,19 @@ function umdMinified(): RollupOptions {
}

/**
* Worker bundle for Node.js - ESM format for use with web-worker {type: 'module'}
* All worker bundles for Node.js - ESM format for use with web-worker {type: 'module'}
*/
function workerNodejs(): RollupOptions {
function workersNodejs(): RollupOptions {
return {
input: './dist/nodejs/src/signature/SignatureValidationWorker.js',
input: Object.fromEntries(
Object.entries(WORKERS).map(([name, path]) => [name, `./dist/nodejs/src/${path}.js`])
),
context: 'globalThis',
output: {
format: 'es',
file: './dist/workers/SignatureValidationWorker.node.mjs',
dir: './dist/workers',
entryFileNames: '[name].node.mjs',
chunkFileNames: '[name]-[hash].node.mjs',
sourcemap: true,
},
plugins: [
Expand All @@ -235,15 +249,18 @@ function workerNodejs(): RollupOptions {
}

/**
* Worker bundle for browser - ESM format for use with web-worker {type: 'module'}
* All worker bundles for browser - ESM format for use with web-worker {type: 'module'}
* Each worker is built as a self-contained bundle with all dependencies inlined.
* This avoids issues with webpack/karma not copying associated chunk files.
*/
function workerBrowser(): RollupOptions {
return {
input: './dist/browser/src/signature/SignatureValidationWorker.js',
function workersBrowser(): RollupOptions[] {
return Object.entries(WORKERS).map(([name, path]) => ({
input: `./dist/browser/src/${path}.js`,
context: 'self',
output: {
format: 'es',
file: './dist/workers/SignatureValidationWorker.browser.mjs',
file: `./dist/workers/${name}.browser.mjs`,
inlineDynamicImports: true,
sourcemap: true,
},
plugins: [
Expand All @@ -259,5 +276,5 @@ function workerBrowser(): RollupOptions {
],
external: [],
onwarn,
}
}))
}
11 changes: 11 additions & 0 deletions packages/sdk/src/_browser/createSigningWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Browser-specific signing worker factory.
*/
import Worker from 'web-worker'

export function createSigningWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('./workers/SigningWorker.browser.mjs', import.meta.url),
{ type: 'module' }
)
}
12 changes: 12 additions & 0 deletions packages/sdk/src/_jest/createSigningWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Jest-specific signing worker factory.
* Points to the built worker in dist/ for testing.
*/
import Worker from 'web-worker'

export function createSigningWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('../../dist/workers/SigningWorker.node.mjs', import.meta.url),
{ type: 'module' }
)
}
12 changes: 12 additions & 0 deletions packages/sdk/src/_karma/createSigningWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Karma-specific signing worker factory.
* Points to the built worker in dist/ for browser testing.
*/
import Worker from 'web-worker'

export function createSigningWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('../../dist/workers/SigningWorker.browser.mjs', import.meta.url),
{ type: 'module' }
)
}
11 changes: 11 additions & 0 deletions packages/sdk/src/_nodejs/createSigningWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* Node.js-specific signing worker factory.
*/
import Worker from 'web-worker'

export function createSigningWorker(): InstanceType<typeof Worker> {
return new Worker(
new URL('./workers/SigningWorker.node.mjs', import.meta.url),
{ type: 'module' }
)
}
2 changes: 2 additions & 0 deletions packages/sdk/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export {
ConfigInjectionToken,
} from './ConfigTypes'

export { DestroySignal } from './DestroySignal'
export { DEFAULT_KEY_TYPE } from './identity/IdentityMapping'
export { GroupKey as EncryptionKey } from './encryption/GroupKey'
export type { UpdateEncryptionKeyOptions } from './encryption/LocalGroupKeyStore'
Expand All @@ -60,6 +61,7 @@ export { EthereumProviderIdentity } from './identity/EthereumProviderIdentity'
export { MLDSAKeyPairIdentity } from './identity/MLDSAKeyPairIdentity'
export { ECDSAKeyPairIdentity } from './identity/ECDSAKeyPairIdentity'
export { MessageSigner } from './signature/MessageSigner'
export { SigningService } from './signature/SigningService'
export { RpcProviderSource } from './RpcProviderSource'

export { convertBytesToStreamMessage, convertStreamMessageToBytes } from './protocol/oldStreamMessageBinaryUtils'
Expand Down
9 changes: 9 additions & 0 deletions packages/sdk/src/identity/Identity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ export abstract class Identity {
abstract getSignatureType(): SignatureType
abstract createMessageSignature(payload: Uint8Array): Promise<Uint8Array>
abstract getTransactionSigner(rpcProviderSource: RpcProviderSource): Promise<SignerWithProvider>

/**
* Returns the private key if this identity supports worker-based signing.
* Returns undefined for identities that rely on external signers (e.g. browser wallets).
*/
// eslint-disable-next-line class-methods-use-this
getPrivateKey(): Uint8Array | undefined {
return undefined
}
}
2 changes: 1 addition & 1 deletion packages/sdk/src/identity/KeyPairIdentity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export abstract class KeyPairIdentity extends Identity {
return this.publicKeyString
}

async getPrivateKey(): Promise<Uint8Array> {
override getPrivateKey(): Uint8Array {
return this.privateKey
}

Expand Down
38 changes: 36 additions & 2 deletions packages/sdk/src/signature/MessageSigner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,58 @@ import { Identity, IdentityInjectionToken } from '../identity/Identity'
import { StreamMessage, StreamMessageOptions } from '../protocol/StreamMessage'
import { createSignaturePayload } from './createSignaturePayload'
import { SignatureType } from '@streamr/trackerless-network'
import { SigningService } from './SigningService'

@scoped(Lifecycle.ContainerScoped)
export class MessageSigner {
private readonly identity: Identity
private readonly signingService: SigningService

constructor(@inject(IdentityInjectionToken) identity: Identity) {
constructor(
@inject(IdentityInjectionToken) identity: Identity,
signingService: SigningService
) {
this.identity = identity
this.signingService = signingService
}

async createSignedMessage(
opts: MarkRequired<Omit<StreamMessageOptions, 'signature' | 'signatureType'>, 'messageType'>,
signatureType: SignatureType
): Promise<StreamMessage> {
const signature = await this.identity.createMessageSignature(createSignaturePayload(opts))
let signature: Uint8Array

// Use worker-based signing if identity provides a private key and signature type supports it
// ERC-1271 signatures require on-chain verification and must be handled by the identity directly
const privateKey = this.identity.getPrivateKey()
if (privateKey !== undefined && signatureType !== SignatureType.ERC_1271) {
signature = await this.createSignatureInWorker(opts, signatureType, privateKey)
} else {
signature = await this.identity.createMessageSignature(createSignaturePayload(opts))
}

return new StreamMessage({
...opts,
signature,
signatureType
})
}

private async createSignatureInWorker(
opts: MarkRequired<Omit<StreamMessageOptions, 'signature' | 'signatureType'>, 'messageType'>,
signatureType: SignatureType,
privateKey: Uint8Array
): Promise<Uint8Array> {
const result = await this.signingService.sign({
payloadInput: opts,
privateKey,
signatureType
})

if (result.type === 'error') {
throw new Error(`Signing failed: ${result.message}`)
}

return result.signature
}
}
46 changes: 46 additions & 0 deletions packages/sdk/src/signature/SigningService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Singleton signing service using Web Worker.
* This offloads CPU-intensive cryptographic operations to a separate thread.
* Works in both browser and Node.js environments via platform-specific config.
*
* The worker is lazily initialized on first use and shared across all MessageSigner instances.
*/
import { wrap, releaseProxy, type Remote } from 'comlink'
import { Lifecycle, scoped } from 'tsyringe'
import { createSigningWorker } from '@/createSigningWorker'
import { SigningResult, SigningRequest } from './signingUtils'
import type { SigningWorkerApi } from './SigningWorker'
import { DestroySignal } from '../DestroySignal'

@scoped(Lifecycle.ContainerScoped)
export class SigningService {
private worker: ReturnType<typeof createSigningWorker> | undefined
private workerApi: Remote<SigningWorkerApi> | undefined

constructor(destroySignal: DestroySignal) {
destroySignal.onDestroy.listen(() => this.destroy())
}

private getWorkerApi(): Remote<SigningWorkerApi> {
if (this.workerApi === undefined) {
this.worker = createSigningWorker()
this.workerApi = wrap<SigningWorkerApi>(this.worker)
}
return this.workerApi
}

async sign(request: SigningRequest): Promise<SigningResult> {
return this.getWorkerApi().createSignature(request)
}

destroy(): void {
if (this.workerApi !== undefined) {
this.workerApi[releaseProxy]()
this.workerApi = undefined
}
if (this.worker !== undefined) {
this.worker.terminate()
this.worker = undefined
}
}
}
22 changes: 22 additions & 0 deletions packages/sdk/src/signature/SigningWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { expose, transfer } from 'comlink'
import {
createSignatureFromData,
SigningResult,
SigningRequest,
} from './signingUtils'

const workerApi = {
createSignature: async (
request: SigningRequest
): Promise<SigningResult> => {
const result = await createSignatureFromData(request)
if (result.type === 'success') {
return transfer(result, [result.signature.buffer])
}
return result
},
}

export type SigningWorkerApi = typeof workerApi

expose(workerApi)
9 changes: 7 additions & 2 deletions packages/sdk/src/signature/createSignaturePayload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ export interface MessageRefLike {
sequenceNumber: number
}

export const createSignaturePayload = (opts: {
/**
* Input data for creating a signature payload.
*/
export interface SignaturePayloadInput {
messageId: MessageIdLike
content: Uint8Array
messageType: StreamMessageType
prevMsgRef?: MessageRefLike
newGroupKey?: EncryptedGroupKey
}): Uint8Array | never => {
}

export const createSignaturePayload = (opts: SignaturePayloadInput): Uint8Array | never => {
const header = Buffer.concat([
Buffer.from(`${opts.messageId.streamId}${opts.messageId.streamPartition}${opts.messageId.timestamp}`
+ `${opts.messageId.sequenceNumber}${opts.messageId.publisherId}${opts.messageId.msgChainId}`),
Expand Down
Loading