From 51734b009d15228a1beb1c162aa49bdb8002d2af Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 20 Aug 2021 11:13:58 +0200 Subject: [PATCH 1/3] Fetch tuples in small batches in adaptive executor where possible --- .../distributed/executor/adaptive_executor.c | 175 ++++++++++++++---- .../distributed/executor/citus_custom_scan.c | 21 ++- src/include/distributed/adaptive_executor.h | 6 + src/include/distributed/citus_custom_scan.h | 6 + src/include/distributed/multi_executor.h | 2 - 5 files changed, 166 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 83e561c9376..738dc9310b4 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -274,7 +274,7 @@ typedef struct DistributedExecution bool raiseInterrupts; /* transactional properties of the current execution */ - TransactionProperties *transactionProperties; + TransactionProperties transactionProperties; /* indicates whether distributed execution has failed */ bool failed; @@ -290,6 +290,13 @@ typedef struct DistributedExecution */ uint64 rowsProcessed; + /* + * RunDistributedExecution can be called multiple time to perform partial + * execution. In that case, rowsReceivedInCurrentRun contains the number + * of rows received. + */ + uint64 rowsReceivedInCurrentRun; + /* * The following fields are used while receiving results from remote nodes. * We store this information here to avoid re-allocating it every time. @@ -315,6 +322,11 @@ typedef struct DistributedExecution * fail, such as CREATE INDEX CONCURRENTLY. */ bool localExecutionSupported; + + /* + * Memory context for the execution. 
+ */ + MemoryContext memoryContext; } DistributedExecution; @@ -638,7 +650,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel int targetPoolSize, TupleDestination * defaultTupleDest, - TransactionProperties * + TransactionProperties xactProperties, List *jobIdList, bool localExecutionSupported); @@ -649,7 +661,7 @@ static TransactionProperties DecideTaskListTransactionProperties(RowModifyLevel excludeFromTransaction); static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); -static void RunDistributedExecution(DistributedExecution *execution); +static void RunDistributedExecution(DistributedExecution *execution, bool toCompletion); static void SequentialRunDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); @@ -771,11 +783,9 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) * first call of CitusExecScan. The function fills the tupleStore * of the input scanScate. 
*/ -TupleTableSlot * -AdaptiveExecutor(CitusScanState *scanState) +void +AdaptiveExecutorStart(CitusScanState *scanState) { - TupleTableSlot *resultSlot = NULL; - DistributedPlan *distributedPlan = scanState->distributedPlan; EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; @@ -790,14 +800,10 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); - MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "AdaptiveExecutor", - ALLOCSET_DEFAULT_SIZES); - MemoryContext oldContext = MemoryContextSwitchTo(localContext); - - - /* Reset Task fields that are only valid for a single execution */ - ResetExplainAnalyzeData(taskList); + MemoryContext memoryContext = AllocSetContextCreate(executorState->es_query_cxt, + "AdaptiveExecutor", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(memoryContext); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); @@ -808,6 +814,9 @@ AdaptiveExecutor(CitusScanState *scanState) bool localExecutionSupported = true; + /* Reset Task fields that are only valid for a single execution */ + ResetExplainAnalyzeData(taskList); + if (RequestedForExplainAnalyze(scanState)) { /* @@ -863,7 +872,7 @@ AdaptiveExecutor(CitusScanState *scanState) paramListInfo, targetPoolSize, defaultTupleDest, - &xactProperties, + xactProperties, jobIdList, localExecutionSupported); @@ -873,13 +882,53 @@ AdaptiveExecutor(CitusScanState *scanState) */ StartDistributedExecution(execution); + /* store the execution in the custom scan state */ + scanState->execution = execution; + + execution->memoryContext = MemoryContextSwitchTo(oldContext); +} + + +bool +AdaptiveExecutorRun(CitusScanState *scanState) +{ + DistributedExecution *execution = scanState->execution; + DistributedPlan *distributedPlan = 
scanState->distributedPlan; + Job *job = distributedPlan->workerJob; + CmdType commandType = job->jobQuery->commandType; + + Assert(execution != NULL); + + MemoryContext oldContext = MemoryContextSwitchTo(execution->memoryContext); + + EState *executorState = ScanStateGetExecutorState(scanState); + bool sortTupleStore = false; + + if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) + { + /* sort the tuple store to get consistent DML output in tests */ + sortTupleStore = true; + } + if (ShouldRunTasksSequentially(execution->remoteTaskList)) { + /* sequential execution always runs to completion */ SequentialRunDistributedExecution(execution); } else { - RunDistributedExecution(execution); + /* if we need to sort the whole tuple store, run to completino */ + bool runToCompletion = sortTupleStore; + + RunDistributedExecution(execution, runToCompletion); + + if (execution->unfinishedTaskCount > 0) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* done with remote tasks, finish the execution */ } /* execute tasks local to the node (if any) */ @@ -889,7 +938,6 @@ AdaptiveExecutor(CitusScanState *scanState) RunLocalExecution(scanState, execution); } - CmdType commandType = job->jobQuery->commandType; if (commandType != CMD_SELECT) { executorState->es_processed = execution->rowsProcessed; @@ -897,14 +945,14 @@ AdaptiveExecutor(CitusScanState *scanState) FinishDistributedExecution(execution); - if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT) + if (sortTupleStore) { SortTupleStore(scanState); } MemoryContextSwitchTo(oldContext); - return resultSlot; + return true; } @@ -1104,7 +1152,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) CreateDistributedExecution( executionParams->modLevel, executionParams->taskList, executionParams->paramListInfo, executionParams->targetPoolSize, - defaultTupleDest, &executionParams->xactProperties, + defaultTupleDest, executionParams->xactProperties, 
executionParams->jobIdList, executionParams->localExecutionSupported); /* @@ -1117,7 +1165,10 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) /* run the remote execution */ StartDistributedExecution(execution); - RunDistributedExecution(execution); + + bool runToCompletion = true; + RunDistributedExecution(execution, runToCompletion); + FinishDistributedExecution(execution); /* now, switch back to the local execution */ @@ -1169,7 +1220,7 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, ParamListInfo paramListInfo, int targetPoolSize, TupleDestination *defaultTupleDest, - TransactionProperties *xactProperties, + TransactionProperties xactProperties, List *jobIdList, bool localExecutionSupported) { DistributedExecution *execution = @@ -1334,7 +1385,7 @@ DecideTaskListTransactionProperties(RowModifyLevel modLevel, List *taskList, boo void StartDistributedExecution(DistributedExecution *execution) { - TransactionProperties *xactProperties = execution->transactionProperties; + TransactionProperties *xactProperties = &(execution->transactionProperties); if (xactProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED) { @@ -1386,6 +1437,20 @@ StartDistributedExecution(DistributedExecution *execution) bool isRemote = true; EnsureTaskExecutionAllowed(isRemote); } + + /* + * We skip AssignTasksToConnectionsOrWorkerPool for sequential executions, + * because we do it separately for each task in SequentialRunDistributedExecution. + */ + if (!ShouldRunTasksSequentially(execution->remoteTaskList)) + { + /* + * If a (co-located) shard placement was accessed over a session earier in the + * transaction, assign the task to the same session. Otherwise, assign it to + * the general worker pool(s). 
+ */ + AssignTasksToConnectionsOrWorkerPool(execution); + } } @@ -1408,6 +1473,14 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) static void FinishDistributedExecution(DistributedExecution *execution) { + /* + * Sequential executions unclaim connections separately. + */ + if (!ShouldRunTasksSequentially(execution->remoteTaskList)) + { + CleanUpSessions(execution); + } + if (DistributedExecutionModifiesDatabase(execution)) { /* prevent copying shards in same transaction */ @@ -1493,7 +1566,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); MultiConnection *connection = NULL; - if (execution->transactionProperties->useRemoteTransactionBlocks != + if (execution->transactionProperties.useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { /* @@ -1877,8 +1950,20 @@ SequentialRunDistributedExecution(DistributedExecution *execution) break; } + /* + * We skipped AssignTasksToConnectionsOrWorkerPool in StartDistributedExecution + * when all the tasks were in the execution. Do it now instead. + */ + AssignTasksToConnectionsOrWorkerPool(execution); + /* simply call the regular execution function */ - RunDistributedExecution(execution); + bool runToCompletion = true; + RunDistributedExecution(execution, runToCompletion); + + /* + * Unclaim connections since the current execution is technically finished. + */ + CleanUpSessions(execution); } /* set back the original execution mode */ @@ -1895,9 +1980,9 @@ SequentialRunDistributedExecution(DistributedExecution *execution) * has an event. 
*/ void -RunDistributedExecution(DistributedExecution *execution) +RunDistributedExecution(DistributedExecution *execution, bool toCompletion) { - AssignTasksToConnectionsOrWorkerPool(execution); + WaitEvent *events = NULL; PG_TRY(); { @@ -1912,6 +1997,10 @@ RunDistributedExecution(DistributedExecution *execution) /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; + execution->rowsReceivedInCurrentRun = 0; + + /* TODO: GUC? be smart? */ + int maxBatchSize = 10000; /* * Iterate until all the tasks are finished. Once all the tasks @@ -1931,8 +2020,10 @@ RunDistributedExecution(DistributedExecution *execution) * irrespective of the current status of the tasks or the connections. */ while (!cancellationReceived && - (execution->unfinishedTaskCount > 0 || - HasIncompleteConnectionEstablishment(execution))) + ((execution->unfinishedTaskCount > 0 || + HasIncompleteConnectionEstablishment(execution)) && + (toCompletion || + execution->rowsReceivedInCurrentRun < maxBatchSize))) { WorkerPool *workerPool = NULL; foreach_declared_ptr(workerPool, execution->workerList) @@ -1986,9 +2077,16 @@ RunDistributedExecution(DistributedExecution *execution) &cancellationReceived); } - FreeExecutionWaitEvents(execution); + if (events != NULL) + { + pfree(events); + } - CleanUpSessions(execution); + if (execution->waitEventSet != NULL) + { + FreeWaitEventSet(execution->waitEventSet); + execution->waitEventSet = NULL; + } } PG_CATCH(); { @@ -2243,7 +2341,7 @@ ManageWorkerPool(WorkerPool *workerPool) /* increase the open rate every cycle (like TCP slow start) */ workerPool->maxNewConnectionsPerCycle += 1; - OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); + OpenNewConnections(workerPool, newConnectionCount, &execution->transactionProperties); /* * Cannot establish new connections to the local host, most probably because the @@ -2773,7 +2871,7 @@ CheckConnectionTimeout(WorkerPool *workerPool) */ logLevel = 
DEBUG1; } - else if (execution->transactionProperties->errorOnAnyFailure || + else if (execution->transactionProperties.errorOnAnyFailure || execution->failed) { /* @@ -3189,7 +3287,7 @@ ConnectionStateMachine(WorkerSession *session) if (transaction->transactionCritical || execution->failed || - (execution->transactionProperties->errorOnAnyFailure && + (execution->transactionProperties.errorOnAnyFailure && workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)) { /* a task has failed due to this connection failure */ @@ -3381,7 +3479,7 @@ TransactionModifiedDistributedTable(DistributedExecution *execution) * should not be pretending that we're in a coordinated transaction even * if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround. */ - return execution->transactionProperties->useRemoteTransactionBlocks == + return execution->transactionProperties.useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED && XactModificationLevel == XACT_MODIFICATION_DATA; } @@ -3396,7 +3494,7 @@ TransactionStateMachine(WorkerSession *session) WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; TransactionBlocksUsage useRemoteTransactionBlocks = - execution->transactionProperties->useRemoteTransactionBlocks; + execution->transactionProperties.useRemoteTransactionBlocks; MultiConnection *connection = session->connection; RemoteTransaction *transaction = &(connection->remoteTransaction); @@ -3823,7 +3921,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, ShardPlacement *taskPlacement = placementExecution->shardPlacement; List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - if (execution->transactionProperties->useRemoteTransactionBlocks != + if (execution->transactionProperties.useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { /* @@ -4177,6 +4275,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) 
MemoryContextReset(rowContext); execution->rowsProcessed++; + execution->rowsReceivedInCurrentRun++; } PQclear(result); diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 95df92f94fa..b363da1779d 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -25,6 +25,7 @@ #include "pg_version_constants.h" +#include "distributed/adaptive_executor.h" #include "distributed/backend_data.h" #include "distributed/citus_clauses.h" #include "distributed/citus_custom_scan.h" @@ -264,11 +265,11 @@ CitusExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; - if (!scanState->finishedRemoteScan) + if (!scanState->executionStarted) { bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan); - AdaptiveExecutor(scanState); + AdaptiveExecutorStart(scanState); if (isMultiTaskPlan) { @@ -279,10 +280,21 @@ CitusExecScan(CustomScanState *node) IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); } - scanState->finishedRemoteScan = true; + scanState->executionStarted = true; } - return ReturnTupleFromTuplestore(scanState); + TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState); + if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan) + { + /* clear the tuple store for the next batch */ + tuplestore_clear(scanState->tuplestorestate); + + scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState); + + resultSlot = ReturnTupleFromTuplestore(scanState); + } + + return resultSlot; } @@ -720,6 +732,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan) scanState->finishedPreScan = false; scanState->finishedRemoteScan = false; + scanState->executionStarted = false; return (Node *) scanState; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 7198858a0eb..9b5ef56823b 100644 --- a/src/include/distributed/adaptive_executor.h 
+++ b/src/include/distributed/adaptive_executor.h @@ -1,8 +1,10 @@ #ifndef ADAPTIVE_EXECUTOR_H #define ADAPTIVE_EXECUTOR_H +#include "distributed/citus_custom_scan.h" #include "distributed/multi_physical_planner.h" + /* GUC, determining whether Citus opens 1 connection per task */ extern bool ForceMaxQueryParallelization; extern int MaxAdaptiveExecutorPoolSize; @@ -14,6 +16,10 @@ extern int ExecutorSlowStartInterval; extern bool EnableCostBasedConnectionEstablishment; extern bool PreventIncompleteConnectionEstablishment; +extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); +extern void AdaptiveExecutorStart(CitusScanState *scanState); +extern bool AdaptiveExecutorRun(CitusScanState *scanState); +extern bool ShouldRunTasksSequentially(List *taskList); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize, diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index db1f0ce1f2a..6053522feed 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -16,6 +16,8 @@ #include "distributed/distributed_planner.h" #include "distributed/multi_server_executor.h" +struct DistributedExecution; + typedef struct CitusScanState { CustomScanState customScanState; /* underlying custom scan node */ @@ -27,7 +29,11 @@ typedef struct CitusScanState DistributedPlan *distributedPlan; /* distributed execution plan */ MultiExecutorType executorType; /* distributed executor type */ bool finishedRemoteScan; /* flag to check if remote scan is finished */ + bool executionStarted; /* flag to check whether execution started */ Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ + + /* execution state when using adaptive executor */ + struct DistributedExecution 
*execution; } CitusScanState; diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index b0b0288de87..4cedb9a6cd3 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -76,8 +76,6 @@ extern int ExecutorLevel; extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); -extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); -extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState); /* From eb1bc1d4578add4593135e686e7ec75d1bb4a0df Mon Sep 17 00:00:00 2001 From: Colm McHugh Date: Tue, 7 Apr 2026 17:04:55 +0000 Subject: [PATCH 2/3] Update batched adaptive executor: cleanup, batch sizing, LibPQ chunked execution, tests. - Resource clean-up: AdaptiveExecutorEnd() releases sessions/connections when an error occurs between AdaptiveExecutorRun calls. Also handle early termination (cursor close, LIMIT satisfied) with proper clean-up of in-flight worker queries. - ShouldRunTasksSequentially() check in FinishDistributedExecution() replaced with explicit sessionsCleanedUp flag on DistributedExecution struct. Fixes double CleanUpSessions on sequential path. - Adaptive batch sizing via citus.executor_batch_size (default 0 => auto). Auto mode calculates batch size from work_mem and TupleDesc (attlen + typmod for varlena, 128B default for unbounded). Floor 100, ceiling 1M rows. - Remote execution uses LibPQ's chunked mode (PG17+), GUC configurable for now. - Local execution is eager; it runs to completion. - Regress test suite: 11 test cases covering batch sizes 1/10/100K/auto, empty results, LIMIT, aggregation, DML RETURNING, GUC behavior, between-batch error cleanup, cursor close mid-batch and cross-batch-size result consistency. 
--- .../distributed/executor/adaptive_executor.c | 183 +++++++++-- .../distributed/executor/citus_custom_scan.c | 7 + src/backend/distributed/shared_library_init.c | 25 ++ src/include/distributed/adaptive_executor.h | 6 + src/test/regress/citus_tests/run_test.py | 3 + .../expected/adaptive_executor_batching.out | 300 ++++++++++++++++++ src/test/regress/multi_schedule | 2 +- .../sql/adaptive_executor_batching.sql | 193 +++++++++++ 8 files changed, 700 insertions(+), 19 deletions(-) create mode 100644 src/test/regress/expected/adaptive_executor_batching.out create mode 100644 src/test/regress/sql/adaptive_executor_batching.sql diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 738dc9310b4..f4d0e30f977 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -323,6 +323,25 @@ typedef struct DistributedExecution */ bool localExecutionSupported; + /* + * Flag to track whether sessions have already been cleaned up. + * Used to avoid double-cleanup in FinishDistributedExecution when + * SequentialRunDistributedExecution already cleaned up per-task. + */ + bool sessionsCleanedUp; + + /* + * Flag to track whether local tasks have been executed. Local tasks + * are executed after all remote batches complete. + */ + bool localTasksExecuted; + + /* + * Maximum number of rows to fetch per batch in RunDistributedExecution. + * Computed from work_mem and TupleDesc, or set via GUC override. + */ + int maxBatchSize; + /* * Memory context for the execution. 
*/ @@ -515,6 +534,12 @@ int ExecutorSlowStartInterval = 10; bool EnableCostBasedConnectionEstablishment = true; bool PreventIncompleteConnectionEstablishment = true; +/* GUC, number of rows per batch (0 = auto from work_mem) */ +int ExecutorBatchSize = 0; + +/* GUC, libpq chunk size in bytes for chunked row mode (PG17+) */ +int ExecutorChunkSize = 8192; + /* * TaskExecutionState indicates whether or not a command on a shard @@ -665,6 +690,7 @@ static void RunDistributedExecution(DistributedExecution *execution, bool toComp static void SequentialRunDistributedExecution(DistributedExecution *execution); static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); +static int CalculateMaxBatchSize(TupleDesc tupleDescriptor); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); @@ -779,9 +805,9 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) /* - * AdaptiveExecutor is called via CitusExecScan on the - * first call of CitusExecScan. The function fills the tupleStore - * of the input scanScate. + * AdaptiveExecutorStart is called via CitusExecScan on the first call. + * It initializes the distributed execution state, including the tuplestore, + * connections, and batch size. Actual execution happens in AdaptiveExecutorRun. */ void AdaptiveExecutorStart(CitusScanState *scanState) @@ -876,6 +902,9 @@ AdaptiveExecutorStart(CitusScanState *scanState) jobIdList, localExecutionSupported); + /* compute batch size from GUC or work_mem */ + execution->maxBatchSize = CalculateMaxBatchSize(tupleDescriptor); + /* * Make sure that we acquire the appropriate locks even if the local tasks * are going to be executed with local execution. 
@@ -917,7 +946,7 @@ AdaptiveExecutorRun(CitusScanState *scanState) } else { - /* if we need to sort the whole tuple store, run to completino */ + /* if we need to sort the whole tuple store, run to completion */ bool runToCompletion = sortTupleStore; RunDistributedExecution(execution, runToCompletion); @@ -931,11 +960,11 @@ AdaptiveExecutorRun(CitusScanState *scanState) /* done with remote tasks, finish the execution */ } - /* execute tasks local to the node (if any) */ - if (list_length(execution->localTaskList) > 0) + /* execute local tasks after remote execution completes */ + if (list_length(execution->localTaskList) > 0 && !execution->localTasksExecuted) { - /* now execute the local tasks */ RunLocalExecution(scanState, execution); + execution->localTasksExecuted = true; } if (commandType != CMD_SELECT) @@ -956,6 +985,117 @@ AdaptiveExecutorRun(CitusScanState *scanState) } +/* + * AdaptiveExecutorEnd performs cleanup of a distributed execution that + * may still be in progress. This is called from CitusEndScan to handle + * the case where execution is aborted between batches (e.g., cursor + * closed early, LIMIT satisfied, or error between batches). + * + * Worker connections may still have in-progress queries from single-row + * or chunked mode. We must cancel those queries and drain pending results + * before unclaiming the connections, otherwise the subsequent COMMIT will + * fail with "another command is already in progress". + * + * In the normal case (finishedRemoteScan == true), this is a no-op because + * FinishDistributedExecution already cleaned up in AdaptiveExecutorRun. + */ +void +AdaptiveExecutorEnd(CitusScanState *scanState) +{ + DistributedExecution *execution = scanState->execution; + if (execution == NULL) + { + return; + } + + if (scanState->finishedRemoteScan) + { + return; + } + + /* + * Cancel in-progress queries and drain results on all session connections. 
+ * Without this, connections in single-row/chunked mode still have pending + * results, and any subsequent command (like COMMIT) would fail. + */ + WorkerSession *session = NULL; + foreach_declared_ptr(session, execution->sessionList) + { + MultiConnection *connection = session->connection; + + if (connection->pgConn == NULL) + { + continue; + } + + if (PQstatus(connection->pgConn) == CONNECTION_OK && + PQtransactionStatus(connection->pgConn) == PQTRANS_ACTIVE) + { + SendCancelationRequest(connection); + } + + ClearResultsDiscardWarnings(connection, false); + UnclaimConnection(connection); + } + + FreeExecutionWaitEvents(execution); +} + + +/* + * CalculateMaxBatchSize computes the number of rows per batch based on + * the GUC citus.executor_batch_size and work_mem. + * + * If ExecutorBatchSize > 0, use it directly. Otherwise, estimate the + * tuple size from the TupleDesc and derive a batch size from work_mem. + */ +static int +CalculateMaxBatchSize(TupleDesc tupleDescriptor) +{ + if (ExecutorBatchSize > 0) + { + return ExecutorBatchSize; + } + + int natts = tupleDescriptor->natts; + Size estimatedTupleSize = 0; + + for (int i = 0; i < natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupleDescriptor, i); + if (attr->attlen > 0) + { + estimatedTupleSize += attr->attlen; + } + else if (attr->atttypmod > 0) + { + /* for varchar(N), typmod encodes max length */ + estimatedTupleSize += attr->atttypmod; + } + else + { + /* default estimate for unbounded varlena */ + estimatedTupleSize += 128; + } + } + + /* add per-tuple overhead: header + null bitmap + alignment */ + estimatedTupleSize += MAXALIGN(SizeofHeapTupleHeader + + (natts > 0 ? 
BITMAPLEN(natts) : 0)); + + estimatedTupleSize = Max(estimatedTupleSize, 64); + + /* work_mem is in KB */ + Size workMemBytes = (Size) work_mem * 1024L; + int batchSize = (int) (workMemBytes / estimatedTupleSize); + + batchSize = Max(batchSize, 100); + batchSize = Min(batchSize, 1000000); + + return batchSize; +} + + /* * RunLocalExecution runs the localTaskList in the execution, fills the tuplestore * and sets the es_processed if necessary. @@ -1445,7 +1585,7 @@ StartDistributedExecution(DistributedExecution *execution) if (!ShouldRunTasksSequentially(execution->remoteTaskList)) { /* - * If a (co-located) shard placement was accessed over a session earier in the + * If a (co-located) shard placement was accessed over a session earlier in the * transaction, assign the task to the same session. Otherwise, assign it to * the general worker pool(s). */ @@ -1473,10 +1613,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) static void FinishDistributedExecution(DistributedExecution *execution) { - /* - * Sequential executions unclaim connections separately. - */ - if (!ShouldRunTasksSequentially(execution->remoteTaskList)) + if (!execution->sessionsCleanedUp) { CleanUpSessions(execution); } @@ -1979,7 +2116,7 @@ SequentialRunDistributedExecution(DistributedExecution *execution) * any of the connections and runs the connection state machine when a connection * has an event. */ -void +static void RunDistributedExecution(DistributedExecution *execution, bool toCompletion) { WaitEvent *events = NULL; @@ -1999,8 +2136,8 @@ RunDistributedExecution(DistributedExecution *execution, bool toCompletion) execution->rebuildWaitEventSet = true; execution->rowsReceivedInCurrentRun = 0; - /* TODO: GUC? be smart? */ - int maxBatchSize = 10000; + int maxBatchSize = execution->maxBatchSize > 0 ? + execution->maxBatchSize : 10000; /* * Iterate until all the tasks are finished. 
Once all the tasks @@ -4042,8 +4179,12 @@ SendNextQuery(TaskPlacementExecution *placementExecution, return false; } - int singleRowMode = PQsetSingleRowMode(connection->pgConn); - if (singleRowMode == 0) +#ifdef LIBPQ_HAS_CHUNK_MODE + int rowMode = PQsetChunkedRowsMode(connection->pgConn, ExecutorChunkSize); +#else + int rowMode = PQsetSingleRowMode(connection->pgConn); +#endif + if (rowMode == 0) { connection->connectionState = MULTI_CONNECTION_LOST; return false; @@ -4130,7 +4271,11 @@ ReceiveResults(WorkerSession *session, bool storeRows) placementExecution->queryIndex++; continue; } - else if (resultStatus != PGRES_SINGLE_TUPLE) + else if (resultStatus != PGRES_SINGLE_TUPLE +#ifdef LIBPQ_HAS_CHUNK_MODE + && resultStatus != PGRES_TUPLES_CHUNK +#endif + ) { /* query failures are always hard errors */ ReportResultError(connection, result, ERROR); @@ -5052,6 +5197,8 @@ CleanUpSessions(DistributedExecution *execution) "execution: %d", connection->connectionState))); } } + + execution->sessionsCleanedUp = true; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index b363da1779d..c1e404b218b 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -850,6 +850,13 @@ CitusEndScan(CustomScanState *node) tuplestore_end(scanState->tuplestorestate); scanState->tuplestorestate = NULL; } + + /* + * Clean up any in-flight distributed execution. This handles the case + * where an error occurs between batches in the adaptive executor, + * ensuring sessions and connections are properly released. 
+ */ + AdaptiveExecutorEnd(scanState); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 08ca5fa5ce6..54f5c27c435 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1696,6 +1696,31 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.executor_batch_size", + gettext_noop("Maximum number of rows per batch in the adaptive executor."), + gettext_noop("When set to 0 (the default), the batch size is automatically " + "calculated from work_mem and the estimated tuple size. A positive " + "value overrides the automatic calculation with a fixed row count."), + &ExecutorBatchSize, + 0, 0, INT_MAX, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.executor_chunk_size", + gettext_noop("Chunk size in bytes for libpq chunked row mode."), + gettext_noop("Controls the chunk size passed to PQsetChunkedRowsMode when " + "fetching rows from workers. Larger values reduce per-result " + "overhead but increase memory usage per fetch. 
Only effective " + "on PostgreSQL 17 and later."), + &ExecutorChunkSize, + 8192, 1, INT_MAX, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.executor_slow_start_interval", gettext_noop("Time to wait between opening connections to the same worker node"), diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index 9b5ef56823b..a53df000ff4 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -10,6 +10,11 @@ extern bool ForceMaxQueryParallelization; extern int MaxAdaptiveExecutorPoolSize; extern bool EnableBinaryProtocol; +/* GUC, number of rows per batch (0 = auto from work_mem) */ +extern int ExecutorBatchSize; + +/* GUC, libpq chunk size in bytes for chunked row mode (PG17+) */ +extern int ExecutorChunkSize; /* GUC, number of ms to wait between opening connections to the same worker */ extern int ExecutorSlowStartInterval; @@ -19,6 +24,7 @@ extern bool PreventIncompleteConnectionEstablishment; extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState); extern void AdaptiveExecutorStart(CitusScanState *scanState); extern bool AdaptiveExecutorRun(CitusScanState *scanState); +extern void AdaptiveExecutorEnd(CitusScanState *scanState); extern bool ShouldRunTasksSequentially(List *taskList); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList); extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported); diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index d38dce9a8ef..618e70b5954 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -261,6 +261,9 @@ def extra_tests(self): "multi_subquery_in_where_reference_clause": TestDeps( "minimal_schedule", ["multi_behavioral_analytics_create_table"] ), + "adaptive_executor_batching": TestDeps( + "minimal_schedule", 
["multi_behavioral_analytics_create_table"] + ), "subquery_in_where": TestDeps( "minimal_schedule", ["multi_behavioral_analytics_create_table"] ), diff --git a/src/test/regress/expected/adaptive_executor_batching.out b/src/test/regress/expected/adaptive_executor_batching.out new file mode 100644 index 00000000000..224251905e1 --- /dev/null +++ b/src/test/regress/expected/adaptive_executor_batching.out @@ -0,0 +1,300 @@ +-- +-- Tests for the batched adaptive executor (reentrant execution) +-- +CREATE SCHEMA adaptive_executor_batching; +SET search_path TO adaptive_executor_batching; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 802009000; +CREATE TABLE batch_test (x int, y text); +SELECT create_distributed_table('batch_test', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert enough rows to exercise multiple batches +INSERT INTO batch_test SELECT i, 'row_' || i FROM generate_series(1, 200) i; +-- Verify baseline: all rows present +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- +-- Test 1: Force small batch size and verify all rows are returned +-- +SET citus.executor_batch_size TO 10; +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- Ordered output to verify correctness across batches +SELECT x FROM batch_test WHERE x <= 20 ORDER BY x; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 +(20 rows) + +-- +-- Test 2: Batch size of 1 (extreme case) +-- +SET citus.executor_batch_size TO 1; +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +SELECT x FROM batch_test WHERE x <= 5 ORDER BY 
x; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +-- +-- Test 3: Batch size larger than result set (single batch) +-- +SET citus.executor_batch_size TO 100000; +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- +-- Test 4: Auto batch size (from work_mem) +-- +SET citus.executor_batch_size TO 0; +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- +-- Test 5: Empty result set with batching +-- +SET citus.executor_batch_size TO 10; +SELECT count(*) FROM batch_test WHERE x > 9999; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- +-- Test 6: LIMIT with batching +-- +SELECT x FROM batch_test ORDER BY x LIMIT 5; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +-- +-- Test 7: Aggregation across batches +-- +SELECT sum(x), min(x), max(x) FROM batch_test; + sum | min | max +--------------------------------------------------------------------- + 20100 | 1 | 200 +(1 row) + +-- +-- Test 8: DML with RETURNING across batches +-- +SET citus.executor_batch_size TO 10; +-- Use a separate table for DML test +CREATE TABLE batch_dml_test (x int, y text); +SELECT create_distributed_table('batch_dml_test', 'x'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO batch_dml_test SELECT i, 'val_' || i FROM generate_series(1, 50) i; +WITH deleted AS ( + DELETE FROM batch_dml_test WHERE x <= 20 RETURNING x +) +SELECT count(*) FROM deleted; + count +--------------------------------------------------------------------- + 20 +(1 row) + +SELECT count(*) FROM batch_dml_test WHERE x <= 20; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- +-- Test 9: 
Verify GUC is session-level +-- +SHOW citus.executor_batch_size; + citus.executor_batch_size +--------------------------------------------------------------------- + 10 +(1 row) + +SET citus.executor_batch_size TO 500; +SHOW citus.executor_batch_size; + citus.executor_batch_size +--------------------------------------------------------------------- + 500 +(1 row) + +RESET citus.executor_batch_size; +SHOW citus.executor_batch_size; + citus.executor_batch_size +--------------------------------------------------------------------- + 0 +(1 row) + +-- +-- Test 10: M1 — Between-batch error cleanup (CitusEndScan must release sessions) +-- +-- This validates that when an error occurs on the coordinator between +-- AdaptiveExecutorRun calls (mid-scan, mid-batch), CitusEndScan properly +-- cleans up distributed execution sessions via AdaptiveExecutorEnd. +-- Without the M1 fix, sessions/connections would leak on each error. +-- +SET citus.executor_batch_size TO 10; +-- Helper: iterates a distributed query and raises error after N rows. +-- With batch_size=10 and error at row 15, the error fires during +-- the second batch — between the first AdaptiveExecutorRun (which +-- returned control to the executor) and the scan completing. +CREATE OR REPLACE FUNCTION trigger_between_batch_error(fail_after int) +RETURNS void AS $$ +DECLARE + r RECORD; + cnt int := 0; +BEGIN + FOR r IN SELECT x FROM batch_test ORDER BY x LOOP + cnt := cnt + 1; + IF cnt >= fail_after THEN + RAISE EXCEPTION 'intentional error at row %', cnt; + END IF; + END LOOP; +END; +$$ LANGUAGE plpgsql; +-- Single invocation: error should be caught cleanly +DO $$ +BEGIN + PERFORM trigger_between_batch_error(15); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; +NOTICE: caught: intentional error at row 15 +CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE +-- Repeat 20 times to verify connections don't accumulate across errors. 
+-- If CitusEndScan didn't clean up, we'd exhaust the connection pool. +DO $$ +BEGIN + FOR i IN 1..20 LOOP + BEGIN + PERFORM trigger_between_batch_error(15); + EXCEPTION WHEN OTHERS THEN + -- expected, swallow + END; + END LOOP; +END; +$$; +-- After 20 error cycles, distributed queries must still work +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- Also verify with a cursor closed mid-batch (non-error path through +-- CitusEndScan with finishedRemoteScan = false) +BEGIN; +DECLARE c CURSOR FOR SELECT x FROM batch_test ORDER BY x; +FETCH 5 FROM c; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +CLOSE c; +COMMIT; +-- Still healthy after cursor close mid-batch +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- +-- Test 11: Cross-batch-size result consistency +-- +-- Runs the same analytical queries at batch sizes 1, 10, 100, 10000, and 0 +-- (auto) and asserts identical results. Any batch-boundary bug would cause +-- a mismatch. 
+-- +CREATE TABLE batch_consistency AS +SELECT + bs, + (SELECT count(*) FROM batch_test)::bigint AS cnt, + (SELECT sum(x) FROM batch_test)::bigint AS s, + (SELECT min(x) FROM batch_test)::int AS mn, + (SELECT max(x) FROM batch_test)::int AS mx, + (SELECT count(*) FROM batch_test WHERE x BETWEEN 50 AND 150)::bigint AS range_cnt +FROM (SELECT unnest(ARRAY[1, 10, 100, 10000]) bs) sizes, +LATERAL (SELECT set_config('citus.executor_batch_size', bs::text, false)) AS apply; +-- All batch sizes must produce the same result — collapse to 1 distinct row +SELECT count(DISTINCT (cnt, s, mn, mx, range_cnt)) AS distinct_results FROM batch_consistency; + distinct_results +--------------------------------------------------------------------- + 1 +(1 row) + +-- Show the actual values (should be a single row) +SELECT DISTINCT cnt, s, mn, mx, range_cnt FROM batch_consistency; + cnt | s | mn | mx | range_cnt +--------------------------------------------------------------------- + 200 | 20100 | 1 | 200 | 101 +(1 row) + +DROP TABLE batch_consistency; +-- +-- Cleanup +-- +SET client_min_messages TO WARNING; +DROP SCHEMA adaptive_executor_batching CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a6643e3b768..6487ce2bc98 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -82,7 +82,7 @@ test: multi_basic_queries cross_join multi_complex_expressions multi_subquery mu test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: sql_procedure multi_function_in_join row_types materialized_view test: sql_procedure_no_transaction_block -test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_commands geqo +test: multi_subquery_in_where_reference_clause adaptive_executor adaptive_executor_batching propagate_set_commands geqo test: forcedelegation_functions system_queries # this should be run alone as it gets too many clients 
test: join_pushdown diff --git a/src/test/regress/sql/adaptive_executor_batching.sql b/src/test/regress/sql/adaptive_executor_batching.sql new file mode 100644 index 00000000000..027ed0faad4 --- /dev/null +++ b/src/test/regress/sql/adaptive_executor_batching.sql @@ -0,0 +1,193 @@ +-- +-- Tests for the batched adaptive executor (reentrant execution) +-- +CREATE SCHEMA adaptive_executor_batching; +SET search_path TO adaptive_executor_batching; + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 802009000; + +CREATE TABLE batch_test (x int, y text); +SELECT create_distributed_table('batch_test', 'x'); + +-- Insert enough rows to exercise multiple batches +INSERT INTO batch_test SELECT i, 'row_' || i FROM generate_series(1, 200) i; + +-- Verify baseline: all rows present +SELECT count(*) FROM batch_test; + +-- +-- Test 1: Force small batch size and verify all rows are returned +-- +SET citus.executor_batch_size TO 10; + +SELECT count(*) FROM batch_test; + +-- Ordered output to verify correctness across batches +SELECT x FROM batch_test WHERE x <= 20 ORDER BY x; + +-- +-- Test 2: Batch size of 1 (extreme case) +-- +SET citus.executor_batch_size TO 1; + +SELECT count(*) FROM batch_test; +SELECT x FROM batch_test WHERE x <= 5 ORDER BY x; + +-- +-- Test 3: Batch size larger than result set (single batch) +-- +SET citus.executor_batch_size TO 100000; + +SELECT count(*) FROM batch_test; + +-- +-- Test 4: Auto batch size (from work_mem) +-- +SET citus.executor_batch_size TO 0; + +SELECT count(*) FROM batch_test; + +-- +-- Test 5: Empty result set with batching +-- +SET citus.executor_batch_size TO 10; + +SELECT count(*) FROM batch_test WHERE x > 9999; + +-- +-- Test 6: LIMIT with batching +-- +SELECT x FROM batch_test ORDER BY x LIMIT 5; + +-- +-- Test 7: Aggregation across batches +-- +SELECT sum(x), min(x), max(x) FROM batch_test; + +-- +-- Test 8: DML with RETURNING across batches +-- +SET citus.executor_batch_size TO 10; + 
+-- Use a separate table for DML test +CREATE TABLE batch_dml_test (x int, y text); +SELECT create_distributed_table('batch_dml_test', 'x'); +INSERT INTO batch_dml_test SELECT i, 'val_' || i FROM generate_series(1, 50) i; + +WITH deleted AS ( + DELETE FROM batch_dml_test WHERE x <= 20 RETURNING x +) +SELECT count(*) FROM deleted; + +SELECT count(*) FROM batch_dml_test WHERE x <= 20; + +-- +-- Test 9: Verify GUC is session-level +-- +SHOW citus.executor_batch_size; + +SET citus.executor_batch_size TO 500; +SHOW citus.executor_batch_size; + +RESET citus.executor_batch_size; +SHOW citus.executor_batch_size; + +-- +-- Test 10: M1 — Between-batch error cleanup (CitusEndScan must release sessions) +-- +-- This validates that when an error occurs on the coordinator between +-- AdaptiveExecutorRun calls (mid-scan, mid-batch), CitusEndScan properly +-- cleans up distributed execution sessions via AdaptiveExecutorEnd. +-- Without the M1 fix, sessions/connections would leak on each error. +-- +SET citus.executor_batch_size TO 10; + +-- Helper: iterates a distributed query and raises error after N rows. +-- With batch_size=10 and error at row 15, the error fires during +-- the second batch — between the first AdaptiveExecutorRun (which +-- returned control to the executor) and the scan completing. +CREATE OR REPLACE FUNCTION trigger_between_batch_error(fail_after int) +RETURNS void AS $$ +DECLARE + r RECORD; + cnt int := 0; +BEGIN + FOR r IN SELECT x FROM batch_test ORDER BY x LOOP + cnt := cnt + 1; + IF cnt >= fail_after THEN + RAISE EXCEPTION 'intentional error at row %', cnt; + END IF; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- Single invocation: error should be caught cleanly +DO $$ +BEGIN + PERFORM trigger_between_batch_error(15); +EXCEPTION WHEN OTHERS THEN + RAISE NOTICE 'caught: %', SQLERRM; +END; +$$; + +-- Repeat 20 times to verify connections don't accumulate across errors. +-- If CitusEndScan didn't clean up, we'd exhaust the connection pool. 
+DO $$ +BEGIN + FOR i IN 1..20 LOOP + BEGIN + PERFORM trigger_between_batch_error(15); + EXCEPTION WHEN OTHERS THEN + -- expected, swallow + END; + END LOOP; +END; +$$; + +-- After 20 error cycles, distributed queries must still work +SELECT count(*) FROM batch_test; + +-- Also verify with a cursor closed mid-batch (non-error path through +-- CitusEndScan with finishedRemoteScan = false) +BEGIN; +DECLARE c CURSOR FOR SELECT x FROM batch_test ORDER BY x; +FETCH 5 FROM c; +CLOSE c; +COMMIT; + +-- Still healthy after cursor close mid-batch +SELECT count(*) FROM batch_test; + +-- +-- Test 11: Cross-batch-size result consistency +-- +-- Runs the same analytical queries at batch sizes 1, 10, 100, 10000, and 0 +-- (auto) and asserts identical results. Any batch-boundary bug would cause +-- a mismatch. +-- +CREATE TABLE batch_consistency AS +SELECT + bs, + (SELECT count(*) FROM batch_test)::bigint AS cnt, + (SELECT sum(x) FROM batch_test)::bigint AS s, + (SELECT min(x) FROM batch_test)::int AS mn, + (SELECT max(x) FROM batch_test)::int AS mx, + (SELECT count(*) FROM batch_test WHERE x BETWEEN 50 AND 150)::bigint AS range_cnt +FROM (SELECT unnest(ARRAY[1, 10, 100, 10000]) bs) sizes, +LATERAL (SELECT set_config('citus.executor_batch_size', bs::text, false)) AS apply; + +-- All batch sizes must produce the same result — collapse to 1 distinct row +SELECT count(DISTINCT (cnt, s, mn, mx, range_cnt)) AS distinct_results FROM batch_consistency; + +-- Show the actual values (should be a single row) +SELECT DISTINCT cnt, s, mn, mx, range_cnt FROM batch_consistency; + +DROP TABLE batch_consistency; + +-- +-- Cleanup +-- +SET client_min_messages TO WARNING; +DROP SCHEMA adaptive_executor_batching CASCADE; From 7d93230c19d2427dde9e3616e18a94c9e424e508 Mon Sep 17 00:00:00 2001 From: Colm McHugh Date: Tue, 14 Apr 2026 12:28:42 +0000 Subject: [PATCH 3/3] Preserve wait event set across batched executions. - The wait event set was getting re-created for every batch. 
Now it is preserved across the entire distributed execution. - Also remove distributed planner's marking of the Citus scan with backward scan support. This requires FinalizePlan() to re-insert a Material node if a scrollable cursor has been requested. - Update backward scan tests to declare cursors as scrollable. Also add EXPLAIN ANALYZE tests. --- .../distributed/executor/adaptive_executor.c | 22 +- .../distributed/executor/citus_custom_scan.c | 3 +- .../distributed/planner/distributed_planner.c | 35 +- .../planner/function_call_delegation.c | 3 +- src/include/distributed/distributed_planner.h | 3 +- .../expected/adaptive_executor_batching.out | 545 ++++++++++++++++++ .../expected/arbitrary_configs_router.out | 2 +- .../expected/multi_mx_reference_table.out | 4 +- .../expected/multi_mx_router_planner.out | 2 +- .../expected/multi_mx_schema_support.out | 4 +- .../expected/multi_reference_table.out | 2 +- .../regress/expected/multi_router_planner.out | 2 +- .../multi_router_planner_fast_path.out | 2 +- .../regress/expected/multi_schema_support.out | 4 +- .../expected/multi_utility_statements.out | 10 +- .../sql/adaptive_executor_batching.sql | 152 +++++ .../regress/sql/arbitrary_configs_router.sql | 2 +- .../regress/sql/multi_mx_reference_table.sql | 4 +- .../regress/sql/multi_mx_router_planner.sql | 2 +- .../regress/sql/multi_mx_schema_support.sql | 4 +- .../regress/sql/multi_reference_table.sql | 2 +- src/test/regress/sql/multi_router_planner.sql | 2 +- .../sql/multi_router_planner_fast_path.sql | 2 +- src/test/regress/sql/multi_schema_support.sql | 4 +- .../regress/sql/multi_utility_statements.sql | 10 +- 25 files changed, 778 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f4d0e30f977..59dcdc60e9f 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -537,7 +537,7 @@ bool 
PreventIncompleteConnectionEstablishment = true; /* GUC, number of rows per batch (0 = auto from work_mem) */ int ExecutorBatchSize = 0; -/* GUC, libpq chunk size in bytes for chunked row mode (PG17+) */ +/* GUC, max rows per PQgetResult() chunk in chunked row mode (PG17+) */ int ExecutorChunkSize = 8192; @@ -1613,6 +1613,9 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) static void FinishDistributedExecution(DistributedExecution *execution) { + /* Free WaitEventSet that may have been preserved across batches */ + FreeExecutionWaitEvents(execution); + if (!execution->sessionsCleanedUp) { CleanUpSessions(execution); @@ -2132,8 +2135,11 @@ RunDistributedExecution(DistributedExecution *execution, bool toCompletion) bool cancellationReceived = false; - /* always (re)build the wait event set the first time */ - execution->rebuildWaitEventSet = true; + /* build the wait event set on first entry; reuse across batches */ + if (execution->waitEventSet == NULL) + { + execution->rebuildWaitEventSet = true; + } execution->rowsReceivedInCurrentRun = 0; int maxBatchSize = execution->maxBatchSize > 0 ? @@ -2219,10 +2225,14 @@ RunDistributedExecution(DistributedExecution *execution, bool toCompletion) pfree(events); } - if (execution->waitEventSet != NULL) + /* + * Only free the WaitEventSet when running to completion. + * For batched execution, preserve it across re-entries to + * avoid expensive epoll_create/close syscalls per batch. 
+ */ + if (toCompletion) { - FreeWaitEventSet(execution->waitEventSet); - execution->waitEventSet = NULL; + FreeExecutionWaitEvents(execution); } } PG_CATCH(); diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c1e404b218b..ad9cca1a1dd 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -284,9 +284,8 @@ CitusExecScan(CustomScanState *node) } TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState); - if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan) + if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan) { - /* clear the tuple store for the next batch */ tuplestore_clear(scanState->tuplestorestate); scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index ed8ea3b3ac2..956f8b90b90 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -94,7 +94,8 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *query, ParamListInfo boundParams, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + int cursorOptions); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -852,7 +853,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext) distributedPlan->planId = planId; /* create final plan by combining local plan with distributed plan */ - resultPlan = FinalizePlan(planContext->plan, distributedPlan); + resultPlan = FinalizePlan(planContext->plan, distributedPlan, + planContext->cursorOptions); /* * As explained above, force planning costs to be unrealistically high if @@ -901,7 +903,9 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, planContext->query, 
planContext->boundParams, planContext-> - plannerRestrictionContext); + plannerRestrictionContext, + planContext-> + cursorOptions); return result; } @@ -917,7 +921,8 @@ static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + int cursorOptions) { MemoryContext savedContext = CurrentMemoryContext; PlannedStmt *result = NULL; @@ -929,6 +934,7 @@ TryCreateDistributedPlannedStmt(PlannedStmt *localPlan, planContext->originalQuery = originalQuery; planContext->query = query; planContext->plannerRestrictionContext = plannerRestrictionContext; + planContext->cursorOptions = cursorOptions; PG_TRY(); @@ -1436,7 +1442,8 @@ GetDistributedPlan(CustomScan *customScan) * which can be run by the PostgreSQL executor. */ PlannedStmt * -FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) +FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan, + int cursorOptions) { PlannedStmt *finalPlan = NULL; CustomScan *customScan = makeNode(CustomScan); @@ -1497,8 +1504,8 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) customScan->custom_private = list_make1(distributedPlanData); - /* necessary to avoid extra Result node in PG15 */ - customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN | CUSTOMPATH_SUPPORT_PROJECTION; + /* CUSTOMPATH_SUPPORT_PROJECTION avoids an extra Result node in PG15+ */ + customScan->flags = CUSTOMPATH_SUPPORT_PROJECTION; /* * Fast path queries cannot have any subplans by definition, so skip @@ -1525,6 +1532,20 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) finalPlan = FinalizeRouterPlan(localPlan, customScan); } + /* + * For SCROLL cursors, wrap the plan in a Material node so that backward + * scan works correctly with batched execution. 
PG's standard_planner() + * already added a Material node, but Citus discarded the entire plan tree + * above and replaced it with a CustomScan. Re-apply it here. + * + * Material is lazy (non-blocking): it fetches one tuple at a time from the + * child and appends it to its own tuplestore, so batching is preserved. + */ + if (cursorOptions & CURSOR_OPT_SCROLL) + { + finalPlan->planTree = materialize_finished_plan(finalPlan->planTree); + } + return finalPlan; } diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 7f6b107f3b2..62895b328ec 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -499,7 +499,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) /* worker will take care of any necessary locking, treat query as read-only */ distributedPlan->modLevel = ROW_MODIFY_READONLY; - return FinalizePlan(planContext->plan, distributedPlan); + return FinalizePlan(planContext->plan, distributedPlan, + planContext->cursorOptions); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index b0c3347bec7..f1e58af64f3 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -279,7 +279,8 @@ extern LOCKMODE GetQueryLockMode(Query *query); extern int32 BlessRecordExpression(Expr *expr); extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan); extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan, - struct DistributedPlan *distributedPlan); + struct DistributedPlan *distributedPlan, + int cursorOptions); extern bool ContainsSingleShardTable(Query *query); extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query); diff --git a/src/test/regress/expected/adaptive_executor_batching.out b/src/test/regress/expected/adaptive_executor_batching.out index 
224251905e1..f1434e72a1b 100644 --- a/src/test/regress/expected/adaptive_executor_batching.out +++ b/src/test/regress/expected/adaptive_executor_batching.out @@ -294,6 +294,551 @@ SELECT DISTINCT cnt, s, mn, mx, range_cnt FROM batch_consistency; DROP TABLE batch_consistency; -- +-- Test 12: EXPLAIN ANALYZE with batching +-- Verify that EXPLAIN ANALYZE works correctly when batching is active. +-- We check: (a) it runs without error, (b) row counts in the output are +-- correct, and (c) it works with result sets that are exact multiples of +-- the batch size as well as non-multiples. +-- +-- 12a: exact multiple of batch size (200 rows, batch_size=50 → 4 batches) +SET citus.executor_batch_size TO 50; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT * FROM adaptive_executor_batching.batch_test'); + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on batch_test_802009000 batch_test (actual rows=N loops=N) +(8 rows) + +-- Verify the actual query still returns the right count +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- 12b: non-multiple of batch size (200 rows, batch_size=30 → 6 full + 1 partial) +SET citus.executor_batch_size TO 30; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT * FROM adaptive_executor_batching.batch_test'); + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: 
N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on batch_test_802009000 batch_test (actual rows=N loops=N) +(8 rows) + +SELECT count(*) FROM batch_test; + count +--------------------------------------------------------------------- + 200 +(1 row) + +-- 12c: small batch_size (200 rows, batch_size=7 → many small batches) +SET citus.executor_batch_size TO 7; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT x, y FROM adaptive_executor_batching.batch_test WHERE x <= 10'); + explain_filter +--------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Seq Scan on batch_test_802009000 batch_test (actual rows=N loops=N) + Filter: (x <= N) + Rows Removed by Filter: N +(10 rows) + +SELECT count(*) FROM batch_test WHERE x <= 10; + count +--------------------------------------------------------------------- + 10 +(1 row) + +-- 12d: EXPLAIN ANALYZE with aggregation across batches +SET citus.executor_batch_size TO 25; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT count(*), sum(x), min(x), max(x) FROM adaptive_executor_batching.batch_test'); + explain_filter +--------------------------------------------------------------------- + Aggregate (actual rows=N loops=N) + -> Custom Scan (Citus Adaptive) (actual rows=N loops=N) + Task Count: N + Tuple data received from nodes: N bytes + Tasks Shown: One of N + -> Task + Tuple data received from node: N bytes + Node: host=localhost port=N dbname=regression + -> Aggregate (actual rows=N loops=N) + -> Seq Scan on batch_test_802009000 batch_test (actual rows=N loops=N) +(10 rows) + +SELECT count(*), sum(x), min(x), max(x) FROM batch_test; + count | sum | min | 
max +--------------------------------------------------------------------- + 200 | 20100 | 1 | 200 +(1 row) + +-- +-- Test 13: Scrollable cursor with batching +-- Verifies that DECLARE SCROLL CURSOR works correctly with the batched +-- executor. For scrollable cursors, the executor preserves the tuplestore +-- across batches (skips tuplestore_clear) so backward fetches can access +-- rows from earlier batches. +-- +-- With batch_size=10, a 50-row result spans 5 batches: +-- batch 1: rows 1-10, batch 2: rows 11-20, batch 3: rows 21-30, +-- batch 4: rows 31-40, batch 5: rows 41-50 +-- +SET citus.executor_batch_size TO 10; +-- 13a: Forward/backward zigzag across batch boundaries +BEGIN; +DECLARE scroll_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; +-- Read into batch 2 (rows 1-15) +FETCH FORWARD 15 FROM scroll_cur; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 +(15 rows) + +-- Backward 10: crosses batch 2→1 boundary, returns rows 14..5 +FETCH BACKWARD 10 FROM scroll_cur; + x +--------------------------------------------------------------------- + 14 + 13 + 12 + 11 + 10 + 9 + 8 + 7 + 6 + 5 +(10 rows) + +-- Forward 5 from position 5: returns rows 6..10 +FETCH FORWARD 5 FROM scroll_cur; + x +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 + 10 +(5 rows) + +-- Forward 15 more: crosses batch 1→2→3, returns rows 11..25 +FETCH FORWARD 15 FROM scroll_cur; + x +--------------------------------------------------------------------- + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 +(15 rows) + +-- Backward 20: crosses batch 3→2→1, returns rows 24..5 +FETCH BACKWARD 20 FROM scroll_cur; + x +--------------------------------------------------------------------- + 24 + 23 + 22 + 21 + 20 + 19 + 18 + 17 + 16 + 15 + 14 + 13 + 12 + 11 + 10 + 9 + 8 + 7 + 6 + 5 +(20 rows) + +CLOSE scroll_cur; +COMMIT; +-- 
13b: FETCH FIRST / LAST / ABSOLUTE across all batches +BEGIN; +DECLARE abs_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; +-- Jump to various positions across different batches +FETCH ABSOLUTE 5 FROM abs_cur; -- batch 1 + x +--------------------------------------------------------------------- + 5 +(1 row) + +FETCH ABSOLUTE 45 FROM abs_cur; -- batch 5 + x +--------------------------------------------------------------------- + 45 +(1 row) + +FETCH ABSOLUTE 1 FROM abs_cur; -- batch 1 (backward jump) + x +--------------------------------------------------------------------- + 1 +(1 row) + +FETCH ABSOLUTE 50 FROM abs_cur; -- batch 5 (last row) + x +--------------------------------------------------------------------- + 50 +(1 row) + +FETCH ABSOLUTE 15 FROM abs_cur; -- batch 2 (backward jump) + x +--------------------------------------------------------------------- + 15 +(1 row) + +FETCH ABSOLUTE 35 FROM abs_cur; -- batch 4 (forward jump) + x +--------------------------------------------------------------------- + 35 +(1 row) + +FETCH ABSOLUTE 10 FROM abs_cur; -- batch 1 (backward jump across 3 batches) + x +--------------------------------------------------------------------- + 10 +(1 row) + +-- FIRST and LAST +FETCH LAST FROM abs_cur; + x +--------------------------------------------------------------------- + 50 +(1 row) + +FETCH FIRST FROM abs_cur; + x +--------------------------------------------------------------------- + 1 +(1 row) + +FETCH LAST FROM abs_cur; + x +--------------------------------------------------------------------- + 50 +(1 row) + +CLOSE abs_cur; +COMMIT; +-- 13c: Read all rows forward, then backward through all batches +BEGIN; +DECLARE full_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 30 ORDER BY x; +-- Drain all 3 batches forward +FETCH FORWARD ALL FROM full_cur; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 
15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 +(30 rows) + +-- Now backward through all 3 batches +FETCH BACKWARD 30 FROM full_cur; + x +--------------------------------------------------------------------- + 30 + 29 + 28 + 27 + 26 + 25 + 24 + 23 + 22 + 21 + 20 + 19 + 18 + 17 + 16 + 15 + 14 + 13 + 12 + 11 + 10 + 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1 +(30 rows) + +-- And forward to the end again +FETCH FORWARD ALL FROM full_cur; + x +--------------------------------------------------------------------- + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 +(29 rows) + +CLOSE full_cur; +COMMIT; +-- 13d: FETCH RELATIVE across batch boundaries +BEGIN; +DECLARE rel_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; +FETCH ABSOLUTE 25 FROM rel_cur; -- middle of batch 3 + x +--------------------------------------------------------------------- + 25 +(1 row) + +FETCH RELATIVE -15 FROM rel_cur; -- jump back to batch 1 (row 10) + x +--------------------------------------------------------------------- + 10 +(1 row) + +FETCH RELATIVE 30 FROM rel_cur; -- jump forward to batch 4 (row 40) + x +--------------------------------------------------------------------- + 40 +(1 row) + +FETCH RELATIVE -35 FROM rel_cur; -- jump back to batch 1 (row 5) + x +--------------------------------------------------------------------- + 5 +(1 row) + +FETCH RELATIVE 10 FROM rel_cur; -- forward within batch 2 (row 15) + x +--------------------------------------------------------------------- + 15 +(1 row) + +CLOSE rel_cur; +COMMIT; +-- 13e: SCROLL cursor with inlinable CTE +-- Exercises the CTE-inlining planner path (InlineCtesAndCreateDistributedPlannedStmt +-- → TryCreateDistributedPlannedStmt → CreateDistributedPlannedStmt → FinalizePlan) +-- to verify that cursorOptions (CURSOR_OPT_SCROLL) is forwarded through +-- TryCreateDistributedPlannedStmt so FinalizePlan 
wraps the plan in a Material node. +BEGIN; +DECLARE cte_scroll SCROLL CURSOR FOR + WITH batch_cte AS ( + SELECT x FROM batch_test WHERE x <= 30 + ) + SELECT x FROM batch_cte ORDER BY x; +-- Forward into batch 2 +FETCH FORWARD 15 FROM cte_scroll; + x +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 +(15 rows) + +-- Backward across batch boundary (rows 14..5) +FETCH BACKWARD 10 FROM cte_scroll; + x +--------------------------------------------------------------------- + 14 + 13 + 12 + 11 + 10 + 9 + 8 + 7 + 6 + 5 +(10 rows) + +-- Forward to end (rows 6..30) +FETCH FORWARD ALL FROM cte_scroll; + x +--------------------------------------------------------------------- + 6 + 7 + 8 + 9 + 10 + 11 + 12 + 13 + 14 + 15 + 16 + 17 + 18 + 19 + 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 +(25 rows) + +-- Backward all the way (rows 30..1) +FETCH BACKWARD ALL FROM cte_scroll; + x +--------------------------------------------------------------------- + 30 + 29 + 28 + 27 + 26 + 25 + 24 + 23 + 22 + 21 + 20 + 19 + 18 + 17 + 16 + 15 + 14 + 13 + 12 + 11 + 10 + 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1 +(30 rows) + +CLOSE cte_scroll; +COMMIT; +-- Reset batch size +RESET citus.executor_batch_size; +-- -- Cleanup -- SET client_min_messages TO WARNING; diff --git a/src/test/regress/expected/arbitrary_configs_router.out b/src/test/regress/expected/arbitrary_configs_router.out index a42b955cc89..ea369220b7a 100644 --- a/src/test/regress/expected/arbitrary_configs_router.out +++ b/src/test/regress/expected/arbitrary_configs_router.out @@ -969,7 +969,7 @@ END; WARNING: there is no transaction in progress -- cursor queries are fast-path router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/expected/multi_mx_reference_table.out b/src/test/regress/expected/multi_mx_reference_table.out index 
ee1f07e3a0c..2d9c6033b95 100644 --- a/src/test/regress/expected/multi_mx_reference_table.out +++ b/src/test/regress/expected/multi_mx_reference_table.out @@ -539,7 +539,7 @@ SELECT * FROM reference_table_test WHERE value_1 = 1; END; -- cursor queries also works fine BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM reference_table_test WHERE value_1 = 1 OR value_1 = 2 @@ -927,4 +927,4 @@ SELECT count(*) FROM numbers; RESET log_min_messages; -- clean up tables -DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, numbers; +DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, colocated_table_test, colocated_table_test_2, numbers; diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 5ac6093cb99..37340ef1205 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -1301,7 +1301,7 @@ DEBUG: query has a single distribution column value: 1 END; -- cursor queries are router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash_mx WHERE author_id = 1 diff --git a/src/test/regress/expected/multi_mx_schema_support.out b/src/test/regress/expected/multi_mx_schema_support.out index 4e61d85d8bf..821de7c644c 100644 --- a/src/test/regress/expected/multi_mx_schema_support.out +++ b/src/test/regress/expected/multi_mx_schema_support.out @@ -25,7 +25,7 @@ SELECT * FROM citus_mx_test_schema.nation_hash ORDER BY n_nationkey LIMIT 4; -- test cursors SET search_path TO public; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM nation_hash WHERE n_nationkey = 1; @@ -50,7 +50,7 @@ END; -- test with search_path is set SET search_path TO citus_mx_test_schema; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR 
FOR SELECT * FROM nation_hash WHERE n_nationkey = 1; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index b86606f7cb9..3a4db4441e5 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -560,7 +560,7 @@ SELECT * FROM reference_table_test WHERE value_1 = 1; END; -- cursor queries also works fine BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM reference_table_test WHERE value_1 = 1 OR value_1 = 2 diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index ce68d133d4f..7c5e2904200 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -2437,7 +2437,7 @@ DEBUG: query has a single distribution column value: 1 END; -- cursor queries are router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index e483660ee03..bd2a57943bf 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -1323,7 +1323,7 @@ END; WARNING: there is no transaction in progress -- cursor queries are fast-path router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index a096d82e897..af66af0dafd 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -110,7 +110,7 @@ RESET citus.shard_replication_factor; -- test cursors 
SET search_path TO public; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM test_schema_support.nation_append WHERE n_nationkey = 1; @@ -136,7 +136,7 @@ END; -- test with search_path is set SET search_path TO test_schema_support; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM nation_append WHERE n_nationkey = 1; diff --git a/src/test/regress/expected/multi_utility_statements.out b/src/test/regress/expected/multi_utility_statements.out index ccfe3a33317..2b735960b50 100644 --- a/src/test/regress/expected/multi_utility_statements.out +++ b/src/test/regress/expected/multi_utility_statements.out @@ -216,7 +216,7 @@ SELECT create_distributed_table('cursor_me', 'x'); (1 row) INSERT INTO cursor_me SELECT s/10, s FROM generate_series(1, 100) s; -DECLARE holdCursor CURSOR WITH HOLD FOR +DECLARE holdCursor SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = 1 ORDER BY y; FETCH NEXT FROM holdCursor; x | y @@ -257,7 +257,7 @@ FETCH FORWARD 3 FROM holdCursor; CLOSE holdCursor; -- Test DECLARE CURSOR .. WITH HOLD inside transaction block BEGIN; -DECLARE holdCursor CURSOR WITH HOLD FOR +DECLARE holdCursor SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = 1 ORDER BY y; FETCH 3 FROM holdCursor; x | y @@ -328,7 +328,7 @@ CLOSE holdCursor; -- Test DECLARE CURSOR .. WITH HOLD with parameter CREATE OR REPLACE FUNCTION declares_cursor(p int) RETURNS void AS $$ - DECLARE c CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; + DECLARE c SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; $$ LANGUAGE SQL; SELECT declares_cursor(5); declares_cursor @@ -387,7 +387,7 @@ SELECT declares_cursor_2(); -- Test DECLARE CURSOR .. 
WITH HOLD with parameter on non-dist key CREATE OR REPLACE FUNCTION declares_cursor_3(p int) RETURNS void AS $$ - DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; + DECLARE c3 SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; $$ LANGUAGE SQL; SELECT declares_cursor_3(5); declares_cursor_3 @@ -427,7 +427,7 @@ CLOSE c3; -- Test DECLARE CURSOR .. WITH HOLD with parameter on dist key, but not fast-path planner CREATE OR REPLACE FUNCTION declares_cursor_4(p int) RETURNS void AS $$ - DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; + DECLARE c4 SCROLL CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; $$ LANGUAGE SQL; SELECT declares_cursor_4(5); declares_cursor_4 diff --git a/src/test/regress/sql/adaptive_executor_batching.sql b/src/test/regress/sql/adaptive_executor_batching.sql index 027ed0faad4..77ae1391540 100644 --- a/src/test/regress/sql/adaptive_executor_batching.sql +++ b/src/test/regress/sql/adaptive_executor_batching.sql @@ -186,6 +186,158 @@ SELECT DISTINCT cnt, s, mn, mx, range_cnt FROM batch_consistency; DROP TABLE batch_consistency; +-- +-- Test 12: EXPLAIN ANALYZE with batching +-- Verify that EXPLAIN ANALYZE works correctly when batching is active. +-- We check: (a) it runs without error, (b) row counts in the output are +-- correct, and (c) it works with result sets that are exact multiples of +-- the batch size as well as non-multiples. 
+-- + +-- 12a: exact multiple of batch size (200 rows, batch_size=50 → 4 batches) +SET citus.executor_batch_size TO 50; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT * FROM adaptive_executor_batching.batch_test'); + +-- Verify the actual query still returns the right count +SELECT count(*) FROM batch_test; + +-- 12b: non-multiple of batch size (200 rows, batch_size=30 → 6 full + 1 partial) +SET citus.executor_batch_size TO 30; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT * FROM adaptive_executor_batching.batch_test'); + +SELECT count(*) FROM batch_test; + +-- 12c: small batch_size (200 rows, batch_size=7 → many small batches) +SET citus.executor_batch_size TO 7; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT x, y FROM adaptive_executor_batching.batch_test WHERE x <= 10'); + +SELECT count(*) FROM batch_test WHERE x <= 10; + +-- 12d: EXPLAIN ANALYZE with aggregation across batches +SET citus.executor_batch_size TO 25; +SELECT public.explain_filter('EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF, SUMMARY OFF, BUFFERS OFF) SELECT count(*), sum(x), min(x), max(x) FROM adaptive_executor_batching.batch_test'); + +SELECT count(*), sum(x), min(x), max(x) FROM batch_test; + +-- +-- Test 13: Scrollable cursor with batching +-- Verifies that DECLARE SCROLL CURSOR works correctly with the batched +-- executor. For scrollable cursors, the executor preserves the tuplestore +-- across batches (skips tuplestore_clear) so backward fetches can access +-- rows from earlier batches. 
+-- +-- With batch_size=10, a 50-row result spans 5 batches: +-- batch 1: rows 1-10, batch 2: rows 11-20, batch 3: rows 21-30, +-- batch 4: rows 31-40, batch 5: rows 41-50 +-- +SET citus.executor_batch_size TO 10; + +-- 13a: Forward/backward zigzag across batch boundaries +BEGIN; +DECLARE scroll_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; + +-- Read into batch 2 (rows 1-15) +FETCH FORWARD 15 FROM scroll_cur; + +-- Backward 10: crosses batch 2→1 boundary, returns rows 14..5 +FETCH BACKWARD 10 FROM scroll_cur; + +-- Forward 5 from position 5: returns rows 6..10 +FETCH FORWARD 5 FROM scroll_cur; + +-- Forward 15 more: crosses batch 1→2→3, returns rows 11..25 +FETCH FORWARD 15 FROM scroll_cur; + +-- Backward 20: crosses batch 3→2→1, returns rows 24..5 +FETCH BACKWARD 20 FROM scroll_cur; + +CLOSE scroll_cur; +COMMIT; + +-- 13b: FETCH FIRST / LAST / ABSOLUTE across all batches +BEGIN; +DECLARE abs_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; + +-- Jump to various positions across different batches +FETCH ABSOLUTE 5 FROM abs_cur; -- batch 1 +FETCH ABSOLUTE 45 FROM abs_cur; -- batch 5 +FETCH ABSOLUTE 1 FROM abs_cur; -- batch 1 (backward jump) +FETCH ABSOLUTE 50 FROM abs_cur; -- batch 5 (last row) +FETCH ABSOLUTE 15 FROM abs_cur; -- batch 2 (backward jump) +FETCH ABSOLUTE 35 FROM abs_cur; -- batch 4 (forward jump) +FETCH ABSOLUTE 10 FROM abs_cur; -- batch 1 (backward jump across 3 batches) + +-- FIRST and LAST +FETCH LAST FROM abs_cur; +FETCH FIRST FROM abs_cur; +FETCH LAST FROM abs_cur; + +CLOSE abs_cur; +COMMIT; + +-- 13c: Read all rows forward, then backward through all batches +BEGIN; +DECLARE full_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 30 ORDER BY x; + +-- Drain all 3 batches forward +FETCH FORWARD ALL FROM full_cur; + +-- Now backward through all 3 batches +FETCH BACKWARD 30 FROM full_cur; + +-- And forward to the end again +FETCH FORWARD ALL FROM full_cur; + +CLOSE full_cur; +COMMIT; + 
+-- 13d: FETCH RELATIVE across batch boundaries +BEGIN; +DECLARE rel_cur SCROLL CURSOR FOR + SELECT x FROM batch_test WHERE x <= 50 ORDER BY x; + +FETCH ABSOLUTE 25 FROM rel_cur; -- middle of batch 3 +FETCH RELATIVE -15 FROM rel_cur; -- jump back to batch 1 (row 10) +FETCH RELATIVE 30 FROM rel_cur; -- jump forward to batch 4 (row 40) +FETCH RELATIVE -35 FROM rel_cur; -- jump back to batch 1 (row 5) +FETCH RELATIVE 10 FROM rel_cur; -- forward within batch 2 (row 15) + +CLOSE rel_cur; +COMMIT; + +-- 13e: SCROLL cursor with inlinable CTE +-- Exercises the CTE-inlining planner path (InlineCtesAndCreateDistributedPlannedStmt +-- → TryCreateDistributedPlannedStmt → CreateDistributedPlannedStmt → FinalizePlan) +-- to verify that cursorOptions (CURSOR_OPT_SCROLL) is forwarded through +-- TryCreateDistributedPlannedStmt so FinalizePlan wraps the plan in a Material node. +BEGIN; +DECLARE cte_scroll SCROLL CURSOR FOR + WITH batch_cte AS ( + SELECT x FROM batch_test WHERE x <= 30 + ) + SELECT x FROM batch_cte ORDER BY x; + +-- Forward into batch 2 +FETCH FORWARD 15 FROM cte_scroll; + +-- Backward across batch boundary (rows 14..5) +FETCH BACKWARD 10 FROM cte_scroll; + +-- Forward to end (rows 6..30) +FETCH FORWARD ALL FROM cte_scroll; + +-- Backward all the way (rows 30..1) +FETCH BACKWARD ALL FROM cte_scroll; + +CLOSE cte_scroll; +COMMIT; + +-- Reset batch size +RESET citus.executor_batch_size; + -- -- Cleanup -- diff --git a/src/test/regress/sql/arbitrary_configs_router.sql b/src/test/regress/sql/arbitrary_configs_router.sql index f59c5fa4a5d..ddcbaa16e51 100644 --- a/src/test/regress/sql/arbitrary_configs_router.sql +++ b/src/test/regress/sql/arbitrary_configs_router.sql @@ -462,7 +462,7 @@ END; -- cursor queries are fast-path router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/sql/multi_mx_reference_table.sql 
b/src/test/regress/sql/multi_mx_reference_table.sql index e40f96c84fe..8e2d3220a5c 100644 --- a/src/test/regress/sql/multi_mx_reference_table.sql +++ b/src/test/regress/sql/multi_mx_reference_table.sql @@ -319,7 +319,7 @@ END; -- cursor queries also works fine BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM reference_table_test WHERE value_1 = 1 OR value_1 = 2 @@ -567,4 +567,4 @@ SELECT count(*) FROM numbers; RESET log_min_messages; -- clean up tables -DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, numbers; +DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, colocated_table_test, colocated_table_test_2, numbers; diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index 3593c2ac8d0..d973f1abefa 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -572,7 +572,7 @@ END; -- cursor queries are router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash_mx WHERE author_id = 1 diff --git a/src/test/regress/sql/multi_mx_schema_support.sql b/src/test/regress/sql/multi_mx_schema_support.sql index 7f1e5d0de77..489d06275c8 100644 --- a/src/test/regress/sql/multi_mx_schema_support.sql +++ b/src/test/regress/sql/multi_mx_schema_support.sql @@ -13,7 +13,7 @@ SELECT * FROM citus_mx_test_schema.nation_hash ORDER BY n_nationkey LIMIT 4; -- test cursors SET search_path TO public; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM nation_hash WHERE n_nationkey = 1; @@ -25,7 +25,7 @@ END; -- test with search_path is set SET search_path TO citus_mx_test_schema; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM nation_hash WHERE n_nationkey = 1; diff --git 
a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 924a135b0c5..7501a4f1772 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -339,7 +339,7 @@ END; -- cursor queries also works fine BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM reference_table_test WHERE value_1 = 1 OR value_1 = 2 diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index 20b8a59562e..f88d9e12dca 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -1115,7 +1115,7 @@ END; -- cursor queries are router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/sql/multi_router_planner_fast_path.sql b/src/test/regress/sql/multi_router_planner_fast_path.sql index 56684c07575..f9b13483f90 100644 --- a/src/test/regress/sql/multi_router_planner_fast_path.sql +++ b/src/test/regress/sql/multi_router_planner_fast_path.sql @@ -608,7 +608,7 @@ END; -- cursor queries are fast-path router plannable BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM articles_hash WHERE author_id = 1 diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 4876093bc31..eeff48d1627 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -139,7 +139,7 @@ RESET citus.shard_replication_factor; -- test cursors SET search_path TO public; BEGIN; -DECLARE test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM test_schema_support.nation_append WHERE n_nationkey = 1; @@ -151,7 +151,7 @@ END; -- test with search_path is set SET search_path TO test_schema_support; BEGIN; -DECLARE 
test_cursor CURSOR FOR +DECLARE test_cursor SCROLL CURSOR FOR SELECT * FROM nation_append WHERE n_nationkey = 1; diff --git a/src/test/regress/sql/multi_utility_statements.sql b/src/test/regress/sql/multi_utility_statements.sql index bec722aef62..ba03f838135 100644 --- a/src/test/regress/sql/multi_utility_statements.sql +++ b/src/test/regress/sql/multi_utility_statements.sql @@ -126,7 +126,7 @@ CREATE TABLE cursor_me (x int, y int); SELECT create_distributed_table('cursor_me', 'x'); INSERT INTO cursor_me SELECT s/10, s FROM generate_series(1, 100) s; -DECLARE holdCursor CURSOR WITH HOLD FOR +DECLARE holdCursor SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = 1 ORDER BY y; FETCH NEXT FROM holdCursor; @@ -139,7 +139,7 @@ CLOSE holdCursor; -- Test DECLARE CURSOR .. WITH HOLD inside transaction block BEGIN; -DECLARE holdCursor CURSOR WITH HOLD FOR +DECLARE holdCursor SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = 1 ORDER BY y; FETCH 3 FROM holdCursor; FETCH BACKWARD 3 FROM holdCursor; @@ -164,7 +164,7 @@ CLOSE holdCursor; -- Test DECLARE CURSOR .. WITH HOLD with parameter CREATE OR REPLACE FUNCTION declares_cursor(p int) RETURNS void AS $$ - DECLARE c CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; + DECLARE c SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; $$ LANGUAGE SQL; SELECT declares_cursor(5); @@ -187,7 +187,7 @@ SELECT declares_cursor_2(); -- Test DECLARE CURSOR .. WITH HOLD with parameter on non-dist key CREATE OR REPLACE FUNCTION declares_cursor_3(p int) RETURNS void AS $$ - DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; + DECLARE c3 SCROLL CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; $$ LANGUAGE SQL; SELECT declares_cursor_3(5); @@ -202,7 +202,7 @@ CLOSE c3; -- Test DECLARE CURSOR .. 
WITH HOLD with parameter on dist key, but not fast-path planner CREATE OR REPLACE FUNCTION declares_cursor_4(p int) RETURNS void AS $$ - DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; + DECLARE c4 SCROLL CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; $$ LANGUAGE SQL; SELECT declares_cursor_4(5);