Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export type LoggerFnParams = {
[key: string]: any;
};

export type LoggerFn = (msg: string, params: LoggerFnParams) => void;
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import R from 'ramda';
import crypto from 'crypto';
import { getEnv, MaybeCancelablePromise, LoggerFn } from '@cubejs-backend/shared';
import { getEnv, MaybeCancelablePromise } from '@cubejs-backend/shared';
import {
cancelCombinator,
DownloadQueryResultsResult,
Expand All @@ -13,6 +13,7 @@ import {
UnloadOptions
} from '@cubejs-backend/base-driver';
import { DriverFactory } from './DriverFactory';
import { LoggerFn } from './Logger';
import { PreAggTableToTempTableNames, QueryCache, QueryWithParams } from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { LargeStreamWarning } from './StreamObjectsCounter';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
timeSeries,
localTimestampToUtc,
parseUtcIntoLocalDate,
LoggerFn,
} from '@cubejs-backend/shared';
import { InlineTable, TableStructure } from '@cubejs-backend/base-driver';
import { DriverFactory } from './DriverFactory';
Expand All @@ -28,6 +27,7 @@ import {
} from './PreAggregations';
import { PreAggregationLoader } from './PreAggregationLoader';
import { PreAggregationLoadCache } from './PreAggregationLoadCache';
import { LoggerFn } from './Logger';

const DEFAULT_TS_FORMAT = 'YYYY-MM-DDTHH:mm:ss.SSS';

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import R from 'ramda';
import crypto from 'crypto';
import { getEnv, LoggerFn } from '@cubejs-backend/shared';
import { getEnv } from '@cubejs-backend/shared';

import { BaseDriver, InlineTable, } from '@cubejs-backend/base-driver';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
Expand All @@ -13,6 +13,7 @@ import { CacheAndQueryDriverType } from './QueryOrchestrator';
import { PreAggregationPartitionRangeLoader } from './PreAggregationPartitionRangeLoader';
import { PreAggregationLoader } from './PreAggregationLoader';
import { PreAggregationLoadCache } from './PreAggregationLoadCache';
import { LoggerFn } from './Logger';

