diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query01.ans b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans
new file mode 100644
index 00000000..7672dea1
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans
@@ -0,0 +1,13 @@
+-- @description query01 for PXF parallel scan correctness - count with parallel
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query02.ans b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans
new file mode 100644
index 00000000..4a77aafb
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans
@@ -0,0 +1,13 @@
+-- @description query02 for PXF parallel scan correctness - sum aggregation
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT sum(id) FROM pxf_parallel_enabled;
+ sum
+----------
+ 50005000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query03.ans b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans
new file mode 100644
index 00000000..918acaa1
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans
@@ -0,0 +1,21 @@
+-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count
+
+SET optimizer = off;
+SET
+SET enable_parallel = false;
+SET
+SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled;
+ non_parallel_count
+--------------------
+ 10000
+(1 row)
+
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) AS parallel_count FROM pxf_parallel_enabled;
+ parallel_count
+----------------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query04.ans b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans
new file mode 100644
index 00000000..c3caefbe
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans
@@ -0,0 +1,22 @@
+-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10;
+ id | val
+----+-------
+ 1 | row_1
+ 2 | row_2
+ 3 | row_3
+ 4 | row_4
+ 5 | row_5
+ 6 | row_6
+ 7 | row_7
+ 8 | row_8
+ 9 | row_9
+ 10 | row_10
+(10 rows)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query05.ans b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans
new file mode 100644
index 00000000..7df9da3f
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans
@@ -0,0 +1,13 @@
+-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled;
+ min | max | avg
+-----+-------+-----------------------
+ 1 | 10000 | 5000.5000000000000000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query06.ans b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans
new file mode 100644
index 00000000..e37f743a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans
@@ -0,0 +1,13 @@
+-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000;
+ count
+-------
+ 5000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query07.ans b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans
new file mode 100644
index 00000000..078b7721
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans
@@ -0,0 +1,17 @@
+-- @description query07 for PXF parallel scan correctness - column projection with WHERE
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val;
+ val
+-------
+ row_1
+ row_2
+ row_3
+ row_4
+ row_5
+(5 rows)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query08.ans b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans
new file mode 100644
index 00000000..3d27ff32
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans
@@ -0,0 +1,13 @@
+-- @description query08 for PXF parallel scan correctness - empty result edge case
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0;
+ count
+-------
+ 0
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query09.ans b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans
new file mode 100644
index 00000000..250b4b75
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans
@@ -0,0 +1,13 @@
+-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(DISTINCT id) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query10.ans b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans
new file mode 100644
index 00000000..d993ffb2
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans
@@ -0,0 +1,13 @@
+-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 0;
+SET
+SELECT count(*) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query01.sql b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql
new file mode 100644
index 00000000..5149f3f8
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan correctness - count with parallel
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query02.sql b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql
new file mode 100644
index 00000000..d9ec4dca
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql
@@ -0,0 +1,6 @@
+-- @description query02 for PXF parallel scan correctness - sum aggregation
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT sum(id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query03.sql b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql
new file mode 100644
index 00000000..536dccd8
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql
@@ -0,0 +1,9 @@
+-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count
+
+SET optimizer = off;
+SET enable_parallel = false;
+SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled;
+
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) AS parallel_count FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query04.sql b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql
new file mode 100644
index 00000000..251f9a4a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql
@@ -0,0 +1,6 @@
+-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query05.sql b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql
new file mode 100644
index 00000000..7379f88c
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql
@@ -0,0 +1,6 @@
+-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query06.sql b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql
new file mode 100644
index 00000000..2be9bd6a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql
@@ -0,0 +1,6 @@
+-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query07.sql b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql
new file mode 100644
index 00000000..b7d7ebff
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql
@@ -0,0 +1,6 @@
+-- @description query07 for PXF parallel scan correctness - column projection with WHERE
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query08.sql b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql
new file mode 100644
index 00000000..136a6a33
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql
@@ -0,0 +1,6 @@
+-- @description query08 for PXF parallel scan correctness - empty result edge case
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query09.sql b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql
new file mode 100644
index 00000000..9743ae30
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql
@@ -0,0 +1,6 @@
+-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(DISTINCT id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query10.sql b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql
new file mode 100644
index 00000000..17a89a38
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql
@@ -0,0 +1,6 @@
+-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 0;
+SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query01.ans b/automation/sqlrepo/features/parallel/explain/expected/query01.ans
new file mode 100644
index 00000000..6fb6a328
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query01.ans
@@ -0,0 +1,32 @@
+-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled
+-- start_matchsubs
+--
+-- m/Workers Planned: \d+/
+-- s/Workers Planned: \d+/Workers Planned: N/
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+   ->  Finalize Aggregate (cost=XXX rows=NNN width=NN)
+         ->  Gather (cost=XXX rows=NNN width=NN)
+               Workers Planned: N
+               ->  Partial Aggregate (cost=XXX rows=NNN width=NN)
+                     ->  Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN)
+(6 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query02.ans b/automation/sqlrepo/features/parallel/explain/expected/query02.ans
new file mode 100644
index 00000000..fd7207d5
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query02.ans
@@ -0,0 +1,24 @@
+-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled
+-- start_matchsubs
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = false;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_disabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+   ->  Aggregate (cost=XXX rows=NNN width=NN)
+         ->  Foreign Scan on pxf_parallel_disabled (cost=XXX rows=NNN width=NN)
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query03.ans b/automation/sqlrepo/features/parallel/explain/expected/query03.ans
new file mode 100644
index 00000000..cbc76f18
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query03.ans
@@ -0,0 +1,84 @@
+-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present
+-- start_matchsubs
+--
+-- m/Workers Planned: \d+/
+-- s/Workers Planned: \d+/Workers Planned: N/
+--
+-- m/Workers Launched: \d+/
+-- s/Workers Launched: \d+/Workers Launched: N/
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- m/actual time=\d+\.\d+\.\.\d+\.\d+/
+-- s/actual time=\d+\.\d+\.\.\d+\.\d+/actual time=XXX/
+--
+-- m/loops=\d+/
+-- s/loops=\d+/loops=N/
+--
+-- m/Execution Time: \d+\.\d+ ms/
+-- s/Execution Time: \d+\.\d+ ms/Execution Time: XXX ms/
+--
+-- m/Planning Time: \d+\.\d+ ms/
+-- s/Planning Time: \d+\.\d+ ms/Planning Time: XXX ms/
+--
+-- m/Memory Usage: \d+kB/
+-- s/Memory Usage: \d+kB/Memory Usage: NNkB/
+--
+-- m/Memory: \d+kB/
+-- s/Memory: \d+kB/Memory: NNkB/
+--
+-- m/Buckets: \d+/
+-- s/Buckets: \d+/Buckets: NNN/
+--
+-- m/Batches: \d+/
+-- s/Batches: \d+/Batches: NNN/
+--
+-- m/Peak Memory Usage: \d+/
+-- s/Peak Memory Usage: \d+/Peak Memory Usage: NNN/
+--
+-- m/Avg Peak Memory \(per process\): \d+/
+-- s/Avg Peak Memory \(per process\): \d+/Avg Peak Memory (per process): NNN/
+--
+-- m/slice \d+; segments: \d+/
+-- s/slice \d+; segments: \d+/slice N; segments: N/
+--
+-- m/Optimizer: .*/
+-- s/Optimizer: .*/Optimizer: OPT/
+--
+-- end_matchsubs
+-- start_matchignore
+--
+-- m/^\s+Slice statistics:/
+-- m/^\s+\(slice\d+\)/
+-- m/^\s+Statement statistics:/
+-- m/^\s+Settings:/
+-- m/^\s+Query Identifier:/
+--
+-- end_matchignore
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice N; segments: N) (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+   ->  Finalize Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+         ->  Gather (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+               Workers Planned: N
+               Workers Launched: N
+               ->  Partial Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+                     ->  Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ Optimizer: OPT
+ Planning Time: XXX ms
+ Execution Time: XXX ms
+(10 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query04.ans b/automation/sqlrepo/features/parallel/explain/expected/query04.ans
new file mode 100644
index 00000000..14bcc381
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query04.ans
@@ -0,0 +1,26 @@
+-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table
+-- start_matchsubs
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 0;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+   ->  Aggregate (cost=XXX rows=NNN width=NN)
+         ->  Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN)
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query01.sql b/automation/sqlrepo/features/parallel/explain/sql/query01.sql
new file mode 100644
index 00000000..17eecc09
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query02.sql b/automation/sqlrepo/features/parallel/explain/sql/query02.sql
new file mode 100644
index 00000000..be769b18
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query02.sql
@@ -0,0 +1,5 @@
+-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled
+
+SET optimizer = off;
+SET enable_parallel = false;
+EXPLAIN SELECT count(*) FROM pxf_parallel_disabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query03.sql b/automation/sqlrepo/features/parallel/explain/sql/query03.sql
new file mode 100644
index 00000000..a69394d3
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query03.sql
@@ -0,0 +1,6 @@
+-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query04.sql b/automation/sqlrepo/features/parallel/explain/sql/query04.sql
new file mode 100644
index 00000000..1b783608
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query04.sql
@@ -0,0 +1,6 @@
+-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 0;
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/rescan/expected/query01.ans b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans
new file mode 100644
index 00000000..8d00c268
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans
@@ -0,0 +1,18 @@
+-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count
+FROM pxf_parallel_enabled t
+WHERE t.id IN (1, 5000, 10000)
+ORDER BY t.id;
+ id | running_count
+-------+---------------
+ 1 | 1
+ 5000 | 5000
+ 10000 | 10000
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/rescan/sql/query01.sql b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql
new file mode 100644
index 00000000..19553ad2
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql
@@ -0,0 +1,9 @@
+-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count
+FROM pxf_parallel_enabled t
+WHERE t.id IN (1, 5000, 10000)
+ORDER BY t.id;
diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans
new file mode 100644
index 00000000..757c97f4
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans
@@ -0,0 +1,13 @@
+-- @description query01 for PXF parallel scan single fragment - count with excess workers
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_single_frag;
+ count
+-------
+ 100
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans
new file mode 100644
index 00000000..2590defc
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans
@@ -0,0 +1,13 @@
+-- @description query02 for PXF parallel scan single fragment - sum aggregation
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT sum(id) FROM pxf_parallel_single_frag;
+ sum
+------
+ 5050
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql
new file mode 100644
index 00000000..7e7758ce
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan single fragment - count with excess workers
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_single_frag;
diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql
new file mode 100644
index 00000000..06243e54
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql
@@ -0,0 +1,6 @@
+-- @description query02 for PXF parallel scan single fragment - sum aggregation
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT sum(id) FROM pxf_parallel_single_frag;
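[Editor's note: the single-fragment and multi-fragment expectations above both follow from the round-robin rule quoted in the FDW comments later in this patch, namely that PXF assigns fragment F to the virtual segment where F % virtual_seg_count == virtual_seg_id. A minimal standalone sketch of that distribution, assuming the 3-segment, 4-worker layout used in the patch comments and the 10-fragment and 1-fragment datasets these tests create; the program is illustrative only and not part of the patch.]

    #include <stdio.h>

    /* Round-robin rule from this patch: fragment F is read by the virtual
     * segment where F % virtual_seg_count == virtual_seg_id. */
    static void
    distribute(int num_fragments, int seg_count, int workers)
    {
        int virtual_seg_count = seg_count * workers;
        int claimed[64] = {0};      /* how many times each fragment is read */

        for (int seg = 0; seg < seg_count; seg++)
            for (int w = 0; w < workers; w++)
            {
                int vseg = seg + w * seg_count;     /* virtual_seg_id */

                for (int f = 0; f < num_fragments; f++)
                    if (f % virtual_seg_count == vseg)
                    {
                        printf("fragment %d -> segment %d, worker %d (vseg %d)\n",
                               f, seg, w, vseg);
                        claimed[f]++;
                    }
            }

        for (int f = 0; f < num_fragments; f++)
            if (claimed[f] != 1)
                printf("ERROR: fragment %d read %d times\n", f, claimed[f]);
    }

    int
    main(void)
    {
        distribute(10, 3, 4);   /* multi-fragment layout: each file read once */
        distribute(1, 3, 4);    /* single-fragment layout: only vseg 0 reads */
        return 0;
    }

[In the single-fragment case only virtual segment 0 receives data and the other eleven virtual segments read nothing, which is why the excess-worker tests above still return the full 100-row total exactly once.]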
diff --git a/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java
new file mode 100644
index 00000000..11815b6f
--- /dev/null
+++ b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java
@@ -0,0 +1,106 @@
+package org.apache.cloudberry.pxf.automation.features.parallel;
+
+import annotations.WorksWithFDW;
+import org.apache.cloudberry.pxf.automation.features.BaseFeature;
+import org.apache.cloudberry.pxf.automation.structures.tables.basic.Table;
+import org.apache.cloudberry.pxf.automation.structures.tables.utils.TableFactory;
+import org.apache.cloudberry.pxf.automation.utils.system.ProtocolEnum;
+import org.apache.cloudberry.pxf.automation.utils.system.ProtocolUtils;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for PG-style Gather parallel scan on PXF FDW foreign tables.
+ *
+ * Verifies:
+ * - Correctness: count, sum, ordering match between parallel and non-parallel scans
+ * - EXPLAIN: Gather node present when parallel enabled, absent when disabled
+ */
+@WorksWithFDW
+public class ParallelScanTest extends BaseFeature {
+
+    private static final String[] FIELDS = {"id integer", "val text"};
+    private static final int FILES = 10;
+    private static final int ROWS_PER_FILE = 1000;
+    private static final int SINGLE_FRAG_ROWS = 100;
+
+    private String hdfsDir;
+    private String singleFragDir;
+
+    @Override
+    protected void beforeClass() throws Exception {
+        super.beforeClass();
+
+        hdfsDir = hdfs.getWorkingDirectory() + "/parallel_data";
+        hdfs.createDirectory(hdfsDir);
+
+        // Write 10 CSV files to HDFS (10 x 1000 rows = 10,000 total)
+        for (int fileIdx = 0; fileIdx < FILES; fileIdx++) {
+            Table dataTable = new Table("part_" + String.format("%02d", fileIdx), null);
+            int startId = fileIdx * ROWS_PER_FILE + 1;
+            int endId = (fileIdx + 1) * ROWS_PER_FILE;
+            for (int id = startId; id <= endId; id++) {
+                dataTable.addRow(new String[]{String.valueOf(id), "row_" + id});
+            }
+            hdfs.writeTableToFile(hdfsDir + "/part_" + String.format("%02d", fileIdx) + ".csv",
+                    dataTable, ",");
+        }
+
+        ProtocolEnum protocol = ProtocolUtils.getProtocol();
+        String tablePath = protocol.getExternalTablePath(hdfs.getBasePath(), hdfsDir);
+
+        // Create foreign table with enable_parallel=true
+        exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_enabled",
+                FIELDS, tablePath, ",");
+        exTable.setHost(pxfHost);
+        exTable.setPort(pxfPort);
+        exTable.setUserParameters(new String[]{"enable_parallel=true"});
+        gpdb.createTableAndVerify(exTable);
+
+        // Create foreign table without enable_parallel (defaults to false)
+        exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_disabled",
+                FIELDS, tablePath, ",");
+        exTable.setHost(pxfHost);
+        exTable.setPort(pxfPort);
+        gpdb.createTableAndVerify(exTable);
+
+        // Write single CSV file to HDFS (1 file, 100 rows) for single-fragment tests
+        singleFragDir = hdfs.getWorkingDirectory() + "/parallel_single_frag";
+        hdfs.createDirectory(singleFragDir);
+
+        Table singleTable = new Table("single_frag", null);
+        for (int id = 1; id <= SINGLE_FRAG_ROWS; id++) {
+            singleTable.addRow(new String[]{String.valueOf(id), "row_" + id});
+        }
+        hdfs.writeTableToFile(singleFragDir + "/data.csv", singleTable, ",");
+
+        String singleFragPath = protocol.getExternalTablePath(hdfs.getBasePath(), singleFragDir);
+
+        // Create foreign table for single-fragment with enable_parallel=true
+        exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_single_frag",
+                FIELDS, singleFragPath, ",");
+        exTable.setHost(pxfHost);
+        exTable.setPort(pxfPort);
+        exTable.setUserParameters(new String[]{"enable_parallel=true"});
+        gpdb.createTableAndVerify(exTable);
+    }
+
+    @Test(groups = {"features", "gpdb", "fdw"})
+    public void testParallelCorrectness() throws Exception {
+        runSqlTest("features/parallel/correctness");
+    }
+
+    @Test(groups = {"features", "gpdb", "fdw"})
+    public void testParallelExplain() throws Exception {
+        runSqlTest("features/parallel/explain");
+    }
+
+    @Test(groups = {"features", "gpdb", "fdw"})
+    public void testParallelSingleFragment() throws Exception {
+        runSqlTest("features/parallel/single_fragment");
+    }
+
+    @Test(groups = {"features", "gpdb", "fdw"})
+    public void testParallelRescan() throws Exception {
+        runSqlTest("features/parallel/rescan");
+    }
+}
diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c
index 567665ac..e55598f3 100644
--- a/fdw/pxf_bridge.c
+++ b/fdw/pxf_bridge.c
@@ -23,6 +23,9 @@
 #include "cdb/cdbtm.h"
 #include "cdb/cdbvars.h"
+#include "miscadmin.h"
+#include "storage/lock.h"
+#include "utils/builtins.h"
 
 /* helper function declarations */
 static void BuildUriForRead(PxfFdwScanState *pxfsstate);
@@ -182,3 +185,275 @@ FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen)
 
     return ptr - start;
 }
+
+/*
+ * ============================================================================
+ * Parallel Execution Support
+ * ============================================================================
+ */
+
+/*
+ * Build URI for fetching fragment list from PXF server
+ */
+static void
+BuildUriForFragments(PxfFdwScanState *pxfsstate)
+{
+    PxfOptions *options = pxfsstate->options;
+
+    resetStringInfo(&pxfsstate->uri);
+    appendStringInfo(&pxfsstate->uri, "http://%s:%d/%s/fragments",
+                     options->pxf_host, options->pxf_port, PXF_SERVICE_PREFIX);
+    elog(DEBUG2, "pxf_fdw: uri %s for fragments", pxfsstate->uri.data);
+}
+
+/*
+ * PxfBridgeFetchFragments
+ *      Fetch the list of fragments from PXF server.
+ *
+ * This function is called by the leader process to get the complete list
+ * of fragments that need to be processed. The fragments are stored in
+ * pxfsstate->fragments array.
+ *
+ * Returns the number of fragments fetched.
+ *
+ * Note: Currently this is a placeholder implementation. The actual
+ * implementation will need to:
+ * 1. Call the PXF /fragments endpoint
+ * 2. Parse the JSON response
+ * 3. Store fragment metadata in pxfsstate->fragments
+ */
+int
+PxfBridgeFetchFragments(PxfFdwScanState *pxfsstate)
+{
+    CHURL_HEADERS headers;
+    CHURL_HANDLE handle;
+    StringInfoData response;
+    char        buffer[8192];
+    size_t      bytes_read;
+    int         num_fragments = 0;
+
+    elog(DEBUG3, "pxf_fdw: PxfBridgeFetchFragments starting");
+
+    /* Initialize headers */
+    headers = churl_headers_init();
+
+    /* Build URI for fragments endpoint */
+    BuildUriForFragments(pxfsstate);
+
+    /* Build HTTP headers */
+    BuildHttpHeaders(headers,
+                     pxfsstate->options,
+                     pxfsstate->relation,
+                     pxfsstate->filter_str,
+                     pxfsstate->retrieved_attrs,
+                     pxfsstate->projectionInfo);
+
+    /* Add header to request JSON response */
+    churl_headers_append(headers, "Accept", "application/json");
+
+    /* Initialize download */
+    handle = churl_init_download(pxfsstate->uri.data, headers);
+
+    /* Read the response */
+    initStringInfo(&response);
+    while ((bytes_read = churl_read(handle, buffer, sizeof(buffer) - 1)) > 0)
+    {
+        buffer[bytes_read] = '\0';
+        appendStringInfoString(&response, buffer);
+    }
+
+    churl_read_check_connectivity(handle);
+
+    elog(DEBUG3, "pxf_fdw: fragments response length=%d", response.len);
+
+    /*
+     * TODO: Parse JSON response to extract fragment metadata.
+     * For now, we use a simplified approach where the server returns
+     * the number of fragments as a simple count.
+     *
+     * Expected JSON format:
+     * {
+     *   "PXFFragments": [
+     *     {"index": 0, "sourceName": "...", "metadata": "..."},
+     *     {"index": 1, "sourceName": "...", "metadata": "..."},
+     *     ...
+     *   ]
+     * }
+     *
+     * For the initial implementation, we'll estimate based on response.
+     * A proper implementation would use a JSON parser.
+     */
+    if (response.len > 0)
+    {
+        /* Simple heuristic: count occurrences of "index" */
+        char       *ptr = response.data;
+
+        while ((ptr = strstr(ptr, "\"index\"")) != NULL)
+        {
+            num_fragments++;
+            ptr++;
+        }
+
+        /* Allocate fragment array if we found any */
+        if (num_fragments > 0)
+        {
+            pxfsstate->fragments = (PxfFragmentData *)
+                palloc0(sizeof(PxfFragmentData) * num_fragments);
+            pxfsstate->num_fragments = num_fragments;
+
+            /* TODO: Actually parse and populate fragment data */
+            elog(DEBUG3, "pxf_fdw: found %d fragments", num_fragments);
+        }
+    }
+
+    /* Cleanup */
+    churl_cleanup(handle, false);
+    churl_headers_cleanup(headers);
+    pfree(response.data);
+
+    return num_fragments;
+}
+
+/*
+ * PxfBridgeGetNextFragment
+ *      Get the next fragment index for this worker to process.
+ *
+ * This function atomically increments the next_fragment counter in the
+ * shared parallel state and returns the fragment index for this worker
+ * to process.
+ *
+ * Returns -1 if all fragments have been assigned.
+ */
+int
+PxfBridgeGetNextFragment(PxfParallelScanState *pstate)
+{
+    int         logical_idx;
+
+    if (pstate == NULL)
+        return -1;
+
+    SpinLockAcquire(&pstate->mutex);
+
+    if (pstate->next_fragment >= pstate->total_fragments)
+    {
+        SpinLockRelease(&pstate->mutex);
+        return -1;
+    }
+    logical_idx = pstate->next_fragment++;
+
+    SpinLockRelease(&pstate->mutex);
+
+    /* Map logical → actual: the K-th fragment for segment S is at
+     * actual_index = S + K * seg_count (round-robin assignment) */
+    {
+        int         seg_id = PXF_SEGMENT_ID;
+        int         seg_count = PXF_SEGMENT_COUNT;
+        int         actual_idx = seg_id + logical_idx * seg_count;
+
+        elog(DEBUG3, "pxf_fdw: segment %d: GetNextFragment logical=%d actual=%d",
+             seg_id, logical_idx, actual_idx);
+
+        return actual_idx;
+    }
+}
+
+/*
+ * PxfBridgeImportStartFragment
+ *      Start import for a specific fragment in parallel mode.
+ *
+ * This is similar to PxfBridgeImportStart but includes the fragment
+ * index in the request headers so the PXF server knows which specific
+ * fragment to return data for.
+ */
+void
+PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex)
+{
+    char        fragment_idx_str[16];
+
+    elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartFragment starting for fragment %d",
+         fragmentIndex);
+
+    pxfsstate->churl_headers = churl_headers_init();
+
+    BuildUriForRead(pxfsstate);
+    BuildHttpHeaders(pxfsstate->churl_headers,
+                     pxfsstate->options,
+                     pxfsstate->relation,
+                     pxfsstate->filter_str,
+                     pxfsstate->retrieved_attrs,
+                     pxfsstate->projectionInfo);
+
+    /* Add fragment index header for parallel mode */
+    pg_ltoa(fragmentIndex, fragment_idx_str);
+    churl_headers_append(pxfsstate->churl_headers, "X-GP-FRAGMENT-INDEX", fragment_idx_str);
+
+    pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data, pxfsstate->churl_headers);
+
+    /* read some bytes to make sure the connection is established */
+    churl_read_check_connectivity(pxfsstate->churl_handle);
+
+    /* Update current fragment tracking */
+    pxfsstate->current_fragment = fragmentIndex;
+}
+
+/*
+ * ============================================================================
+ * Cloudberry Gang-Parallel Support (Virtual Segment ID)
+ *
+ * In Cloudberry/Greenplum, parallel execution uses "gang expansion" where
+ * multiple processes share the same physical segment ID. PostgreSQL's DSM
+ * callbacks (InitializeDSMForeignScan, InitializeWorkerForeignScan) are
+ * NOT invoked in this model.
+ *
+ * Instead of fragment-by-fragment coordination, we use "virtual segment IDs":
+ * each gang worker sends a unique virtual segment ID to PXF, so PXF's
+ * existing round-robin fragment distribution splits the data among workers
+ * automatically — no PXF server changes needed.
+ *
+ * Example: 3 physical segments × 4 workers = 12 virtual segments.
+ * Worker i on physical segment S sends virtual_seg_id = S + i * seg_count,
+ * with virtual_seg_count = seg_count * workers.
+ * ============================================================================
+ */
+
+/*
+ * PxfBridgeImportStartVirtual
+ *      Start import with virtual segment ID for Cloudberry gang-parallel mode.
+ *
+ * Same as PxfBridgeImportStart, but after building the standard HTTP headers,
+ * overrides X-GP-SEGMENT-ID and X-GP-SEGMENT-COUNT with the virtual values.
+ * This makes PXF's round-robin assign a unique subset of fragments to each
+ * gang worker, eliminating data duplication.
+ */
+void
+PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
+                            int virtualSegId, int virtualSegCount)
+{
+    char        seg_id_str[16];
+    char        seg_count_str[16];
+
+    pxfsstate->churl_headers = churl_headers_init();
+
+    BuildUriForRead(pxfsstate);
+    BuildHttpHeaders(pxfsstate->churl_headers,
+                     pxfsstate->options,
+                     pxfsstate->relation,
+                     pxfsstate->filter_str,
+                     pxfsstate->retrieved_attrs,
+                     pxfsstate->projectionInfo);
+
+    /* Override physical segment ID/count with virtual values */
+    pg_ltoa(virtualSegId, seg_id_str);
+    pg_ltoa(virtualSegCount, seg_count_str);
+    churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-ID", seg_id_str);
+    churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-COUNT", seg_count_str);
+
+    elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartVirtual physical_seg=%d "
+         "virtual_seg_id=%d virtual_seg_count=%d",
+         PXF_SEGMENT_ID, virtualSegId, virtualSegCount);
+
+    pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data,
+                                                  pxfsstate->churl_headers);
+
+    /* read some bytes to make sure the connection is established */
+    churl_read_check_connectivity(pxfsstate->churl_handle);
+}
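[Editor's note: PxfBridgeGetNextFragment above pairs a shared counter with the logical-to-actual mapping. A self-contained simulation of that claim protocol, with a C11 atomic standing in for the slock_t-guarded counter and made-up segment and fragment counts; illustrative only, not part of the patch.]

    #include <stdatomic.h>
    #include <stdio.h>

    /* Stand-in for PxfParallelScanState: the spinlock-protected counter is
     * modeled with a single atomic fetch-add. */
    typedef struct SimParallelState
    {
        atomic_int next_fragment;
        int        total_fragments;   /* logical (per-segment) fragment count */
    } SimParallelState;

    /* Mirrors PxfBridgeGetNextFragment: claim a logical index, then map it
     * to the actual index with actual = seg_id + logical * seg_count. */
    static int
    sim_get_next_fragment(SimParallelState *pstate, int seg_id, int seg_count)
    {
        int logical = atomic_fetch_add(&pstate->next_fragment, 1);

        if (logical >= pstate->total_fragments)
            return -1;                 /* all fragments assigned */
        return seg_id + logical * seg_count;
    }

    int
    main(void)
    {
        SimParallelState pstate = {0};
        int actual;

        pstate.total_fragments = 4;    /* this segment owns 4 fragments */

        /* Workers on segment 1 of 3 drain the counter cooperatively;
         * serialized here for clarity, interleaved in reality. */
        while ((actual = sim_get_next_fragment(&pstate, 1, 3)) >= 0)
            printf("claimed actual fragment %d\n", actual);  /* 1, 4, 7, 10 */
        return 0;
    }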
diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h
index e5e53610..8087c682 100644
--- a/fdw/pxf_bridge.h
+++ b/fdw/pxf_bridge.h
@@ -30,10 +30,35 @@
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "nodes/pg_list.h"
+#include "storage/spin.h"
 
 #define PXF_SEGMENT_ID GpIdentity.segindex
 #define PXF_SEGMENT_COUNT getgpsegmentCount()
 
+/*
+ * Fragment metadata for parallel execution.
+ * Stored in local memory, not in shared memory.
+ */
+typedef struct PxfFragmentData
+{
+    char       *source_name;    /* fragment source name (e.g. file path) */
+    int         index;          /* fragment index */
+    char       *metadata;       /* fragment metadata (base64 encoded) */
+    char       *profile;        /* optional profile override */
+} PxfFragmentData;
+
+/*
+ * Shared state for parallel foreign scan.
+ * This structure is stored in DSM (dynamic shared memory).
+ */
+typedef struct PxfParallelScanState
+{
+    slock_t     mutex;          /* mutex for accessing shared state */
+    int         total_fragments;    /* total number of fragments */
+    int         next_fragment;  /* next fragment index to be assigned */
+    bool        finished;       /* true if all fragments have been processed */
+} PxfParallelScanState;
+
 /*
  * Execution state of a foreign scan using pxf_fdw.
  */
@@ -49,6 +74,26 @@ typedef struct PxfFdwScanState
     PxfOptions *options;
     CopyFromState cstate;
     ProjectionInfo *projectionInfo;
+
+    /* Parallel execution state (PG DSM-based) */
+    bool        is_parallel;    /* true if running in DSM parallel mode */
+    PxfParallelScanState *pstate;   /* pointer to shared state in DSM */
+    PxfFragmentData *fragments; /* array of fragment metadata */
+    int         num_fragments;  /* total number of fragments */
+    int         current_fragment;   /* current fragment being processed */
+
+    /*
+     * Cloudberry gang-parallel state (virtual segment ID based).
+     * Used when Cloudberry's gang expansion creates multiple processes per
+     * segment but PG's DSM callbacks are not invoked. Each gang worker gets
+     * a unique virtual segment ID so PXF's round-robin distributes fragments
+     * evenly without data duplication.
+     */
+    bool        gang_parallel;  /* true when using virtual segment IDs */
+    bool        plan_parallel_aware;    /* plan node's parallel_aware flag */
+    int         worker_index;   /* this worker's index within the segment gang */
+    int         virtual_seg_id; /* virtual segment ID sent to PXF */
+    int         virtual_seg_count;  /* virtual segment count sent to PXF */
 } PxfFdwScanState;
 
 /*
@@ -80,4 +125,21 @@ int PxfBridgeRead(void *outbuf, int minlen, int maxlen, void *extra);
 /* Writes data from the given buffer of a given size to the PXF server */
 int PxfBridgeWrite(PxfFdwModifyState *context, char *databuf, int datalen);
 
+/* Parallel execution support */
+
+/* Fetch fragment list from PXF server */
+int PxfBridgeFetchFragments(PxfFdwScanState *pxfsstate);
+
+/* Get the next fragment index for this worker (thread-safe) */
+int PxfBridgeGetNextFragment(PxfParallelScanState *pstate);
+
+/* Start import for a specific fragment in parallel mode */
+void PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex);
+
+/* Cloudberry gang-parallel support (virtual segment ID based) */
+
+/* Start import with virtual segment ID for gang-parallel mode */
+void PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
+                                 int virtualSegId, int virtualSegCount);
+
 #endif							/* _PXFBRIDGE_H */
diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c
index 645a1abc..019ec14c 100644
--- a/fdw/pxf_fdw.c
+++ b/fdw/pxf_fdw.c
@@ -32,11 +32,18 @@
 #include "parser/parsetree.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/guc.h"
+#include "access/parallel.h"
+#include "miscadmin.h"
+#include "storage/lock.h"
+#include "storage/shm_toc.h"
 
 PG_MODULE_MAGIC;
 
 #define DEFAULT_PXF_FDW_STARTUP_COST 50000
 
+extern int max_parallel_workers_per_gather;
+
 /*
  * Error token embedded in the data sent by PXF as part of an error row
 */
@@ -87,12 +94,26 @@ static int pxfIsForeignRelUpdatable(Relation rel);
 static PxfFdwModifyState *InitForeignModify(Relation relation);
 static void FinishForeignModify(PxfFdwModifyState *pxfmstate);
 static void InitCopyState(PxfFdwScanState *pxfsstate);
+static void CleanupCurrentFragment(PxfFdwScanState *pxfsstate);
+static void InitCopyStateForFragment(PxfFdwScanState *pxfsstate, int fragmentIndex);
 static void InitCopyStateForModify(PxfFdwModifyState *pxfmstate);
 static CopyToState BeginCopyToModify(Relation forrel, List *options);
 static void EndCopyModify(CopyToState cstate);
 static void PxfBeginScanErrorCallback(void *arg);
 static void PxfCopyFromErrorCallback(void *arg);
 
+/* Parallel scan support */
+static bool pxfIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte);
+static Size pxfEstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+static void pxfInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, void *coordinate);
+static void pxfReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, void *coordinate);
+static void pxfInitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, void *coordinate);
+static void pxfShutdownForeignScan(ForeignScanState *node);
+
+/* Cloudberry gang-parallel support */
+static void InitGangParallelState(PxfFdwScanState *pxfsstate);
+static void InitCopyStateVirtual(PxfFdwScanState *pxfsstate);
+
 /*
  * Foreign-data wrapper handler functions:
  * returns a struct with pointers to the
@@ -148,6 +169,17 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
 	fdw_routine->EndForeignModify = pxfEndForeignModify;
 	fdw_routine->IsForeignRelUpdatable = pxfIsForeignRelUpdatable;
 
+	/*
+	 * Parallel scan support.
+	 * These callbacks enable parallel foreign scans when enable_parallel option is set.
+	 */
+	fdw_routine->IsForeignScanParallelSafe = pxfIsForeignScanParallelSafe;
+	fdw_routine->EstimateDSMForeignScan = pxfEstimateDSMForeignScan;
+	fdw_routine->InitializeDSMForeignScan = pxfInitializeDSMForeignScan;
+	fdw_routine->ReInitializeDSMForeignScan = pxfReInitializeDSMForeignScan;
+	fdw_routine->InitializeWorkerForeignScan = pxfInitializeWorkerForeignScan;
+	fdw_routine->ShutdownForeignScan = pxfShutdownForeignScan;
+
 	PG_RETURN_POINTER(fdw_routine);
 }
@@ -248,29 +280,47 @@ pxfGetForeignPaths(PlannerInfo *root,
 				   RelOptInfo *baserel,
 				   Oid foreigntableid)
 {
-	ForeignPath *path = NULL;
-	int			total_cost = DEFAULT_PXF_FDW_STARTUP_COST;
 	PxfFdwRelationInfo *fpinfo = (PxfFdwRelationInfo *) baserel->fdw_private;
-
+	PxfOptions *options = PxfGetOptions(foreigntableid);
+	Cost		startup = DEFAULT_PXF_FDW_STARTUP_COST;
+	Cost		run_cost = 1000;	/* data transfer cost estimate */
+	Cost		total = startup + run_cost;
 
 	elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths starts on segment: %d", PXF_SEGMENT_ID);
 
-	path = create_foreignscan_path(root, baserel,
-								   NULL,	/* default pathtarget */
-								   baserel->rows,
-								   DEFAULT_PXF_FDW_STARTUP_COST,
-								   total_cost,
-								   NIL,		/* no pathkeys */
-								   NULL,	/* no outer rel either */
-								   NULL,	/* no extra plan */
-								   fpinfo->retrieved_attrs);
-
+	/* Path 1: non-parallel (always, as fallback) */
+	add_path(baserel,
+			 (Path *) create_foreignscan_path(root, baserel, NULL,
+											  baserel->rows, startup, total,
+											  NIL, NULL, NULL,
+											  fpinfo->retrieved_attrs),
+			 root);
 
+	/* Path 2: parallel partial (only if enabled and planner allows) */
+	if (options->enable_parallel && baserel->consider_parallel)
+	{
+		int			workers = max_parallel_workers_per_gather;
 
-	/*
-	 * Create a ForeignPath node and add it as only possible path.
-	 */
-	add_path(baserel, (Path *) path, root);
+		if (workers > 0)
+		{
+			ForeignPath *pp;
+
+			pp = create_foreignscan_path(root, baserel, NULL,
+										 baserel->rows / workers,	/* per-worker rows */
+										 startup,
+										 startup + run_cost / workers,	/* per-worker cost */
+										 NIL, NULL, NULL,
+										 fpinfo->retrieved_attrs);
+			pp->path.parallel_safe = true;
+			pp->path.parallel_aware = true;
+			pp->path.parallel_workers = workers;
+			pp->path.locus.parallel_workers = workers;
+
+			add_partial_path(baserel, (Path *) pp);
+
+			elog(DEBUG3, "pxf_fdw: parallel partial path added, workers=%d", workers);
+		}
+	}
 
 	elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths ends on segment: %d", PXF_SEGMENT_ID);
 }
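[Editor's note: with the constants in the pxfGetForeignPaths hunk above, the serial path costs startup + run_cost = 51000 while a 4-worker partial path costs 50250 with a quarter of the rows, which is what lets the planner prefer the Gather plan the EXPLAIN tests expect. A throwaway check of that arithmetic; the cost values are copied from the hunk, and baserel->rows of 10000 is an assumption matching the test data.]

    #include <stdio.h>

    int
    main(void)
    {
        double startup = 50000;   /* DEFAULT_PXF_FDW_STARTUP_COST */
        double run_cost = 1000;   /* data transfer cost estimate */
        int    workers = 4;       /* max_parallel_workers_per_gather */
        double rows = 10000;      /* assumed baserel->rows for the test tables */

        printf("serial:  rows=%.0f total=%.0f\n", rows, startup + run_cost);
        printf("partial: rows=%.0f total=%.0f (per worker)\n",
               rows / workers, startup + run_cost / workers);
        return 0;
    }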
@@ -382,7 +432,7 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Save state in node->fdw_state. We must save enough information to call
 	 * BeginCopyFrom() again.
 	 */
-	pxfsstate = (PxfFdwScanState *) palloc(sizeof(PxfFdwScanState));
+	pxfsstate = (PxfFdwScanState *) palloc0(sizeof(PxfFdwScanState));
 
 	initStringInfo(&pxfsstate->uri);
 	pxfsstate->filter_str = filter_str;
@@ -392,18 +442,42 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
 	pxfsstate->retrieved_attrs = retrieved_attrs;
 	pxfsstate->projectionInfo = node->ss.ps.ps_ProjInfo;
 
-	/* Set up callback to identify error foreign relation. */
-	ErrorContextCallback errcallback;
-	errcallback.callback = PxfBeginScanErrorCallback;
-	errcallback.arg = (void *) pxfsstate;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
+	/* Initialize parallel scan fields */
+	pxfsstate->is_parallel = false;
+	pxfsstate->pstate = NULL;
+	pxfsstate->fragments = NULL;
+	pxfsstate->num_fragments = 0;
+	pxfsstate->current_fragment = -1;
+
+	/* Initialize gang-parallel fields */
+	pxfsstate->gang_parallel = false;
+	pxfsstate->worker_index = -1;
+	pxfsstate->virtual_seg_id = -1;
+	pxfsstate->virtual_seg_count = -1;
+
+	/* Store plan's parallel_aware flag for gang-parallel detection */
+	pxfsstate->plan_parallel_aware = foreignScan->scan.plan.parallel_aware;
 
-	InitCopyState(pxfsstate);
 	node->fdw_state = (void *) pxfsstate;
 
-	/* Restore the previous error callback */
-	error_context_stack = errcallback.previous;
+	/*
+	 * In parallel mode, defer connection setup to IterateForeignScan where
+	 * fragments are assigned. In non-parallel mode, initialize immediately.
+	 */
+	if (!options->enable_parallel)
+	{
+		/* Set up callback to identify error foreign relation. */
+		ErrorContextCallback errcallback;
+
+		errcallback.callback = PxfBeginScanErrorCallback;
+		errcallback.arg = (void *) pxfsstate;
+		errcallback.previous = error_context_stack;
+		error_context_stack = &errcallback;
+
+		InitCopyState(pxfsstate);
+
+		/* Restore the previous error callback */
+		error_context_stack = errcallback.previous;
+	}
 
 	elog(DEBUG5, "pxf_fdw: pxfBeginForeignScan ends on segment: %d", PXF_SEGMENT_ID);
 }
@@ -426,54 +500,148 @@ pxfIterateForeignScan(ForeignScanState *node)
 	ErrorContextCallback errcallback;
 	bool		found;
 
-	/* Set up callback to identify error line number. */
-	errcallback.callback = PxfCopyFromErrorCallback;
-	errcallback.arg = (void *) pxfsstate;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
-
-	/*
-	 * The protocol for loading a virtual tuple into a slot is first
-	 * ExecClearTuple, then fill the values/isnull arrays, then
-	 * ExecStoreVirtualTuple. If we don't find another row in the file, we
-	 * just skip the last step, leaving the slot empty as required.
-	 *
-	 * We can pass ExprContext = NULL because we read all columns from the
-	 * file, so no need to evaluate default expressions.
-	 *
-	 * We can also pass tupleOid = NULL because we don't allow oids for
-	 * foreign tables.
-	 */
 	ExecClearTuple(slot);
 
-	found = NextCopyFrom(pxfsstate->cstate,
-						 NULL,
-						 slot->tts_values,
-						 slot->tts_isnull);
-
-	if (found)
+	if (pxfsstate->is_parallel)
+	{
+		/*
+		 * PG DSM-based parallel mode: iterate over fragments one at a time.
+		 * Each worker claims fragments via shared spinlock counter.
+		 */
+		for (;;)
+		{
+			/* If no active connection, claim the next fragment */
+			if (pxfsstate->cstate == NULL)
+			{
+				MemoryContext oldcxt;
+				int			fragmentIndex;
+
+				fragmentIndex = PxfBridgeGetNextFragment(pxfsstate->pstate);
+				if (fragmentIndex < 0)
+					return slot;	/* EOF — no more fragments */
+
+				oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+				InitCopyStateForFragment(pxfsstate, fragmentIndex);
+				MemoryContextSwitchTo(oldcxt);
+			}
+
+			pxfsstate->cstate->cur_attname = NULL;
+			pxfsstate->cstate->cur_attval = NULL;
+
+			errcallback.callback = PxfCopyFromErrorCallback;
+			errcallback.arg = (void *) pxfsstate;
+			errcallback.previous = error_context_stack;
+			error_context_stack = &errcallback;
+
+			found = NextCopyFrom(pxfsstate->cstate,
+								 NULL,
+								 slot->tts_values,
+								 slot->tts_isnull);
+
+			error_context_stack = errcallback.previous;
+
+			if (found)
+			{
+				if (pxfsstate->cstate->cdbsreh)
+					pxfsstate->cstate->cdbsreh->processed++;
+
+				ExecStoreVirtualTuple(slot);
+				return slot;
+			}
+
+			/* Current fragment exhausted, clean up and try next */
+			CleanupCurrentFragment(pxfsstate);
+		}
+	}
+	else
 	{
-		if (pxfsstate->cstate->cdbsreh)
+		/*
+		 * Non-parallel or Cloudberry gang-parallel mode.
+		 *
+		 * Both use a single PXF connection — the difference is only in
+		 * the HTTP headers:
+		 * - Non-parallel: physical segment ID / count
+		 * - Gang-parallel: virtual segment ID / count (splits work among
+		 *   gang workers so each reads a unique subset of fragments)
+		 */
+		if (pxfsstate->cstate == NULL)
 		{
+			MemoryContext oldcxt;
+			ErrorContextCallback begincb;
+
 			/*
-			 * If NextCopyFrom failed, the processed row count will have
-			 * already been updated, but we need to update it in a successful
-			 * case.
-			 *
-			 * GPDB_91_MERGE_FIXME: this is almost certainly not the right
-			 * place for this, but row counts are currently scattered all over
-			 * the place. Consolidate.
+			 * Detect Cloudberry gang-parallel case: enable_parallel is set
+			 * and the planner chose a parallel path, but PG DSM callbacks
+			 * were not invoked (Cloudberry gang expansion).
 			 */
-			pxfsstate->cstate->cdbsreh->processed++;
+			if (!pxfsstate->gang_parallel &&
+				pxfsstate->options->enable_parallel &&
+				pxfsstate->plan_parallel_aware)
+			{
+				elog(DEBUG1, "pxf_fdw: segment %d activating gang-parallel mode "
+					 "(virtual segment IDs)",
+					 PXF_SEGMENT_ID);
+
+				oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+				InitGangParallelState(pxfsstate);
+				MemoryContextSwitchTo(oldcxt);
+			}
+
+			begincb.callback = PxfBeginScanErrorCallback;
+			begincb.arg = (void *) pxfsstate;
+			begincb.previous = error_context_stack;
+			error_context_stack = &begincb;
+
+			oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+
+			if (pxfsstate->gang_parallel)
+			{
+				if (pxfsstate->virtual_seg_id < 0)
+				{
+					/*
+					 * This gang worker could not claim a worker_index
+					 * (more processes than expected). Return empty
+					 * results to avoid data duplication.
+					 */
+					MemoryContextSwitchTo(oldcxt);
+					error_context_stack = begincb.previous;
+					return slot;
+				}
+				InitCopyStateVirtual(pxfsstate);
+			}
+			else
+				InitCopyState(pxfsstate);
+
+			MemoryContextSwitchTo(oldcxt);
+			error_context_stack = begincb.previous;
 		}
 
-		ExecStoreVirtualTuple(slot);
-	}
+		/* Reset error-context fields before each read */
+		pxfsstate->cstate->cur_attname = NULL;
+		pxfsstate->cstate->cur_attval = NULL;
 
-	/* Remove error callback. */
-	error_context_stack = errcallback.previous;
+		errcallback.callback = PxfCopyFromErrorCallback;
+		errcallback.arg = (void *) pxfsstate;
+		errcallback.previous = error_context_stack;
+		error_context_stack = &errcallback;
 
-	return slot;
+		found = NextCopyFrom(pxfsstate->cstate,
+							 NULL,
+							 slot->tts_values,
+							 slot->tts_isnull);
+
+		if (found)
+		{
+			if (pxfsstate->cstate->cdbsreh)
+				pxfsstate->cstate->cdbsreh->processed++;
+
+			ExecStoreVirtualTuple(slot);
+		}
+
+		error_context_stack = errcallback.previous;
+
+		return slot;
+	}
 }
@@ -487,8 +655,29 @@ pxfReScanForeignScan(ForeignScanState *node)
 
 	PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
 
-	EndCopyFrom(pxfsstate->cstate);
-	InitCopyState(pxfsstate);
+	if (pxfsstate->is_parallel)
+	{
+		/* DSM parallel: tear down current fragment connection if any.
+		 * Shared state reset is handled by ReInitializeDSM. */
+		CleanupCurrentFragment(pxfsstate);
+	}
+	else if (pxfsstate->gang_parallel)
+	{
+		/* Gang-parallel: single connection with virtual segment ID.
+		 * If virtual_seg_id < 0, this worker produces no data — nothing to do. */
+		if (pxfsstate->virtual_seg_id < 0)
+			return;
+		if (pxfsstate->cstate != NULL)
+			EndCopyFrom(pxfsstate->cstate);
+		InitCopyStateVirtual(pxfsstate);
+	}
+	else
+	{
+		/* Non-parallel: original code path */
+		if (pxfsstate->cstate != NULL)
+			EndCopyFrom(pxfsstate->cstate);
+		InitCopyState(pxfsstate);
+	}
 
 	elog(DEBUG5, "pxf_fdw: pxfReScanForeignScan ends on segment: %d", PXF_SEGMENT_ID);
 }
@@ -502,19 +691,86 @@ pxfEndForeignScan(ForeignScanState *node)
 {
 	elog(DEBUG5, "pxf_fdw: pxfEndForeignScan starts on segment: %d", PXF_SEGMENT_ID);
 
-	ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
 	PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
 
-	/* Release resources */
-	if (foreignScan->fdw_private)
-	{
-		elog(DEBUG5, "Freeing fdw_private");
-		pfree(foreignScan->fdw_private);
-	}
-
 	/* if pxfsstate is NULL, we are in EXPLAIN; nothing to do */
 	if (pxfsstate)
-		EndCopyFrom(pxfsstate->cstate);
+	{
+		if (pxfsstate->is_parallel)
+		{
+			/* DSM parallel: tear down any active fragment connection */
+			CleanupCurrentFragment(pxfsstate);
+
+			/* Free fragment array */
+			if (pxfsstate->fragments != NULL)
+			{
+				pfree(pxfsstate->fragments);
+				pxfsstate->fragments = NULL;
+			}
+		}
+		else if (pxfsstate->gang_parallel)
+		{
+			/* Gang-parallel: same cleanup as non-parallel (single connection),
+			 * plus release the worker_index advisory lock. */
+			if (pxfsstate->churl_handle != NULL)
+			{
+				churl_cleanup(pxfsstate->churl_handle, false);
+				pxfsstate->churl_handle = NULL;
+			}
+
+			if (pxfsstate->churl_headers != NULL)
+			{
+				churl_headers_cleanup(pxfsstate->churl_headers);
+				pxfsstate->churl_headers = NULL;
+			}
+
+			if (pxfsstate->cstate != NULL)
+			{
+				EndCopyFrom(pxfsstate->cstate);
+				pxfsstate->cstate = NULL;
+			}
+
+			/* Release advisory lock for worker_index */
+			if (pxfsstate->worker_index >= 0)
+			{
+				LOCKTAG		tag;
+
+				SET_LOCKTAG_ADVISORY(tag,
+									 MyDatabaseId,
+									 (uint32) gp_session_id,
+									 (uint32) PXF_SEGMENT_ID,
+									 (uint32) pxfsstate->worker_index);
+				LockRelease(&tag, ExclusiveLock, true);
+			}
+		}
+		else
+		{
+			/* Non-parallel: clean up churl resources and CopyFromState */
+			if (pxfsstate->churl_handle != NULL)
+			{
+				churl_cleanup(pxfsstate->churl_handle, false);
+				pxfsstate->churl_handle = NULL;
+			}
+
+			if (pxfsstate->churl_headers != NULL)
+			{
+				churl_headers_cleanup(pxfsstate->churl_headers);
+				pxfsstate->churl_headers = NULL;
+			}
+
+			if (pxfsstate->cstate != NULL)
+			{
+				EndCopyFrom(pxfsstate->cstate);
+				pxfsstate->cstate = NULL;
+			}
+		}
+
+		/* Free the URI buffer */
+		pfree(pxfsstate->uri.data);
+
+		pfree(pxfsstate);
+		node->fdw_state = NULL;
+	}
 
 	elog(DEBUG5, "pxf_fdw: pxfEndForeignScan ends on segment: %d", PXF_SEGMENT_ID);
 }
@@ -777,6 +1033,264 @@ InitCopyState(PxfFdwScanState *pxfsstate)
 	pxfsstate->cstate = cstate;
 }
 
+/*
+ * InitCopyStateVirtual
+ *		Gang-parallel equivalent of InitCopyState().
+ *
+ * Identical to InitCopyState except it calls PxfBridgeImportStartVirtual()
+ * with the pre-computed virtual_seg_id and virtual_seg_count so that PXF
+ * distributes fragments to this gang worker using the virtual segment scheme.
+ */
+static void
+InitCopyStateVirtual(PxfFdwScanState *pxfsstate)
+{
+	CopyFromState cstate;
+
+	PxfBridgeImportStartVirtual(pxfsstate,
+								pxfsstate->virtual_seg_id,
+								pxfsstate->virtual_seg_count);
+
+	cstate = BeginCopyFrom(
+						   NULL,
+						   pxfsstate->relation,
+						   NULL,
+						   NULL,
+						   false,	/* is_program */
+						   &PxfBridgeRead,	/* data_source_cb */
+						   pxfsstate,	/* data_source_cb_extra */
+						   NIL, /* attnamelist */
+						   pxfsstate->options->copy_options	/* copy options */
+		);
+
+	if (pxfsstate->options->reject_limit == -1)
+	{
+		cstate->cdbsreh = NULL;
+		cstate->errMode = ALL_OR_NOTHING;
+	}
+	else
+	{
+		cstate->errMode = SREH_IGNORE;
+
+		if (pxfsstate->options->log_errors)
+			cstate->errMode = SREH_LOG;
+
+		cstate->cdbsreh = makeCdbSreh(pxfsstate->options->reject_limit,
+									  pxfsstate->options->is_reject_limit_rows,
+									  pxfsstate->options->resource,
+									  (char *) cstate->cur_relname,
+									  pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
+
+		cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
+	}
+
+	cstate->fe_msgbuf = makeStringInfo();
+
+	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+											   "PxfFdwMemCxt",
+											   ALLOCSET_DEFAULT_MINSIZE,
+											   ALLOCSET_DEFAULT_INITSIZE,
+											   ALLOCSET_DEFAULT_MAXSIZE);
+
+	pxfsstate->cstate = cstate;
+}
+ */
+static void
+CleanupCurrentFragment(PxfFdwScanState *pxfsstate)
+{
+	if (pxfsstate == NULL)
+		return;
+
+	if (pxfsstate->churl_handle != NULL)
+	{
+		churl_cleanup(pxfsstate->churl_handle, false);
+		pxfsstate->churl_handle = NULL;
+	}
+
+	if (pxfsstate->churl_headers != NULL)
+	{
+		churl_headers_cleanup(pxfsstate->churl_headers);
+		pxfsstate->churl_headers = NULL;
+	}
+
+	if (pxfsstate->cstate != NULL)
+	{
+		EndCopyFrom(pxfsstate->cstate);
+		pxfsstate->cstate = NULL;
+	}
+
+	pxfsstate->current_fragment = -1;
+}
+
+/*
+ * InitGangParallelState
+ *		Initialize gang-parallel state for Cloudberry's gang expansion model.
+ *
+ * Called on first IterateForeignScan when we detect that enable_parallel=true
+ * and the plan is parallel_aware, but PG's DSM callbacks were not invoked
+ * (which is the case in Cloudberry's gang-based parallelism).
+ *
+ * Uses advisory locks to claim a unique worker_index (0..workers-1) within
+ * the same physical segment, then computes virtual segment IDs:
+ *   virtual_seg_id    = physical_seg_id + worker_index * physical_seg_count
+ *   virtual_seg_count = physical_seg_count * num_workers
+ *
+ * PXF's existing round-robin (fragment % seg_count == seg_id) then
+ * automatically distributes a unique subset of fragments to each gang worker,
+ * eliminating data duplication without any PXF server changes.
+ */
+static void
+InitGangParallelState(PxfFdwScanState *pxfsstate)
+{
+	int			seg_id = PXF_SEGMENT_ID;
+	int			seg_count = PXF_SEGMENT_COUNT;
+	int			num_workers = max_parallel_workers_per_gather;
+	int			worker_index = -1;
+	int			i;
+
+	elog(DEBUG3, "pxf_fdw: InitGangParallelState segment %d "
+		 "(total segments=%d, max_parallel_workers=%d)",
+		 seg_id, seg_count, num_workers);
+
+	if (num_workers <= 0)
+		num_workers = 1;
+
+	/*
+	 * Claim a unique worker_index using advisory locks.
+	 *
+	 * Lock key: (gp_session_id, PXF_SEGMENT_ID, worker_index)
+	 * All gang workers on the same physical segment in the same session
+	 * compete for indices 0..num_workers-1.  LockAcquire() in no-wait mode
+	 * returns LOCKACQUIRE_NOT_AVAIL if another worker already holds an
+	 * index, so each index is granted to at most one process.
+	 */
+	for (i = 0; i < num_workers; i++)
+	{
+		LOCKTAG		tag;
+		LockAcquireResult res;
+
+		SET_LOCKTAG_ADVISORY(tag,
+							 MyDatabaseId,
+							 (uint32) gp_session_id,
+							 (uint32) seg_id,
+							 (uint32) i);
+
+		res = LockAcquire(&tag, ExclusiveLock, true, true);
+		if (res != LOCKACQUIRE_NOT_AVAIL)
+		{
+			worker_index = i;
+			break;
+		}
+	}
+
+	/*
+	 * Mark gang_parallel regardless of whether we claimed an index.
+	 * This prevents falling back to non-parallel mode (which would read
+	 * duplicate data using physical segment IDs).
+	 */
+	pxfsstate->gang_parallel = true;
+
+	if (worker_index < 0)
+	{
+		/*
+		 * Could not claim any worker_index: more gang members than
+		 * expected (e.g., leader + workers vs workers only).
+		 * Mark virtual_seg_id = -1 so IterateForeignScan produces
+		 * zero rows for this process, avoiding data duplication.
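+		 *
+		 * Illustrative numbers: with seg_count = 3 and num_workers = 2,
+		 * segment 1's two claimed workers read virtual segments 1 and
+		 * 1 + 1*3 = 4 out of virtual_seg_count = 6.  A third gang member
+		 * on the segment finds both advisory locks taken and must emit
+		 * no rows, since reading with the physical segment ID would
+		 * duplicate fragments already covered by the virtual segments.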
+		 */
+		pxfsstate->worker_index = -1;
+		pxfsstate->virtual_seg_id = -1;
+		pxfsstate->virtual_seg_count = -1;
+
+		elog(DEBUG1, "pxf_fdw: segment %d gang-parallel: no worker_index "
+			 "available (workers=%d), will produce zero rows",
+			 seg_id, num_workers);
+		return;
+	}
+
+	pxfsstate->worker_index = worker_index;
+	pxfsstate->virtual_seg_id = seg_id + worker_index * seg_count;
+	pxfsstate->virtual_seg_count = seg_count * num_workers;
+
+	elog(DEBUG1, "pxf_fdw: segment %d gang-parallel: worker_index=%d "
+		 "virtual_seg_id=%d virtual_seg_count=%d",
+		 seg_id, worker_index,
+		 pxfsstate->virtual_seg_id, pxfsstate->virtual_seg_count);
+}
+
+/*
+ * InitCopyStateForFragment
+ *		Parallel equivalent of InitCopyState(). Opens a connection to PXF
+ *		for a specific fragment and sets up BeginCopyFrom + SREH + rowcontext.
+ */
+static void
+InitCopyStateForFragment(PxfFdwScanState *pxfsstate, int fragmentIndex)
+{
+	CopyFromState cstate;
+
+	PxfBridgeImportStartFragment(pxfsstate, fragmentIndex);
+
+	/*
+	 * Create CopyState from FDW options.  We always acquire all columns, so
+	 * as to match the expected ScanTupleSlot signature.
+	 */
+	cstate = BeginCopyFrom(
+						   NULL,
+						   pxfsstate->relation,
+						   NULL,
+						   NULL,
+						   false,	/* is_program */
+						   &PxfBridgeRead,	/* data_source_cb */
+						   pxfsstate,	/* data_source_cb_extra */
+						   NIL, /* attnamelist */
+						   pxfsstate->options->copy_options	/* copy options */
+		);
+
+	if (pxfsstate->options->reject_limit == -1)
+	{
+		/* Default error handling - "all-or-nothing" */
+		cstate->cdbsreh = NULL;
+		cstate->errMode = ALL_OR_NOTHING;
+	}
+	else
+	{
+		/* no error log by default */
+		cstate->errMode = SREH_IGNORE;
+
+		/* select the SREH mode */
+		if (pxfsstate->options->log_errors)
+			cstate->errMode = SREH_LOG;
+
+		cstate->cdbsreh = makeCdbSreh(pxfsstate->options->reject_limit,
+									  pxfsstate->options->is_reject_limit_rows,
+									  pxfsstate->options->resource,
+									  (char *) cstate->cur_relname,
+									  pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
+
+		cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
+	}
+
+	/* and initialize 'fe_msgbuf' */
+	cstate->fe_msgbuf = makeStringInfo();
+
+	/*
+	 * Create a temporary memory context that we can reset once per row to
+	 * recover palloc'd memory.  This avoids any problems with leaks inside
+	 * datatype input or output routines, and should be faster than retail
+	 * pfree's anyway.
+	 */
+	cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+											   "PxfFdwMemCxt",
+											   ALLOCSET_DEFAULT_MINSIZE,
+											   ALLOCSET_DEFAULT_INITSIZE,
+											   ALLOCSET_DEFAULT_MAXSIZE);
+
+	pxfsstate->cstate = cstate;
+}
+
 /*
  * Initiates a copy state for pxfBeginForeignModify()
  */
@@ -915,22 +1429,43 @@ void
 PxfCopyFromErrorCallback(void *arg)
 {
 	PxfFdwScanState *pxfsstate = (PxfFdwScanState *) arg;
-	CopyFromState cstate = pxfsstate->cstate;
+	CopyFromState cstate;
+	const char *relname;
+	const char *resource;
 	char		curlineno_str[32];
 
+	if (pxfsstate == NULL)
+		return;
+
+	/*
+	 * Derive relname and resource from pxfsstate fields that are set once
+	 * in BeginForeignScan and maintained by the executor; these are always
+	 * valid.  We deliberately avoid cstate->cur_relname because cstate
+	 * internals can be unreliable when PXF returns error data that corrupts
+	 * the COPY parser state.
+	 */
+	relname = pxfsstate->relation
+		? RelationGetRelationName(pxfsstate->relation)
+		: "(unknown)";
+	resource = (pxfsstate->options && pxfsstate->options->resource) ?
pxfsstate->options->resource : "(unknown)"; + + cstate = pxfsstate->cstate; + if (cstate == NULL) + { + errcontext("Foreign table %s, resource %s", relname, resource); + return; + } + snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, cstate->cur_lineno); if (cstate->opts.binary) { - /* can't usefully display the data */ - if (cstate->cur_attname) - errcontext("Foreign table %s, record %s of %s, column %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, - cstate->cur_attname); - else - errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + /* + * PXF does not support binary format. If we land here, cstate + * internals may be unreliable — report only safe information. + */ + errcontext("Foreign table %s, record %s of %s", + relname, curlineno_str, resource); } else { @@ -941,7 +1476,7 @@ PxfCopyFromErrorCallback(void *arg) attval = limit_printout_length(cstate->cur_attval); errcontext("Foreign table %s, record %s of %s, column %s: \"%s\"", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, + relname, curlineno_str, resource, cstate->cur_attname, attval); pfree(attval); } @@ -949,7 +1484,7 @@ PxfCopyFromErrorCallback(void *arg) { /* error is relevant to a particular column, value is NULL */ errcontext("Foreign table %s, record %s of %s, column %s: null input", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, + relname, curlineno_str, resource, cstate->cur_attname); } else @@ -969,7 +1504,7 @@ PxfCopyFromErrorCallback(void *arg) /* token was found, get the actual message and set it as the main error message */ errmsg("%s", token_index + PXF_ERROR_TOKEN_SIZE); errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + relname, curlineno_str, resource); } /* * Error is relevant to a particular line. @@ -988,7 +1523,7 @@ PxfCopyFromErrorCallback(void *arg) lineval = limit_printout_length(cstate->line_buf.data); //truncateEolStr(line_buf, cstate->eol_type); <-- this is done in GP6, but not in GP7 ? errcontext("Foreign table %s, record %s of %s: \"%s\"", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource, lineval); + relname, curlineno_str, resource, lineval); pfree(lineval); } else @@ -1002,8 +1537,206 @@ PxfCopyFromErrorCallback(void *arg) * and just report the line number. */ errcontext("Foreign table %s, record %s of %s", - cstate->cur_relname, curlineno_str, pxfsstate->options->resource); + relname, curlineno_str, resource); } } } } + +/* + * ============================================================================ + * Parallel Foreign Scan Support + * ============================================================================ + */ + +/* + * pxfIsForeignScanParallelSafe + * Determine whether a foreign scan is parallel safe. + * + * Returns true if enable_parallel option is set for the foreign table, + * allowing the planner to consider parallel execution. + */ +static bool +pxfIsForeignScanParallelSafe(PlannerInfo *root, + RelOptInfo *rel, + RangeTblEntry *rte) +{ + PxfOptions *options = PxfGetOptions(rte->relid); + + elog(DEBUG3, "pxf_fdw: pxfIsForeignScanParallelSafe called, enable_parallel=%d", + options->enable_parallel); + + /* + * Only return true if parallel execution is explicitly enabled. + * This ensures backward compatibility - existing queries continue + * to work without parallel execution unless explicitly enabled. 
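+	 *
+	 * Illustrative usage (the table name is hypothetical):
+	 *   ALTER FOREIGN TABLE my_pxf_table OPTIONS (ADD enable_parallel 'true');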
+	 */
+	return options->enable_parallel;
+}
+
+/*
+ * pxfEstimateDSMForeignScan
+ *		Estimate the amount of dynamic shared memory required for parallel scan.
+ *
+ * We need space for the PxfParallelScanState structure which tracks
+ * fragment distribution among parallel workers.
+ */
+static Size
+pxfEstimateDSMForeignScan(ForeignScanState *node,
+						  ParallelContext *pcxt)
+{
+	Oid			foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+	PxfOptions *options = PxfGetOptions(foreigntableid);
+
+	if (!options->enable_parallel)
+		return 0;
+
+	elog(DEBUG3, "pxf_fdw: pxfEstimateDSMForeignScan returning %zu bytes",
+		 sizeof(PxfParallelScanState));
+
+	return sizeof(PxfParallelScanState);
+}
+
+/*
+ * pxfInitializeDSMForeignScan
+ *		Initialize dynamic shared memory for parallel scan (leader process).
+ *
+ * The leader process fetches the list of fragments from PXF server and
+ * initializes the shared state for fragment distribution.
+ */
+static void
+pxfInitializeDSMForeignScan(ForeignScanState *node,
+							ParallelContext *pcxt,
+							void *coordinate)
+{
+	PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate;
+	PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
+	Oid			foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+	PxfOptions *options = PxfGetOptions(foreigntableid);
+
+	if (!options->enable_parallel)
+		return;
+
+	elog(DEBUG3, "pxf_fdw: pxfInitializeDSMForeignScan initializing parallel state");
+
+	/* Initialize the spinlock */
+	SpinLockInit(&pstate->mutex);
+
+	/*
+	 * Fetch fragment list from PXF server.
+	 * This populates pxfsstate->fragments and pxfsstate->num_fragments.
+	 */
+	if (pxfsstate != NULL)
+	{
+		int			all_fragments = PxfBridgeFetchFragments(pxfsstate);
+		int			seg_id = PXF_SEGMENT_ID;
+		int			seg_count = PXF_SEGMENT_COUNT;
+
+		/*
+		 * Count fragments assigned to this segment (round-robin: fragment i
+		 * belongs to the segment where i % seg_count == seg_id).
+		 */
+		int			my_fragments = 0;
+
+		for (int i = 0; i < all_fragments; i++)
+		{
+			if (i % seg_count == seg_id)
+				my_fragments++;
+		}
+
+		pstate->total_fragments = my_fragments;
+		pstate->next_fragment = 0;
+		pstate->finished = (my_fragments == 0);
+
+		/* Store reference to shared state */
+		pxfsstate->pstate = pstate;
+		pxfsstate->is_parallel = true;
+
+		elog(DEBUG3, "pxf_fdw: leader seg %d initialized with %d/%d fragments",
+			 seg_id, my_fragments, all_fragments);
+	}
+	else
+	{
+		/* No scan state; mark finished to prevent workers from spinning */
+		pstate->total_fragments = 0;
+		pstate->next_fragment = 0;
+		pstate->finished = true;
+	}
+}
+
+/*
+ * pxfReInitializeDSMForeignScan
+ *		Reinitialize parallel scan state for rescan.
+ *
+ * Reset the fragment counter so workers can process fragments again.
+ */
+static void
+pxfReInitializeDSMForeignScan(ForeignScanState *node,
+							  ParallelContext *pcxt,
+							  void *coordinate)
+{
+	PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate;
+	Oid			foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+	PxfOptions *options = PxfGetOptions(foreigntableid);
+
+	if (!options->enable_parallel)
+		return;
+
+	elog(DEBUG3, "pxf_fdw: pxfReInitializeDSMForeignScan resetting parallel state");
+
+	SpinLockAcquire(&pstate->mutex);
+	pstate->next_fragment = 0;
+	pstate->finished = false;
+	SpinLockRelease(&pstate->mutex);
+}
+
+/*
+ * pxfInitializeWorkerForeignScan
+ *		Initialize a parallel worker's foreign scan state.
+ *
+ * Each worker attaches to the shared state and prepares to process fragments.
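+ * Workers never contact the PXF server for the fragment list themselves;
+ * they take the fragment count the leader stored in the shared
+ * PxfParallelScanState.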
+ */ +static void +pxfInitializeWorkerForeignScan(ForeignScanState *node, + shm_toc *toc, + void *coordinate) +{ + PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate; + PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state; + Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation); + PxfOptions *options = PxfGetOptions(foreigntableid); + + if (!options->enable_parallel) + return; + + elog(DEBUG3, "pxf_fdw: pxfInitializeWorkerForeignScan worker attaching to parallel state"); + + if (pxfsstate != NULL) + { + pxfsstate->pstate = pstate; + pxfsstate->is_parallel = true; + pxfsstate->num_fragments = pstate->total_fragments; + pxfsstate->current_fragment = -1; /* Will be assigned on first iteration */ + } +} + +/* + * pxfShutdownForeignScan + * Shutdown parallel foreign scan, cleanup shared resources. + */ +static void +pxfShutdownForeignScan(ForeignScanState *node) +{ + PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state; + + elog(DEBUG3, "pxf_fdw: pxfShutdownForeignScan called"); + + if (pxfsstate != NULL && pxfsstate->is_parallel) + { + /* Signal completion in shared state only. + * All resource cleanup belongs in EndForeignScan (called after + * Shutdown), avoiding double-free. */ + if (pxfsstate->pstate != NULL) + { + SpinLockAcquire(&pxfsstate->pstate->mutex); + pxfsstate->pstate->finished = true; + SpinLockRelease(&pxfsstate->pstate->mutex); + } + } +} diff --git a/fdw/pxf_option.c b/fdw/pxf_option.c index 13ccfe92..0d347031 100644 --- a/fdw/pxf_option.c +++ b/fdw/pxf_option.c @@ -39,6 +39,7 @@ #define FDW_OPTION_REJECT_LIMIT "reject_limit" #define FDW_OPTION_REJECT_LIMIT_TYPE "reject_limit_type" #define FDW_OPTION_RESOURCE "resource" +#define FDW_OPTION_ENABLE_PARALLEL "enable_parallel" #define FDW_COPY_OPTION_FORMAT "format" #define FDW_COPY_OPTION_HEADER "header" @@ -72,6 +73,10 @@ static const struct PxfFdwOption valid_options[] = { {FDW_OPTION_REJECT_LIMIT_TYPE, ForeignTableRelationId}, {FDW_OPTION_LOG_ERRORS, ForeignTableRelationId}, + /* Parallel execution */ + {FDW_OPTION_ENABLE_PARALLEL, ForeignTableRelationId}, + {FDW_OPTION_ENABLE_PARALLEL, ForeignServerRelationId}, + /* Sentinel */ {NULL, InvalidOid} }; @@ -454,6 +459,8 @@ PxfGetOptions(Oid foreigntableid) opt->log_errors = defGetBoolean(def); else if (strcmp(def->defname, FDW_OPTION_DISABLE_PPD) == 0) opt->disable_ppd = defGetBoolean(def); + else if (strcmp(def->defname, FDW_OPTION_ENABLE_PARALLEL) == 0) + opt->enable_parallel = defGetBoolean(def); else if (strcmp(def->defname, FDW_OPTION_FORMAT) == 0) { opt->format = defGetString(def); @@ -565,20 +572,37 @@ static void ValidateOption(char *option, Oid catalog) { const struct PxfFdwOption *entry; + bool found = false; for (entry = valid_options; entry->optname; entry++) { - /* option can only be defined at its catalog level */ - if (strcmp(entry->optname, option) == 0 && catalog != entry->optcontext) + if (strcmp(entry->optname, option) == 0) { - Relation rel = RelationIdGetRelation(entry->optcontext); + /* option is recognized; check if it's allowed at this catalog level */ + if (catalog == entry->optcontext) + return; /* valid — exact match */ + found = true; /* name matches but at a different level */ + } + } - ereport(ERROR, - (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), - errmsg( - "the %s option can only be defined at the %s level", - option, - RelationGetRelationName(rel)))); + if (found) + { + /* + * The option exists but is not valid at this catalog level. + * Report the first matching level for the error message. 
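+		 * An option name that matches no entry at any level falls through
+		 * without error, preserving the previous behavior of rejecting only
+		 * known options used at the wrong catalog level.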
+ */ + for (entry = valid_options; entry->optname; entry++) + { + if (strcmp(entry->optname, option) == 0) + { + Relation rel = RelationIdGetRelation(entry->optcontext); + + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("the %s option can only be defined at the %s level", + option, + RelationGetRelationName(rel)))); + } } } } diff --git a/fdw/pxf_option.h b/fdw/pxf_option.h index 15d27781..317139fc 100644 --- a/fdw/pxf_option.h +++ b/fdw/pxf_option.h @@ -55,6 +55,9 @@ typedef struct PxfOptions /* Encoding options */ char *data_encoding; /* The encoding of the data on the external system */ const char *database_encoding; /* The database encoding */ + + /* Parallel execution options */ + bool enable_parallel; /* whether to enable parallel foreign scan */ } PxfOptions; /* Functions prototypes for pxf_option.c file */ diff --git a/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java b/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java index 4866744b..b9bdd8d6 100644 --- a/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java +++ b/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java @@ -116,6 +116,14 @@ public enum RequestType { @Getter(AccessLevel.NONE) private FragmentMetadata fragmentMetadata; + /** + * The specific fragment index requested for parallel execution. + * When set (not null), indicates that only this specific fragment + * should be processed, bypassing the normal segment-based fragment + * distribution. This is used for PostgreSQL parallel workers. + */ + private Integer specificFragmentIndex; + /** * The filter string, null if #hasFilter is false. */ @@ -480,6 +488,16 @@ public boolean hasFilter() { return filterString != null; } + /** + * Returns true if a specific fragment index was requested for parallel execution. + * When true, only the fragment at the specified index should be processed. + * + * @return true if a specific fragment index was requested + */ + public boolean hasSpecificFragment() { + return specificFragmentIndex != null; + } + /** * Returns true if there is column projection. * diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java index 590bb659..0f4e6d64 100644 --- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java +++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java @@ -95,6 +95,67 @@ public List getFragmentsForSegment(RequestContext context) throws IOEx return filteredFragments; } + /** + * Returns all fragments without segment-based filtering. + * This method is used for parallel execution mode where fragment + * distribution is handled by the FDW client rather than the server. 
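+     * Unlike {@code getFragmentsForSegment(RequestContext)}, no per-segment
+     * filtering is applied here; the caller decides which of the returned
+     * fragments each worker should read.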
+     *
+     * @param context the request context
+     * @return the complete list of fragments for the request
+     * @throws IOException when an exception occurs
+     */
+    public List<Fragment> getAllFragments(RequestContext context) throws IOException {
+        LOG.trace("Received FRAGMENTER call for all fragments (parallel mode)");
+        Instant startTime = Instant.now();
+
+        List<Fragment> fragments = getFragmentsFromCache(context, startTime);
+
+        if (LOG.isDebugEnabled()) {
+            int numberOfFragments = fragments.size();
+            long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis();
+
+            LOG.debug("Returning all {} fragment{} for path {} in {} ms (parallel mode)",
+                    numberOfFragments, numberOfFragments == 1 ? "" : "s",
+                    context.getDataSource(), elapsedMillis);
+        }
+
+        return fragments;
+    }
+
+    /**
+     * Returns a single fragment at the specified index.
+     * This method is used for parallel execution mode where each worker
+     * requests a specific fragment by index.
+     *
+     * @param context the request context
+     * @param fragmentIndex the index of the fragment to return
+     * @return the fragment at the specified index
+     * @throws IOException when an exception occurs
+     * @throws IllegalArgumentException when the fragment index is out of bounds
+     */
+    public Fragment getFragmentByIndex(RequestContext context, int fragmentIndex) throws IOException {
+        LOG.debug("Received request for specific fragment index {} (parallel mode)", fragmentIndex);
+        Instant startTime = Instant.now();
+
+        List<Fragment> allFragments = getFragmentsFromCache(context, startTime);
+
+        if (fragmentIndex < 0 || fragmentIndex >= allFragments.size()) {
+            throw new IllegalArgumentException(String.format(
+                    "Invalid fragment index %d, valid range is 0 to %d",
+                    fragmentIndex, allFragments.size() - 1));
+        }
+
+        Fragment fragment = allFragments.get(fragmentIndex);
+
+        if (LOG.isDebugEnabled()) {
+            long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis();
+            LOG.debug("Returning fragment {} of {} for path {} in {} ms (parallel mode)",
+                    fragmentIndex, allFragments.size(), context.getDataSource(), elapsedMillis);
+        }
+
+        return fragment;
+    }
+
     /**
      * Returns the list of fragments from the fragmenter cache. If the cache is
      * empty, it populates the cache with the list of fragments. When
diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
index 5de31a7a..12b318a4 100644
--- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
+++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
@@ -135,6 +135,13 @@ public RequestContext parseRequest(MultiValueMap<String, String> requestHeaders,
 
         context.setFragmenter(params.removeUserProperty("FRAGMENTER"));
 
+        // Parse specific fragment index for parallel execution
+        String fragmentIndexStr = params.removeOptionalProperty("FRAGMENT-INDEX");
+        if (StringUtils.isNotBlank(fragmentIndexStr)) {
+            context.setSpecificFragmentIndex(Integer.parseInt(fragmentIndexStr));
+            LOG.debug("Parallel mode: specific fragment index {} requested", fragmentIndexStr);
+        }
+
         context.setHost(params.removeProperty("URL-HOST"));
         context.setMetadata(params.removeUserProperty("METADATA"));
         context.setPort(params.removeIntProperty("URL-PORT"));
diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
index e4558afe..921254fd 100644
--- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
+++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
@@ -42,10 +42,10 @@ public class ReadServiceImpl extends BaseServiceImpl implements
      * @param metricsReporter metrics reporter service
      */
     public ReadServiceImpl(ConfigurationFactory configurationFactory,
-                           BridgeFactory bridgeFactory,
-                           SecurityService securityService,
-                           FragmenterService fragmenterService,
-                           MetricsReporter metricsReporter) {
+            BridgeFactory bridgeFactory,
+            SecurityService securityService,
+            FragmenterService fragmenterService,
+            MetricsReporter metricsReporter) {
         super("Read", configurationFactory, bridgeFactory, securityService, metricsReporter);
         this.fragmenterService = fragmenterService;
     }
@@ -53,14 +53,18 @@ public ReadServiceImpl(ConfigurationFactory configurationFactory,
     @Override
     public void readData(RequestContext context, OutputStream outputStream) {
         // wrapping the invocation of processData(..) with the error reporting logic
-        // since any exception thrown from it must be logged, as this method is called asynchronously
-        // and is the last opportunity to log the exception while having MDC logging context defined
+        // since any exception thrown from it must be logged: this method is called
+        // asynchronously and is the last opportunity to log the exception while
+        // the MDC logging context is still defined
         invokeWithErrorHandling(() -> processData(context, () -> writeStream(context, outputStream)));
     }
 
     /**
-     * Calls Fragmenter service to get a list of fragments for the resource, then reads records for each fragment
-     * and writes them to the output stream. Maintains the satistics about the progress of the query and reports
+     * Calls Fragmenter service to get a list of fragments for the resource,
+     * then reads records for each fragment and writes them to the output
+     * stream. Maintains the statistics about the progress of the query and reports
      * it to the caller even if the operation failed or aborted.
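+     * In parallel execution mode, when the request carries a specific
+     * fragment index, exactly one fragment is streamed instead of the
+     * segment's full share.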
      *
      * @param context request context
@@ -78,11 +82,26 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
         OperationStats queryStats = new OperationStats(OperationStats.Operation.READ, metricsReporter, context);
         OperationResult queryResult = new OperationResult();
 
-        // dataStream (and outputStream as the result) will close automatically at the end of the try block
+        // dataStream (and outputStream as the result) will close automatically
+        // at the end of the try block
         CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
         String sourceName = null;
         try {
-            List<Fragment> fragments = fragmenterService.getFragmentsForSegment(context);
+            List<Fragment> fragments;
+
+            // Check if a specific fragment index was requested (parallel execution mode)
+            if (context.hasSpecificFragment()) {
+                // Parallel mode: only process the specified fragment
+                Fragment specificFragment = fragmenterService.getFragmentByIndex(
+                        context, context.getSpecificFragmentIndex());
+                fragments = java.util.Collections.singletonList(specificFragment);
+                log.debug("Parallel mode: processing only fragment {} of resource {}",
+                        context.getSpecificFragmentIndex(), context.getDataSource());
+            } else {
+                // Normal mode: get fragments filtered by segment
+                fragments = fragmenterService.getFragmentsForSegment(context);
+            }
+
             for (int i = 0; i < fragments.size(); i++) {
                 Fragment fragment = fragments.get(i);
                 sourceName = fragment.getSourceName();
@@ -116,8 +135,10 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
                 }
             }
         } catch (Exception e) {
-            // the exception is not re-thrown but passed to the caller in the queryResult so that
-            // the caller has a chance to inspect / report query stats before re-throwing the exception
+            // the exception is not re-thrown but passed to the caller in the
+            // queryResult so that the caller has a chance to inspect / report
+            // query stats before re-throwing the exception
             queryResult.setException(e);
             queryResult.setSourceName(sourceName);
         } finally {
@@ -128,7 +149,8 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
     }
 
     /**
-     * Processes a single fragment identified in the RequestContext and updates query statistics.
+     * Processes a single fragment identified in the RequestContext and updates
+     * query statistics.
* * @param countingOutputStream output stream to write data to * @param context request context @@ -136,8 +158,8 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS * @throws Exception if operation fails */ private void processFragment(CountingOutputStream countingOutputStream, - RequestContext context, - OperationStats queryStats) throws Exception { + RequestContext context, + OperationStats queryStats) throws Exception { Writable record; DataOutputStream dos = new DataOutputStream(countingOutputStream); @@ -177,12 +199,14 @@ private void processFragment(CountingOutputStream countingOutputStream, fragmentStats.setByteCount(countingOutputStream.getCount() - previousStreamByteCount); fragmentStats.flushStats(); - // update query stats even if there was an exception so that they can be properly reported by the + // update query stats even if there was an exception so that they can be + // properly reported by the // error reporter queryStats.update(fragmentStats); log.debug("Finished processing fragment {} of resource {} in {} ms, wrote {} records and {} bytes.", - context.getFragmentIndex(), context.getDataSource(), duration.toMillis(), fragmentStats.getRecordCount(), fragmentStats.getByteCount()); + context.getFragmentIndex(), context.getDataSource(), duration.toMillis(), + fragmentStats.getRecordCount(), fragmentStats.getByteCount()); metricsReporter.reportTimer(MetricsReporter.PxfMetric.FRAGMENTS_SENT, duration, context, success); } }