Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions src/remote/query-optimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
private target?: Target;
private semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
private _finish = Promise.withResolvers<void>();
private _inflight: Promise<void> = Promise.resolve();

private _validQueriesProcessed = 0;
private _invalidQueries = 0;
Expand Down Expand Up @@ -158,6 +159,11 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
this._validQueriesProcessed = 0;
}

async drain(): Promise<void> {
this.stop();
await this._inflight;
}

async restart({ clearQueries } = { clearQueries: false }) {
this.semaphore = new Sema(QueryOptimizer.MAX_CONCURRENCY);
if (clearQueries) {
Expand Down Expand Up @@ -297,21 +303,24 @@ export class QueryOptimizer extends EventEmitter<EventMap> {
break;
}
this._validQueriesProcessed++;
const optimization = await this.optimizeQuery(
optimized,
this.target,
{ timeoutMs: this.calculateTimeoutRetryDelay(optimized) },
);
this.queriedSinceVacuum++;
if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) {
await this.vacuum();
this.queriedSinceVacuum = 0;
}

this.queries.set(
optimized.hash,
optimized.withOptimization(optimization),
);
// mark the latest inflight request so it can be awaited in drain()
this._inflight = (async () => {
const optimization = await this.optimizeQuery(
optimized,
this.target!,
{ timeoutMs: this.calculateTimeoutRetryDelay(optimized) },
);
this.queriedSinceVacuum++;
if (this.queriedSinceVacuum > QueryOptimizer.vacuumThreshold) {
await this.vacuum();
this.queriedSinceVacuum = 0;
}
this.queries.set(
optimized.hash,
optimized.withOptimization(optimization),
);
})();
await this._inflight;
}
} finally {
this.running = false;
Expand Down
3 changes: 2 additions & 1 deletion src/remote/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ export class Remote extends EventEmitter<RemoteEvents> {
this.generation = nextGeneration;
this.optimizingDbUDRL = this.optimizingDbUDRL.withDatabaseName(nextDbName);
this.optimizer.updateConnectable(this.optimizingDbUDRL);
await this.optimizer.drain();
// these cannot be run in the same `exec` block as that implicitly creates transactions
await baseDb.exec(
`drop database if exists ${prevDbName} with (force);`,
`drop database if exists ${prevDbName};`,
);
}

Expand Down
Loading