Skip to content

Streaming sorted merge#8545

Open
neildsh wants to merge 7 commits intocitusdata:sortedMergefrom
neildsh:streamingSortedMerge
Open

Streaming sorted merge#8545
neildsh wants to merge 7 commits intocitusdata:sortedMergefrom
neildsh:streamingSortedMerge

Conversation

@neildsh
Copy link
Copy Markdown

@neildsh neildsh commented Apr 15, 2026

Streaming Sorted Merge Adapter

Summary

Adds a streaming k-way merge adapter controlled by citus.enable_streaming_sorted_merge (bool, default off, experimental). When enabled alongside citus.enable_sorted_merge, the coordinator returns globally-sorted tuples directly from the binary heap instead of eagerly materializing all results into a tuplestore first. This reduces peak memory and improves time-to-first-tuple, especially for LIMIT queries.

Changes

SortedMergeAdapter (new opaque type in sorted_merge.c)

Function Purpose
CreateSortedMergeAdapter() Build adapter over K per-task stores (optional ownership transfer)
SortedMergeAdapterNext() Return next globally-sorted tuple via binary heap
SortedMergeAdapterRescan() Reset for re-read (CitusReScan)
FreeSortedMergeAdapter() Release all resources (CitusEndScan)

MergePerTaskStoresIntoFinalStore() is refactored to use the adapter internally (drain + free), eliminating duplicated merge logic.

Executor

  • adaptive_executor.c: Creates a SortedMergeAdapter with store ownership when streaming is active; falls back to eager merge otherwise.
  • multi_executor.c: New FetchNextScanTuple() dispatches between adapter and tuplestore. Includes a defensive error for backward scan requests that bypass the Material node (should never fire in practice — see below).
  • citus_custom_scan.c: Cleanup in CitusEndScan(); rescan in CitusReScan().
  • CitusScanState: New mergeAdapter field.

Backward scan & SCROLL cursors

The streaming adapter is forward-only by design. Two planner-level changes ensure SCROLL cursors still work correctly using standard PostgreSQL APIs:

  1. Drop CUSTOMPATH_SUPPORT_BACKWARD_SCAN: In FinalizePlan(), when streaming sorted merge is active, the CustomScan node's flags no longer include CUSTOMPATH_SUPPORT_BACKWARD_SCAN (extensible.h:0x0001). This tells PostgreSQL's ExecSupportsBackwardScan() that our node cannot satisfy backward fetches.

  2. Insert a Material node for SCROLL cursors: PostgreSQL's standard_planner() normally calls materialize_finished_plan() to wrap non-backward-scannable plans in a Material node when CURSOR_OPT_SCROLL is set. However, Citus replaces the entire plan tree after standard_planner() returns (via FinalizePlan()), discarding any Material node it inserted. We therefore re-perform the same check in CreateDistributedPlannedStmt():

    if ((planContext->cursorOptions & CURSOR_OPT_SCROLL) &&
        distributedPlan->useSortedMerge && EnableStreamingSortedMerge &&
        !ExecSupportsBackwardScan(resultPlan->planTree))
    {
        resultPlan->planTree = materialize_finished_plan(resultPlan->planTree);
    }

    The Material node buffers tuples and provides bidirectional access. The streaming adapter only ever sees forward requests. Test case G3b verifies that FETCH BACKWARD on a SCROLL cursor produces correct results with streaming mode active.

Tests

  • multi_orderby_pushdown_streaming.sql (new): Runs the full sorted-merge test suite twice — eager merge, then streaming — verifying identical results. Includes G3b for SCROLL cursor backward scan.
  • setup_multi_orderby_pushdown.sql (new): Extracted shared table setup.
  • Schedule updated to use the new combined test.

Testing

  • check-multi: 193/193 pass
  • check-style: pass
  • CI lint scripts: pass

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 11.60714% with 99 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (sortedMerge@86f82a1). Learn more about missing BASE report.

Additional details and impacted files
@@              Coverage Diff               @@
##             sortedMerge    #8545   +/-   ##
==============================================
  Coverage               ?   44.57%           
==============================================
  Files                  ?      287           
  Lines                  ?    63183           
  Branches               ?     7793           
==============================================
  Hits                   ?    28164           
  Misses                 ?    32534           
  Partials               ?     2485           
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant