From 670f20f5cd9dd17fced771bdf4b11b360050b300 Mon Sep 17 00:00:00 2001 From: Debjit Mandal Date: Sat, 23 May 2026 20:52:38 +0530 Subject: [PATCH 1/2] fix(query-orchestrator): handle __usage_N pre-aggregation aliases in table replacement --- .../src/orchestrator/QueryCache.ts | 4 +++- .../test/unit/QueryCache.abstract.ts | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 70b04ade38ddb..37e258d1aa112 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -428,7 +428,9 @@ export class QueryCache { const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []]; - const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( + const sortedPreAggregationsTablesToTempTables = [...preAggregationsTablesToTempTables] + .sort((a, b) => b[0].length - a[0].length); + const replacedKeyQuery: string = sortedPreAggregationsTablesToTempTables.reduce( (query, [tableName, { targetTableName }]) => QueryCache.replaceAll(tableName, targetTableName, query), keyQuery ); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts index 5c77fd3333e8c..65f38b3494aac 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts @@ -430,5 +430,19 @@ export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) => // @ts-ignore expect(key4.persistent).toEqual(false); }); + + it('replacePreAggregationTableNames replaces usage aliases before the base table', () => { + const result = QueryCache.replacePreAggregationTableNames( + 'SELECT * FROM prod_pre_aggregations.ra_client_m_abc__usage_0 JOIN prod_pre_aggregations.ra_client_m_abc ON true', + [ + ['ra_client_m_abc', { targetTableName: 'base_table' }], + ['ra_client_m_abc__usage_0', { targetTableName: 'usage_table' }], + ] + ); + + expect(result).toEqual( + 'SELECT * FROM prod_pre_aggregations.usage_table JOIN prod_pre_aggregations.base_table ON true' + ); + }); }); }; From f96312b9160c4be9fbd8169fc80cbbb301184635 Mon Sep 17 00:00:00 2001 From: Debjit Mandal Date: Sat, 23 May 2026 20:59:26 +0530 Subject: [PATCH 2/2] refactor: remove LoggerFn import from shared and create a new Logger module --- .../src/orchestrator/Logger.ts | 5 +++++ .../src/orchestrator/PreAggregationLoader.ts | 3 ++- .../PreAggregationPartitionRangeLoader.ts | 2 +- .../src/orchestrator/PreAggregations.ts | 3 ++- .../src/orchestrator/QueryCache.ts | 2 +- .../src/orchestrator/QueryOrchestrator.ts | 3 ++- .../src/orchestrator/QueryQueue.ts | 16 +++++++++------- .../test/unit/QueryQueue.abstract.ts | 7 +++++-- tsconfig.base.json | 4 +++- 9 files changed, 30 insertions(+), 15 deletions(-) create mode 100644 packages/cubejs-query-orchestrator/src/orchestrator/Logger.ts diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/Logger.ts b/packages/cubejs-query-orchestrator/src/orchestrator/Logger.ts new file mode 100644 index 0000000000000..3cd16b4dae087 --- /dev/null +++ b/packages/cubejs-query-orchestrator/src/orchestrator/Logger.ts @@ -0,0 +1,5 @@ +export type LoggerFnParams = { + [key: string]: any; +}; + +export type LoggerFn = (msg: string, params: LoggerFnParams) => void; \ No newline at end of file diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts index a478fce68cb16..e7780ba73191f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts @@ -1,6 +1,6 @@ import R from 'ramda'; import crypto from 'crypto'; -import { getEnv, MaybeCancelablePromise, LoggerFn } from '@cubejs-backend/shared'; +import { getEnv, MaybeCancelablePromise } from '@cubejs-backend/shared'; import { cancelCombinator, DownloadQueryResultsResult, @@ -13,6 +13,7 @@ import { UnloadOptions } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; +import { LoggerFn } from './Logger'; import { PreAggTableToTempTableNames, QueryCache, QueryWithParams } from './QueryCache'; import { ContinueWaitError } from './ContinueWaitError'; import { LargeStreamWarning } from './StreamObjectsCounter'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index 9c63e26f0128f..225182a1beba1 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -10,7 +10,6 @@ import { timeSeries, localTimestampToUtc, parseUtcIntoLocalDate, - LoggerFn, } from '@cubejs-backend/shared'; import { InlineTable, TableStructure } from '@cubejs-backend/base-driver'; import { DriverFactory } from './DriverFactory'; @@ -28,6 +27,7 @@ import { } from './PreAggregations'; import { PreAggregationLoader } from './PreAggregationLoader'; import { PreAggregationLoadCache } from './PreAggregationLoadCache'; +import { LoggerFn } from './Logger'; const DEFAULT_TS_FORMAT = 'YYYY-MM-DDTHH:mm:ss.SSS'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index fc890b7df1b6c..47f53f8ca2981 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -1,6 +1,6 @@ import R from 'ramda'; import crypto from 'crypto'; -import { getEnv, LoggerFn } from '@cubejs-backend/shared'; +import { getEnv } from '@cubejs-backend/shared'; import { BaseDriver, InlineTable, } from '@cubejs-backend/base-driver'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; @@ -13,6 +13,7 @@ import { CacheAndQueryDriverType } from './QueryOrchestrator'; import { PreAggregationPartitionRangeLoader } from './PreAggregationPartitionRangeLoader'; import { PreAggregationLoader } from './PreAggregationLoader'; import { PreAggregationLoadCache } from './PreAggregationLoadCache'; +import { LoggerFn } from './Logger'; /// Name of the inline table containing the lambda rows. export const LAMBDA_TABLE_PREFIX = 'lambda'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 37e258d1aa112..3b75a3e5e905e 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -8,7 +8,6 @@ import { MaybeCancelablePromise, streamToArray, CacheMode, - LoggerFn, } from '@cubejs-backend/shared'; import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { @@ -23,6 +22,7 @@ import { QueryQueue, QueryQueueOptions } from './QueryQueue'; import { ContinueWaitError } from './ContinueWaitError'; import { LocalCacheDriver } from './LocalCacheDriver'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; +import { LoggerFn } from './Logger'; import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations'; import { getCacheHash } from './utils'; import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index e257579fa5397..ac54d6f095b58 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -1,6 +1,6 @@ import * as stream from 'stream'; import R from 'ramda'; -import { CacheMode, getEnv, LoggerFn } from '@cubejs-backend/shared'; +import { CacheMode, getEnv } from '@cubejs-backend/shared'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { QuerySchemasResult, @@ -12,6 +12,7 @@ import { import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache'; import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations'; import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory'; +import { LoggerFn } from './Logger'; import { QueryStream } from './QueryStream'; export type CacheAndQueryDriverType = 'memory' | 'cubestore' | /** removed, used for exception */ 'redis'; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts index 19f0dcbeba660..73d98fb536348 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events'; -import { getEnv, getProcessUid, LoggerFn } from '@cubejs-backend/shared'; +import { getEnv, getProcessUid } from '@cubejs-backend/shared'; import { QueueDriverInterface, QueryKey, @@ -14,6 +14,7 @@ import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import { TimeoutError } from './TimeoutError'; import { ContinueWaitError } from './ContinueWaitError'; import { LocalQueueDriver } from './LocalQueueDriver'; +import { LoggerFn } from './Logger'; import { QueryStream } from './QueryStream'; import { CacheAndQueryDriverType } from './QueryOrchestrator'; @@ -192,11 +193,12 @@ export class QueryQueue { queueId: this.generateQueueId(), ...executeOptions, }; - - if (options.requestId) { - const idx = options.requestId.lastIndexOf('-span-'); - options.externalId = idx !== -1 ? options.requestId.substring(0, idx) : options.requestId; - } + const externalId = options.requestId + ? (() => { + const idx = options.requestId.lastIndexOf('-span-'); + return idx !== -1 ? options.requestId.substring(0, idx) : options.requestId; + })() + : undefined; if (this.skipQueue) { const queryDef = { @@ -238,7 +240,7 @@ export class QueryQueue { // Result here won't be fetched for a forced build query and a jobbed build // query (initialized by the /cubejs-system/v1/pre-aggregations/jobs // endpoint). - let result = !query.forceBuild && await queueConnection.getResult(queryKey, options.externalId); + let result = !query.forceBuild && await (queueConnection as any).getResult(queryKey, externalId); if (result && !result.streamResult) { return this.parseResult(result); } diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 8c9d202516685..e6c9f77ece9ab 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -3,12 +3,15 @@ import crypto from 'crypto'; import type { QueryKey, QueueDriverInterface } from '@cubejs-backend/base-driver'; import { pausePromise } from '@cubejs-backend/shared'; -import { CubestoreQueueDriverConnection } from '@cubejs-backend/cubestore-driver'; import { QueryQueue, QueryQueueOptions } from '../../src'; import { ContinueWaitError } from '../../src/orchestrator/ContinueWaitError'; import { processUidRE } from '../../src/orchestrator/utils'; +type CubestoreQueueDriverConnectionLike = { + useExternalId(): Promise; +}; + export type QueryQueueTestOptions = Pick & { beforeAll?: () => Promise, afterAll?: () => Promise, @@ -465,7 +468,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) => test('useExternalId should return true', async () => { const connection = await queue.queueDriver.createConnection(); try { - expect(await (connection as CubestoreQueueDriverConnection).useExternalId()).toBe(true); + expect(await (connection as unknown as CubestoreQueueDriverConnectionLike).useExternalId()).toBe(true); } finally { queue.queueDriver.release(connection); } diff --git a/tsconfig.base.json b/tsconfig.base.json index e2007451bb64b..0c0676c1dbf2f 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -13,8 +13,10 @@ "baseUrl": ".", "paths": { "@cubejs-backend/*": [ - "rust/js-wrapper", + "packages/cubejs-backend-shared/src", + "packages/cubejs-base-driver/src", "packages/cubejs-cubestore-driver/src", + "rust/js-wrapper", "packages/cubejs-api-gateway/src", "packages/cubejs-cli/src", "packages/cubejs-query-orchestrator/src",