diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs index 8dcac034d9608..30ce680fe30ec 100644 --- a/packages/cubejs-backend-native/src/stream.rs +++ b/packages/cubejs-backend-native/src/stream.rs @@ -254,8 +254,30 @@ fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult { cx, handle: chunk_array, }; + // Handle transform errors via `reject()` rather than `.unwrap()` — a + // panic here bubbles up as an unhandled rejection on Node's tick queue + // and crashes the process. We signal both ends of the FFI bridge: the + // Rust downstream via `reject()` (so the wire layer can emit a Postgres + // `ErrorResponse`) and the JS callback via the same async wrapper used + // on the success path (avoids mixing sync/async stream-callback timing). let value = - transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap(); + match transform_response(&mut value_object, this.schema.clone(), &this.member_fields) { + Ok(v) => v, + Err(e) => { + let err_msg = e.message.to_string(); + this.reject(err_msg.clone()); + + let err_future = async move { Err(CubeError::internal(err_msg)) }; + wait_for_future_and_execute_callback( + this.tokio_handle.clone(), + value_object.cx.channel(), + callback, + err_future, + ); + + return Ok(value_object.cx.undefined()); + } + }; let future = this.push_chunk(value); wait_for_future_and_execute_callback( this.tokio_handle.clone(), diff --git a/packages/cubejs-backend-native/test/response-fake.ts b/packages/cubejs-backend-native/test/response-fake.ts index 9ae954b14a4ad..3733360259ec6 100644 --- a/packages/cubejs-backend-native/test/response-fake.ts +++ b/packages/cubejs-backend-native/test/response-fake.ts @@ -112,3 +112,40 @@ export class FakeRowStream extends stream.Readable { } } } + +// Pushes rows whose field values are unsupported types (nested arrays) for the +// native bridge's JsValueObject::get. Used to trigger a deterministic +// `transform_response` failure on the Rust side without depending on a +// specific backing database's error path. +// +// Each pushed row has the shape `{ : [, ] }` — the +// bridge's primitive-type extractor rejects JsArray values and returns +// `Err(...)`. The error must flow through `JsWriteStream::reject` rather than +// panicking; see cube-js/cube#10875. +export class FakeMalformedRowStream extends stream.Readable { + protected readonly fieldNames: string[]; + + public constructor(query: any) { + super({ + objectMode: true, + highWaterMark: 1024, + }); + this.fieldNames = [ + ...(query.dimensions || []), + ...(query.measures || []), + ]; + if (this.fieldNames.length === 0) { + throw new Error('FakeMalformedRowStream requires at least one dimension or measure'); + } + this.setMaxListeners(10); + } + + public _read(_size: number) { + const row: Record = {}; + for (const field of this.fieldNames) { + row[field] = [1, 2, 3]; + } + this.push(row); + this.push(null); + } +} diff --git a/packages/cubejs-backend-native/test/sql.test.ts b/packages/cubejs-backend-native/test/sql.test.ts index ad4e2771cb00c..287d9a599b201 100644 --- a/packages/cubejs-backend-native/test/sql.test.ts +++ b/packages/cubejs-backend-native/test/sql.test.ts @@ -4,7 +4,7 @@ import { Writable } from 'stream'; import * as native from '../js'; import metaFixture from './meta'; -import { FakeRowStream } from './response-fake'; +import { FakeRowStream, FakeMalformedRowStream } from './response-fake'; const _logger = jest.fn(({ event }) => { if ( @@ -423,4 +423,104 @@ describe('SQLInterface', () => { await native.shutdownInterface(instance, 'fast'); }); + + // Regression test for cube-js/cube#10875. + // + // When a chunk pushed from the JS streaming layer fails to decode (the + // native bridge's `transform_response` returns Err — e.g. because the + // backing database returned an error and the resulting chunk's shape + // doesn't match the declared schema), the bridge must NOT panic. The + // previous behaviour was an unguarded `.unwrap()` at + // `packages/cubejs-backend-native/src/stream.rs:258` that surfaced as a + // Node unhandled rejection and killed the cubesql process — dropping + // every concurrent BI session. + // + // The fix routes the transform error through the existing `reject()` + // channel so the downstream wire layer can emit a structured Postgres + // `ErrorResponse` instead of tearing the TCP connection. This test + // exercises that path by pushing a chunk whose field values are + // unsupported types (nested arrays); `transform_response` rejects them, + // and we assert that the process stays up and an error row reaches the + // output writable. + it('does not crash node when a stream chunk fails to transform', async () => { + if (process.env.CUBESQL_STREAM_MODE !== 'true') { + expect(process.env.CUBESQL_STREAM_MODE).toBeFalsy(); + return; + } + + const baseMethods = interfaceMethods(); + const instance = await native.registerInterface({ + pgPort: 5556, + ...baseMethods, + sqlApiLoad: jest.fn(async ({ query, streaming }) => { + if (streaming) { + return { stream: new FakeMalformedRowStream(query) }; + } + return { error: 'non-streaming path not exercised by this test' }; + }), + stream: jest.fn(async ({ query }) => ({ + stream: new FakeMalformedRowStream(query), + })), + canSwitchUserForSession: (_payload) => true, + }); + + let buf = ''; + const errors: string[] = []; + + const write = jest.fn((chunk, _, callback) => { + const lines = (buf + chunk.toString('utf-8')).split('\n'); + buf = lines.pop() || ''; + + for (const line of lines.filter((it) => it.trim().length)) { + const json = JSON.parse(line); + if (json.error) { + errors.push(json.error); + } + } + + callback(); + }); + + const cubeSqlStream = new Writable({ write }); + + // The error from `transform_response` should propagate without crashing + // the Node runtime. `execSql` may resolve with an error result rather + // than reject — either is acceptable for this regression: what we're + // asserting is "no process crash, error surfaces cleanly". + try { + // LIMIT > non_streaming_query_max_row_limit (default 50000) forces + // the SQL API to take the streaming path, exercising the chunk-push + // bridge that the fix targets. + await native.execSql( + instance, + 'SELECT order_date FROM KibanaSampleDataEcommerce ORDER BY order_date DESC LIMIT 100000;', + cubeSqlStream + ); + } catch (_e) { + // Expected — execSql may surface the transform error. + } + + if (buf.length > 0) { + const json = JSON.parse(buf); + if (json.error) { + errors.push(json.error); + } + } + + expect(errors.length).toBeGreaterThan(0); + + // Discriminate between (a) clean `reject()` propagation — the fix — + // and (b) Neon catching a panic from the old `.unwrap()` and surfacing + // it post-hoc. Both paths happen to include the underlying message in + // the wire output, so a positive-only assertion does NOT distinguish + // the regression. The Neon-wrapped panic adds an "internal error in + // Neon module: called Result::unwrap() on an Err value" prefix; its + // absence is the load-bearing signal here. Couples to Neon's panic + // formatting, but that's a stable, well-defined string in practice. + const errorBlob = errors.join('\n'); + expect(errorBlob).toContain('Expected primitive value'); + expect(errorBlob).not.toContain('internal error in Neon module'); + + await native.shutdownInterface(instance, 'fast'); + }); });