Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions src/backend/distributed/executor/adaptive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -947,23 +947,45 @@ AdaptiveExecutor(CitusScanState *scanState)
* When sorted merge is active, k-way merge the per-task stores into
* the final tuplestore. This produces globally sorted output that the
* existing ReturnTupleFromTuplestore() path can read unchanged.
*
* When streaming sorted merge is enabled, create an adapter instead
* that delivers tuples one at a time without a final tuplestore.
*/
if (execution->useSortedMerge && execution->perTaskStoreCount > 0)
{
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

MergePerTaskStoresIntoFinalStore(scanState->tuplestorestate,
execution->perTaskStores,
execution->perTaskStoreCount,
distributedPlan->sortedMergeKeys,
distributedPlan->sortedMergeKeyCount,
tupleDescriptor);

/* free per-task stores — they are no longer needed */
for (int i = 0; i < execution->perTaskStoreCount; i++)
if (EnableStreamingSortedMerge)
{
/*
* Streaming mode: create an adapter that delivers tuples one
* at a time from the per-task stores via a binary heap. The
* adapter takes ownership of the per-task stores.
*/
scanState->mergeAdapter = CreateSortedMergeAdapter(
execution->perTaskStores,
execution->perTaskStoreCount,
distributedPlan->sortedMergeKeys,
distributedPlan->sortedMergeKeyCount,
tupleDescriptor,
true);
}
else
{
tuplestore_end(execution->perTaskStores[i]);
/* Eager mode (default): merge all tuples into a final tuplestore */
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

MergePerTaskStoresIntoFinalStore(scanState->tuplestorestate,
execution->perTaskStores,
execution->perTaskStoreCount,
distributedPlan->sortedMergeKeys,
distributedPlan->sortedMergeKeyCount,
tupleDescriptor);

/* free per-task stores — they are no longer needed */
for (int i = 0; i < execution->perTaskStoreCount; i++)
{
tuplestore_end(execution->perTaskStores[i]);
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/shard_utils.h"
#include "distributed/sorted_merge.h"
#include "distributed/stats/query_stats.h"
#include "distributed/stats/stat_counters.h"
#include "distributed/subplan_execution.h"
Expand Down Expand Up @@ -835,6 +836,12 @@ CitusEndScan(CustomScanState *node)
CitusQueryStatsExecutorsEntry(queryId, executorType, partitionKeyString);
}

if (scanState->mergeAdapter)
{
FreeSortedMergeAdapter(scanState->mergeAdapter);
scanState->mergeAdapter = NULL;
}

if (scanState->tuplestorestate)
{
tuplestore_end(scanState->tuplestorestate);
Expand All @@ -857,7 +864,12 @@ CitusReScan(CustomScanState *node)
ExecScanReScan(&node->ss);

CitusScanState *scanState = (CitusScanState *) node;
if (scanState->tuplestorestate)

if (scanState->mergeAdapter)
{
SortedMergeAdapterRescan(scanState->mergeAdapter);
}
else if (scanState->tuplestorestate)
{
tuplestore_rescan(scanState->tuplestorestate);
}
Expand Down
69 changes: 55 additions & 14 deletions src/backend/distributed/executor/multi_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "distributed/multi_server_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/sorted_merge.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
Expand Down Expand Up @@ -88,6 +89,9 @@ bool SortReturning = false;
/* when true at planning time, enables coordinator sorted merge for ORDER BY */
bool EnableSortedMerge = false;

/*
 * When true, a sorted merge is served by a streaming merge adapter that
 * delivers tuples one at a time from the per-task stores, instead of eagerly
 * merging them into a final tuplestore (the default, when false).
 *
 * The streaming adapter is forward-only: a backward scan over an active
 * adapter is rejected with an error.
 *
 * NOTE(review): presumably backed by the citus.enable_streaming_sorted_merge
 * setting referenced in that error's hint -- confirm against the GUC
 * registration.
 */
bool EnableStreamingSortedMerge = false;

/*
* How many nested executors have we started? This can happen for SQL
* UDF calls. The outer query starts an executor, then postgres opens
Expand Down Expand Up @@ -343,21 +347,60 @@ CitusCustomScanStateWalker(PlanState *planState, List **citusCustomScanStates)


/*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read
* from the tuple store.
* FetchNextScanTuple loads the next tuple into the scan slot.
* Returns true if a tuple was loaded, false if exhausted.
*
* When a merge adapter is active, it streams from the adapter.
* Otherwise, it reads from the tuplestore in the given direction.
*/
TupleTableSlot *
ReturnTupleFromTuplestore(CitusScanState *scanState)
static inline bool
FetchNextScanTuple(CitusScanState *scanState, bool forward, TupleTableSlot *slot)
{
Tuplestorestate *tupleStore = scanState->tuplestorestate;
bool forwardScanDirection = true;
if (scanState->mergeAdapter != NULL)
{
/*
* The streaming merge adapter is forward-only.
*
* Citus replaces the entire plan tree after standard_planner()
* returns, so PostgreSQL's cursor-time materialize_finished_plan()
* check does not see the Citus CustomScan. That means SCROLL
* cursors can reach here with a backward scan request even though
* the adapter cannot satisfy it. Report a user-facing error
* rather than crashing.
*/
if (!forward)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("streaming sorted merge does not support "
"backward scan"),
errhint("Use SET citus.enable_streaming_sorted_merge "
"TO off to allow backward scan.")));
}
return SortedMergeAdapterNext(scanState->mergeAdapter, slot);
}

Tuplestorestate *tupleStore = scanState->tuplestorestate;
if (tupleStore == NULL)
{
return NULL;
ExecClearTuple(slot);
return false;
}

return tuplestore_gettupleslot(tupleStore, forward, false, slot);
}


/*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store (or
* streaming merge adapter) of the given Citus scan node and returns it.
* It returns null if all tuples are read.
*/
TupleTableSlot *
ReturnTupleFromTuplestore(CitusScanState *scanState)
{
bool forwardScanDirection = true;

EState *executorState = ScanStateGetExecutorState(scanState);
ScanDirection scanDirection = executorState->es_direction;
Assert(ScanDirectionIsValid(scanDirection));
Expand All @@ -373,9 +416,9 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)

if (!qual && !projInfo)
{
/* no quals, nor projections return directly from the tuple store. */
/* no quals or projections: return directly from the tuple source */
TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot);
FetchNextScanTuple(scanState, forwardScanDirection, slot);
return slot;
}

Expand All @@ -394,12 +437,10 @@ ReturnTupleFromTuplestore(CitusScanState *scanState)
ResetExprContext(econtext);

TupleTableSlot *slot = scanState->customScanState.ss.ss_ScanTupleSlot;
tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, slot);

if (TupIsNull(slot))
if (!FetchNextScanTuple(scanState, forwardScanDirection, slot))
{
/*
* When the tuple is null we have reached the end of the tuplestore. We will
* When the tuple is null we have reached the end of the source. We will
* return a null tuple, however, depending on the existence of a projection we
* need to either return the scan tuple or the projected tuple.
*/
Expand Down
Loading
Loading