From 3b822db81268e2370bd7c94bdf10ae9426c1db93 Mon Sep 17 00:00:00 2001 From: Arman Yaraee Date: Tue, 14 Apr 2026 16:42:28 +0100 Subject: [PATCH] bug: (@effect/cluster): Add reproduction test case for infinite recursion Cluster rpc serialization causes a full infinite recursion Issue is on cluster/src/Runners.ts around L595 where the envelope's message is not encoded or for some reason the raw BigDecimal object is being packed instead of the encoded message --- packages/cluster/test/SocketRunner.test.ts | 115 +++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 packages/cluster/test/SocketRunner.test.ts diff --git a/packages/cluster/test/SocketRunner.test.ts b/packages/cluster/test/SocketRunner.test.ts new file mode 100644 index 00000000000..8b2ee1fd36d --- /dev/null +++ b/packages/cluster/test/SocketRunner.test.ts @@ -0,0 +1,115 @@ +import { ClusterSchema, Entity, MessageStorage, RunnerAddress, RunnerStorage, ShardingConfig } from "@effect/cluster" +import { NodeClusterSocket } from "@effect/platform-node" +import { Rpc, RpcSerialization } from "@effect/rpc" +import { describe, it } from "@effect/vitest" +import { BigDecimal, Effect, Layer, Logger, LogLevel, Option, PrimaryKey, Schema } from "effect" +import * as RunnerHealth from "../src/RunnerHealth.js" +import * as SocketRunner from "../src/SocketRunner.js" + +class TestPayload extends Schema.Class("TestPayload")({ + id: Schema.String, + amount: Schema.BigDecimal +}) { + [PrimaryKey.symbol]() { + return this.id + } +} + +const TestEntity = Entity + .make("TestEntity", [ + Rpc.make("Process", { + payload: TestPayload, + success: Schema.Void + }) + ]) + .annotateRpcs(ClusterSchema.Persisted, true) + .annotateRpcs(ClusterSchema.Uninterruptible, true) + +const TestEntityLayer = TestEntity.toLayer( + Effect.succeed({ + Process: () => Effect.void + }) +) + +const RUNNER_PORT = 50_123 +// Build shared storage instances once, so runner and client see the same state. +// MessageStorage.layerMemory requires ShardingConfig, so we provide a minimal one. +const SharedStorage = Layer.mergeAll( + RunnerStorage.layerMemory, + MessageStorage.layerMemory +).pipe( + Layer.provide(ShardingConfig.layerDefaults) +) + +const makeRunnerLayer = (port: number) => + TestEntityLayer.pipe( + Layer.provideMerge(SocketRunner.layer), + Layer.provide(RunnerHealth.layerNoop), + Layer.provide(NodeClusterSocket.layerSocketServer), + Layer.provide(NodeClusterSocket.layerClientProtocol), + Layer.provide(ShardingConfig.layer({ + runnerAddress: Option.some(RunnerAddress.make("localhost", port)), + entityTerminationTimeout: 0, + entityMessagePollInterval: 5000, + sendRetryInterval: 100 + })), + Layer.provide(RpcSerialization.layerMsgPack) + ) + +const makeClientLayer = (port: number) => + SocketRunner.layerClientOnly.pipe( + Layer.provide(NodeClusterSocket.layerClientProtocol), + Layer.provide(ShardingConfig.layer({ + runnerAddress: Option.some(RunnerAddress.make("localhost", port)), + runnerListenAddress: Option.some(RunnerAddress.make("localhost", port)), + entityTerminationTimeout: 0, + entityMessagePollInterval: 5000, + sendRetryInterval: 100 + })), + Layer.provide(RpcSerialization.layerMsgPack) + ) + +// BigDecimal.normalize creates a circular `normalized` self-reference. +// When a persisted message is sent with discard: true, the notify path in Runners.makeRpc +// passes the raw envelope (with circular BigDecimal payload) to the runner via msgpack, +// causing RangeError: Maximum call stack size exceeded. +describe("SocketRunner", () => { + it.scopedLive( + "entity call with BigDecimal and discard should not stack overflow", + () => + Effect.gen(function*() { + // Start the runner (with socket server and entity handler) + yield* Layer.launch(makeRunnerLayer(RUNNER_PORT)).pipe(Effect.forkScoped) + + // Give the runner time to start and acquire shards + yield* Effect.sleep("2 seconds") + yield* Effect.log("Before starting the client") + + // Send a message from the client with discard: true. + // The BigDecimal is normalized to trigger the circular `normalized` self-reference. + yield* Effect.gen(function*() { + yield* Effect.log("Starting the client") + yield* Effect.sleep("2 seconds") + const makeClient = yield* TestEntity.client + // Give the client time to discover the runner + yield* Effect.sleep("3 seconds") + const client = makeClient("entity-1") + + const amount = BigDecimal.unsafeFromString("123.45") + + yield* client.Process( + TestPayload.make({ id: "req-1", amount }), + { discard: true } + ) + }).pipe( + Effect.provide(makeClientLayer(RUNNER_PORT)), + Effect.scoped + ) + }).pipe(Effect.provide( + SharedStorage.pipe(Layer.provideMerge( + Logger.minimumLogLevel(LogLevel.None) + )) + )), + 30_000 + ) +})