Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
367 changes: 317 additions & 50 deletions src/backend/distributed/executor/adaptive_executor.c

Large diffs are not rendered by default.

30 changes: 26 additions & 4 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "pg_version_constants.h"

#include "distributed/adaptive_executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
Expand Down Expand Up @@ -181,6 +182,9 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;

/* remember if we're serving a scrollable cursor (needs backward scan) */
scanState->scrollableCursor = (eflags & EXEC_FLAG_BACKWARD) != 0;

/*
* Make sure we can see notices during regular queries, which would typically
* be the result of a function that raises a notices being called.
Expand Down Expand Up @@ -264,11 +268,11 @@ CitusExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;

if (!scanState->finishedRemoteScan)
if (!scanState->executionStarted)
{
bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan);

AdaptiveExecutor(scanState);
AdaptiveExecutorStart(scanState);

if (isMultiTaskPlan)
{
Expand All @@ -279,10 +283,20 @@ CitusExecScan(CustomScanState *node)
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}

scanState->finishedRemoteScan = true;
scanState->executionStarted = true;
}

return ReturnTupleFromTuplestore(scanState);
TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);
if (TupIsNull(resultSlot) && !scanState->finishedRemoteScan)
{
tuplestore_clear(scanState->tuplestorestate);

scanState->finishedRemoteScan = AdaptiveExecutorRun(scanState);

resultSlot = ReturnTupleFromTuplestore(scanState);
}

return resultSlot;
}


Expand Down Expand Up @@ -720,6 +734,7 @@ AdaptiveExecutorCreateScan(CustomScan *scan)

scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
scanState->executionStarted = false;

return (Node *) scanState;
}
Expand Down Expand Up @@ -837,6 +852,13 @@ CitusEndScan(CustomScanState *node)
tuplestore_end(scanState->tuplestorestate);
scanState->tuplestorestate = NULL;
}

/*
* Clean up any in-flight distributed execution. This handles the case
* where an error occurs between batches in the adaptive executor,
* ensuring sessions and connections are properly released.
*/
AdaptiveExecutorEnd(scanState);
}


Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1497,8 +1497,8 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)

customScan->custom_private = list_make1(distributedPlanData);

/* necessary to avoid extra Result node in PG15 */
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN | CUSTOMPATH_SUPPORT_PROJECTION;
/* CUSTOMPATH_SUPPORT_PROJECTION avoids an extra Result node in PG15+ */
customScan->flags = CUSTOMPATH_SUPPORT_PROJECTION;

/*
* Fast path queries cannot have any subplans by definition, so skip
Expand Down
25 changes: 25 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,31 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_batch_size",
gettext_noop("Maximum number of rows per batch in the adaptive executor."),
gettext_noop("When set to 0 (the default), the batch size is automatically "
"calculated from work_mem and the estimated tuple size. A positive "
"value overrides the automatic calculation with a fixed row count."),
&ExecutorBatchSize,
0, 0, INT_MAX,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_chunk_size",
gettext_noop("Chunk size in bytes for libpq chunked row mode."),
gettext_noop("Controls the chunk size passed to PQsetChunkedRowsMode when "
"fetching rows from workers. Larger values reduce per-result "
"overhead but increase memory usage per fetch. Only effective "
"on PostgreSQL 17 and later."),
&ExecutorChunkSize,
8192, 1, INT_MAX,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.executor_slow_start_interval",
gettext_noop("Time to wait between opening connections to the same worker node"),
Expand Down
12 changes: 12 additions & 0 deletions src/include/distributed/adaptive_executor.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
#ifndef ADAPTIVE_EXECUTOR_H
#define ADAPTIVE_EXECUTOR_H

#include "distributed/citus_custom_scan.h"
#include "distributed/multi_physical_planner.h"


/* GUC, determining whether Citus opens 1 connection per task */
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern bool EnableBinaryProtocol;

/* GUC, number of rows per batch (0 = auto from work_mem) */
extern int ExecutorBatchSize;

/* GUC, libpq chunk size in bytes for chunked row mode (PG17+) */
extern int ExecutorChunkSize;

/* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval;
extern bool EnableCostBasedConnectionEstablishment;
extern bool PreventIncompleteConnectionEstablishment;

extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern void AdaptiveExecutorStart(CitusScanState *scanState);
extern bool AdaptiveExecutorRun(CitusScanState *scanState);
extern void AdaptiveExecutorEnd(CitusScanState *scanState);
extern bool ShouldRunTasksSequentially(List *taskList);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList);
extern uint64 ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported);
extern uint64 ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
Expand Down
7 changes: 7 additions & 0 deletions src/include/distributed/citus_custom_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "distributed/distributed_planner.h"
#include "distributed/multi_server_executor.h"

struct DistributedExecution;

typedef struct CitusScanState
{
CustomScanState customScanState; /* underlying custom scan node */
Expand All @@ -27,7 +29,12 @@ typedef struct CitusScanState
DistributedPlan *distributedPlan; /* distributed execution plan */
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
bool executionStarted; /* flag to check whether execution started */
bool scrollableCursor; /* true when serving a SCROLL cursor */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */

/* execution state when using adaptive executor */
struct DistributedExecution *execution;
} CitusScanState;


Expand Down
2 changes: 0 additions & 2 deletions src/include/distributed/multi_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ extern int ExecutorLevel;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once);
extern void AdaptiveExecutorPreExecutorRun(CitusScanState *scanState);
extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState);


/*
Expand Down
3 changes: 3 additions & 0 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ def extra_tests(self):
"multi_subquery_in_where_reference_clause": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"adaptive_executor_batching": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"subquery_in_where": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
Expand Down
Loading
Loading