/// Name of the inline table containing the lambda rows.
export const LAMBDA_TABLE_PREFIX = 'lambda';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
MaybeCancelablePromise,
streamToArray,
CacheMode,
LoggerFn,
} from '@cubejs-backend/shared';
import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import {
Expand All @@ -23,6 +22,7 @@ import { QueryQueue, QueryQueueOptions } from './QueryQueue';
import { ContinueWaitError } from './ContinueWaitError';
import { LocalCacheDriver } from './LocalCacheDriver';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { LoggerFn } from './Logger';
import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations';
import { getCacheHash } from './utils';
import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator';
Expand Down Expand Up @@ -428,7 +428,9 @@ export class QueryCache {
const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams)
? queryAndParams
: [queryAndParams, []];
const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce(
const sortedPreAggregationsTablesToTempTables = [...preAggregationsTablesToTempTables]
.sort((a, b) => b[0].length - a[0].length);
const replacedKeyQuery: string = sortedPreAggregationsTablesToTempTables.reduce(
(query, [tableName, { targetTableName }]) => QueryCache.replaceAll(tableName, targetTableName, query),
keyQuery
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as stream from 'stream';
import R from 'ramda';
import { CacheMode, getEnv, LoggerFn } from '@cubejs-backend/shared';
import { CacheMode, getEnv } from '@cubejs-backend/shared';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import {
QuerySchemasResult,
Expand All @@ -12,6 +12,7 @@ import {
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache';
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { LoggerFn } from './Logger';
import { QueryStream } from './QueryStream';

export type CacheAndQueryDriverType = 'memory' | 'cubestore' | /** removed, used for exception */ 'redis';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'events';
import { getEnv, getProcessUid, LoggerFn } from '@cubejs-backend/shared';
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
import {
QueueDriverInterface,
QueryKey,
Expand All @@ -14,6 +14,7 @@ import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
import { TimeoutError } from './TimeoutError';
import { ContinueWaitError } from './ContinueWaitError';
import { LocalQueueDriver } from './LocalQueueDriver';
import { LoggerFn } from './Logger';
import { QueryStream } from './QueryStream';
import { CacheAndQueryDriverType } from './QueryOrchestrator';

Expand Down Expand Up @@ -192,11 +193,12 @@ export class QueryQueue {
queueId: this.generateQueueId(),
...executeOptions,
};

if (options.requestId) {
const idx = options.requestId.lastIndexOf('-span-');
options.externalId = idx !== -1 ? options.requestId.substring(0, idx) : options.requestId;
}
const externalId = options.requestId
? (() => {
const idx = options.requestId.lastIndexOf('-span-');
return idx !== -1 ? options.requestId.substring(0, idx) : options.requestId;
})()
: undefined;

if (this.skipQueue) {
const queryDef = {
Expand Down Expand Up @@ -238,7 +240,7 @@ export class QueryQueue {
// Result here won't be fetched for a forced build query and a jobbed build
// query (initialized by the /cubejs-system/v1/pre-aggregations/jobs
// endpoint).
let result = !query.forceBuild && await queueConnection.getResult(queryKey, options.externalId);
let result = !query.forceBuild && await (queueConnection as any).getResult(queryKey, externalId);
if (result && !result.streamResult) {
return this.parseResult(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,5 +430,19 @@ export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) =>
// @ts-ignore
expect(key4.persistent).toEqual(false);
});

it('replacePreAggregationTableNames replaces usage aliases before the base table', () => {
const result = QueryCache.replacePreAggregationTableNames(
'SELECT * FROM prod_pre_aggregations.ra_client_m_abc__usage_0 JOIN prod_pre_aggregations.ra_client_m_abc ON true',
[
['ra_client_m_abc', { targetTableName: 'base_table' }],
['ra_client_m_abc__usage_0', { targetTableName: 'usage_table' }],
]
);

expect(result).toEqual(
'SELECT * FROM prod_pre_aggregations.usage_table JOIN prod_pre_aggregations.base_table ON true'
);
});
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ import crypto from 'crypto';

import type { QueryKey, QueueDriverInterface } from '@cubejs-backend/base-driver';
import { pausePromise } from '@cubejs-backend/shared';
import { CubestoreQueueDriverConnection } from '@cubejs-backend/cubestore-driver';

import { QueryQueue, QueryQueueOptions } from '../../src';
import { ContinueWaitError } from '../../src/orchestrator/ContinueWaitError';
import { processUidRE } from '../../src/orchestrator/utils';

type CubestoreQueueDriverConnectionLike = {
useExternalId(): Promise<boolean>;
};

export type QueryQueueTestOptions = Pick<QueryQueueOptions, 'cacheAndQueueDriver' | 'cubeStoreDriverFactory'> & {
beforeAll?: () => Promise<void>,
afterAll?: () => Promise<void>,
Expand Down Expand Up @@ -465,7 +468,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions) =>
test('useExternalId should return true', async () => {
const connection = await queue.queueDriver.createConnection();
try {
expect(await (connection as CubestoreQueueDriverConnection).useExternalId()).toBe(true);
expect(await (connection as unknown as CubestoreQueueDriverConnectionLike).useExternalId()).toBe(true);
} finally {
queue.queueDriver.release(connection);
}
Expand Down
4 changes: 3 additions & 1 deletion tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
"baseUrl": ".",
"paths": {
"@cubejs-backend/*": [
"rust/js-wrapper",
"packages/cubejs-backend-shared/src",
"packages/cubejs-base-driver/src",
"packages/cubejs-cubestore-driver/src",
"rust/js-wrapper",
"packages/cubejs-api-gateway/src",
"packages/cubejs-cli/src",
"packages/cubejs-query-orchestrator/src",
Comment on lines +16 to 22
Expand Down
Loading