Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion packages/cubejs-backend-native/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,30 @@ fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsUndefined> {
cx,
handle: chunk_array,
};
// Handle transform errors via `reject()` rather than `.unwrap()` — a
// panic here bubbles up as an unhandled rejection on Node's tick queue
// and crashes the process. We signal both ends of the FFI bridge: the
// Rust downstream via `reject()` (so the wire layer can emit a Postgres
// `ErrorResponse`) and the JS callback via the same async wrapper used
// on the success path (avoids mixing sync/async stream-callback timing).
let value =
transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap();
match transform_response(&mut value_object, this.schema.clone(), &this.member_fields) {
Ok(v) => v,
Err(e) => {
let err_msg = e.message.to_string();
this.reject(err_msg.clone());

let err_future = async move { Err(CubeError::internal(err_msg)) };
wait_for_future_and_execute_callback(
this.tokio_handle.clone(),
value_object.cx.channel(),
callback,
err_future,
);

return Ok(value_object.cx.undefined());
}
};
let future = this.push_chunk(value);
wait_for_future_and_execute_callback(
this.tokio_handle.clone(),
Expand Down
37 changes: 37 additions & 0 deletions packages/cubejs-backend-native/test/response-fake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,40 @@ export class FakeRowStream extends stream.Readable {
}
}
}

// Pushes rows whose field values are unsupported types (nested arrays) for the
// native bridge's JsValueObject::get. Used to trigger a deterministic
// `transform_response` failure on the Rust side without depending on a
// specific backing database's error path.
//
// Each pushed row has the shape `{ <dimension>: [<nested>, <array>] }` — the
// bridge's primitive-type extractor rejects JsArray values and returns
// `Err(...)`. The error must flow through `JsWriteStream::reject` rather than
// panicking; see cube-js/cube#10875.
export class FakeMalformedRowStream extends stream.Readable {
protected readonly fieldNames: string[];

public constructor(query: any) {
super({
objectMode: true,
highWaterMark: 1024,
});
this.fieldNames = [
...(query.dimensions || []),
...(query.measures || []),
];
if (this.fieldNames.length === 0) {
throw new Error('FakeMalformedRowStream requires at least one dimension or measure');
}
this.setMaxListeners(10);
}

public _read(_size: number) {
const row: Record<string, unknown> = {};
for (const field of this.fieldNames) {
row[field] = [1, 2, 3];
}
this.push(row);
this.push(null);
}
}
102 changes: 101 additions & 1 deletion packages/cubejs-backend-native/test/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Writable } from 'stream';

import * as native from '../js';
import metaFixture from './meta';
import { FakeRowStream } from './response-fake';
import { FakeRowStream, FakeMalformedRowStream } from './response-fake';

const _logger = jest.fn(({ event }) => {
if (
Expand Down Expand Up @@ -423,4 +423,104 @@ describe('SQLInterface', () => {

await native.shutdownInterface(instance, 'fast');
});

// Regression test for cube-js/cube#10875.
//
// When a chunk pushed from the JS streaming layer fails to decode (the
// native bridge's `transform_response` returns Err — e.g. because the
// backing database returned an error and the resulting chunk's shape
// doesn't match the declared schema), the bridge must NOT panic. The
// previous behaviour was an unguarded `.unwrap()` at
// `packages/cubejs-backend-native/src/stream.rs:258` that surfaced as a
// Node unhandled rejection and killed the cubesql process — dropping
// every concurrent BI session.
//
// The fix routes the transform error through the existing `reject()`
// channel so the downstream wire layer can emit a structured Postgres
// `ErrorResponse` instead of tearing the TCP connection. This test
// exercises that path by pushing a chunk whose field values are
// unsupported types (nested arrays); `transform_response` rejects them,
// and we assert that the process stays up and an error row reaches the
// output writable.
it('does not crash node when a stream chunk fails to transform', async () => {
if (process.env.CUBESQL_STREAM_MODE !== 'true') {
expect(process.env.CUBESQL_STREAM_MODE).toBeFalsy();
return;
}

const baseMethods = interfaceMethods();
const instance = await native.registerInterface({
pgPort: 5556,
...baseMethods,
sqlApiLoad: jest.fn(async ({ query, streaming }) => {
if (streaming) {
return { stream: new FakeMalformedRowStream(query) };
}
return { error: 'non-streaming path not exercised by this test' };
}),
stream: jest.fn(async ({ query }) => ({
stream: new FakeMalformedRowStream(query),
})),
canSwitchUserForSession: (_payload) => true,
});

let buf = '';
const errors: string[] = [];

const write = jest.fn((chunk, _, callback) => {
const lines = (buf + chunk.toString('utf-8')).split('\n');
buf = lines.pop() || '';

for (const line of lines.filter((it) => it.trim().length)) {
const json = JSON.parse(line);
if (json.error) {
errors.push(json.error);
}
}

callback();
});

const cubeSqlStream = new Writable({ write });

// The error from `transform_response` should propagate without crashing
// the Node runtime. `execSql` may resolve with an error result rather
// than reject — either is acceptable for this regression: what we're
// asserting is "no process crash, error surfaces cleanly".
try {
// LIMIT > non_streaming_query_max_row_limit (default 50000) forces
// the SQL API to take the streaming path, exercising the chunk-push
// bridge that the fix targets.
await native.execSql(
instance,
'SELECT order_date FROM KibanaSampleDataEcommerce ORDER BY order_date DESC LIMIT 100000;',
cubeSqlStream
);
} catch (_e) {
// Expected — execSql may surface the transform error.
}

if (buf.length > 0) {
const json = JSON.parse(buf);
if (json.error) {
errors.push(json.error);
}
}

expect(errors.length).toBeGreaterThan(0);

// Discriminate between (a) clean `reject()` propagation — the fix —
// and (b) Neon catching a panic from the old `.unwrap()` and surfacing
// it post-hoc. Both paths happen to include the underlying message in
// the wire output, so a positive-only assertion does NOT distinguish
// the regression. The Neon-wrapped panic adds an "internal error in
// Neon module: called Result::unwrap() on an Err value" prefix; its
// absence is the load-bearing signal here. Couples to Neon's panic
// formatting, but that's a stable, well-defined string in practice.
const errorBlob = errors.join('\n');
expect(errorBlob).toContain('Expected primitive value');
expect(errorBlob).not.toContain('internal error in Neon module');

await native.shutdownInterface(instance, 'fast');
});
});
Loading