Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/platform-bun/src/BunClusterHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export const layer = <
),
Layer.provide(ShardingConfig.layerFromEnv(options?.shardingConfig)),
Layer.provide(
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack()
)
) as any
}
2 changes: 1 addition & 1 deletion packages/platform-bun/src/BunClusterSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const layer = <
),
Layer.provide(ShardingConfig.layerFromEnv(options?.shardingConfig)),
Layer.provide(
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack()
)
) as any
}
2 changes: 1 addition & 1 deletion packages/platform-node/src/NodeClusterHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export const layer = <
),
Layer.provide(ShardingConfig.layerFromEnv(options?.shardingConfig)),
Layer.provide(
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack()
)
) as any
}
Expand Down
2 changes: 1 addition & 1 deletion packages/platform-node/src/NodeClusterSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export const layer = <
),
Layer.provide(ShardingConfig.layerFromEnv(options?.shardingConfig)),
Layer.provide(
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack
options?.serialization === "ndjson" ? RpcSerialization.layerNdjson : RpcSerialization.layerMsgPack()
)
) as any
}
Expand Down
6 changes: 3 additions & 3 deletions packages/platform-node/test/RpcServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe("RpcServer", () => {
"e2e http msgpack",
HttpNdjsonClient.pipe(
Layer.provideMerge(HttpNdjsonServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack()])
)
)
e2eSuite(
Expand Down Expand Up @@ -76,7 +76,7 @@ describe("RpcServer", () => {
"e2e ws msgpack",
HttpWsClient.pipe(
Layer.provideMerge(HttpWsServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack()])
)
)
e2eSuite(
Expand Down Expand Up @@ -113,7 +113,7 @@ describe("RpcServer", () => {
"e2e tcp msgpack",
TcpClient.pipe(
Layer.provideMerge(TcpServer),
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack])
Layer.provide([NodeHttpServer.layerTest, RpcSerialization.layerMsgPack()])
)
)
e2eSuite(
Expand Down
92 changes: 60 additions & 32 deletions packages/rpc/src/RpcSerialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,43 +352,66 @@ interface JsonRpcResponse {
type JsonRpcMessage = JsonRpcRequest | JsonRpcResponse

/**
* Create a MessagePack serialization with custom msgpackr options.
*
* On Cloudflare Workers with `allow_eval_during_startup` (default for
* `compatibility_date >= 2025-06-01`), pass `{ useRecords: false }` to
* prevent msgpackr's JIT code generation via `new Function()`, which is
* blocked during request handling.
*
* @since 1.0.0
* @category serialization
* @example
* import { Layer } from "effect"
* import { RpcSerialization } from "@effect/rpc"
*
* // Cloudflare Workers
* Layer.succeed(
* RpcSerialization,
* RpcSerialization.makeMsgPack({ useRecords: false })
* )
*/
export const msgPack: RpcSerialization["Type"] = RpcSerialization.of({
contentType: "application/msgpack",
includesFraming: true,
unsafeMake: () => {
const unpackr = new Msgpackr.Unpackr()
const packr = new Msgpackr.Packr()
const encoder = new TextEncoder()
let incomplete: Uint8Array | undefined = undefined
return {
decode: (bytes) => {
let buf = typeof bytes === "string" ? encoder.encode(bytes) : bytes
if (incomplete !== undefined) {
const prev = buf
bytes = new Uint8Array(incomplete.length + buf.length)
bytes.set(incomplete)
bytes.set(prev, incomplete.length)
buf = bytes
incomplete = undefined
}
try {
return unpackr.unpackMultiple(buf)
} catch (error_) {
const error = error_ as any
if (error.incomplete) {
incomplete = buf.subarray(error.lastPosition)
return error.values ?? []
export const makeMsgPack = (options: Msgpackr.Options): RpcSerialization["Type"] =>
RpcSerialization.of({
contentType: "application/msgpack",
includesFraming: true,
unsafeMake: () => {
const unpackr = new Msgpackr.Unpackr(options)
const packr = new Msgpackr.Packr(options)
const encoder = new TextEncoder()
let incomplete: Uint8Array | undefined = undefined
return {
decode: (bytes) => {
let buf = typeof bytes === "string" ? encoder.encode(bytes) : bytes
if (incomplete !== undefined) {
const prev = buf
bytes = new Uint8Array(incomplete.length + buf.length)
bytes.set(incomplete)
bytes.set(prev, incomplete.length)
buf = bytes
incomplete = undefined
}
return []
}
},
encode: (response) => packr.pack(response)
try {
return unpackr.unpackMultiple(buf)
} catch (error_) {
const error = error_ as any
if (error.incomplete) {
incomplete = buf.subarray(error.lastPosition)
return error.values ?? []
}
throw error_
}
},
encode: (response) => packr.pack(response)
}
}
}
})
})

/**
* @since 1.0.0
* @category serialization
*/
export const msgPack: RpcSerialization["Type"] = makeMsgPack({})

/**
 * An RPC serialization layer that uses JSON for serialization.
Expand Down Expand Up @@ -439,6 +462,11 @@ export const layerNdJsonRpc = (options?: {
* MessagePack has a more compact binary format compared to JSON and NDJSON. It
* also has better support for binary data.
*
* On Cloudflare Workers with `allow_eval_during_startup` (default for
* `compatibility_date >= 2025-06-01`), pass `{ useRecords: false }` to
* prevent msgpackr's JIT code generation via `new Function()`, which is
* blocked during request handling.
*
* @since 1.0.0
* @category serialization
*/
Expand Down
61 changes: 61 additions & 0 deletions packages/rpc/test/RpcSerialization.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { RpcSerialization } from "@effect/rpc"
import { assert, describe, it } from "@effect/vitest"

// Unit tests for the MessagePack codecs exposed by RpcSerialization:
// the default `msgPack` parser and the configurable `makeMsgPack` factory.
describe("RpcSerialization", () => {
  describe("msgPack", () => {
    // Round-trip: one encoded message decodes back to an equal value.
    it("encode and decode correctly", () => {
      const parser = RpcSerialization.msgPack.unsafeMake()
      const payload = { _tag: "Request", id: 1, method: "echo" }
      const encoded = parser.encode(payload)
      const decoded = parser.decode(encoded as Uint8Array)
      // decode returns an array of all complete frames; exactly one was sent.
      assert.strictEqual(decoded.length, 1)
      assert.deepStrictEqual(decoded[0], payload)
    })

    // Feeding a byte stream that ends mid-frame must yield only the complete
    // frames instead of throwing; the truncated tail is buffered inside the
    // parser for the next decode call.
    it("handles incomplete frames gracefully", () => {
      const parser = RpcSerialization.msgPack.unsafeMake()
      // Separate parser instance used only for encoding, so encoding does not
      // share stateful buffers with the parser under test.
      const helper = RpcSerialization.msgPack.unsafeMake()

      const msg1 = helper.encode({ a: 1 }) as Uint8Array
      const msg2 = helper.encode({ b: 2 }) as Uint8Array
      // Concatenate the two frames into one contiguous buffer.
      const combined = new Uint8Array(msg1.length + msg2.length)
      combined.set(msg1)
      combined.set(msg2, msg1.length)

      // Cut the stream two bytes into the second frame.
      const truncated = combined.subarray(0, msg1.length + 2)
      const decoded = parser.decode(truncated)

      // Only the first, complete message is produced.
      assert.strictEqual(decoded.length, 1)
      assert.deepStrictEqual(decoded[0], { a: 1 })
    })
  })

  describe("makeMsgPack", () => {
    // With `useRecords: false` msgpackr avoids its record-structure extension
    // (and the code generation it entails); plain objects must still round-trip.
    it("useRecords false encode and decode correctly", () => {
      const parser = RpcSerialization.makeMsgPack({ useRecords: false }).unsafeMake()
      const payload = { _tag: "Request", id: 1, method: "echo" }
      const encoded = parser.encode(payload)
      const decoded = parser.decode(encoded as Uint8Array)
      assert.strictEqual(decoded.length, 1)
      assert.deepStrictEqual(decoded[0], payload)
    })

    // Repeated object shapes are exactly what record structures normally
    // optimize; verify they still round-trip when records are disabled.
    it("useRecords false handles nested objects with repeated structures", () => {
      const parser = RpcSerialization.makeMsgPack({ useRecords: false }).unsafeMake()
      const payload = {
        _tag: "Chunk",
        requestId: "1",
        values: [
          { _tag: "Exit", requestId: "1", exit: { _tag: "Success", value: { _tag: "Ok", data: "a" } } },
          { _tag: "Exit", requestId: "2", exit: { _tag: "Success", value: { _tag: "Ok", data: "b" } } },
          { _tag: "Exit", requestId: "3", exit: { _tag: "Success", value: { _tag: "Ok", data: "c" } } },
          { _tag: "Exit", requestId: "4", exit: { _tag: "Success", value: { _tag: "Ok", data: "d" } } }
        ]
      }
      const encoded = parser.encode(payload)
      const decoded = parser.decode(encoded as Uint8Array)
      assert.strictEqual(decoded.length, 1)
      assert.deepStrictEqual(decoded[0], payload)
    })
  })
})
Loading