diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query01.ans b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans
new file mode 100644
index 00000000..7672dea1
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query01.ans
@@ -0,0 +1,13 @@
+-- @description query01 for PXF parallel scan correctness - count with parallel
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query02.ans b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans
new file mode 100644
index 00000000..4a77aafb
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query02.ans
@@ -0,0 +1,13 @@
+-- @description query02 for PXF parallel scan correctness - sum aggregation
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT sum(id) FROM pxf_parallel_enabled;
+ sum
+----------
+ 50005000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query03.ans b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans
new file mode 100644
index 00000000..918acaa1
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query03.ans
@@ -0,0 +1,21 @@
+-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count
+
+SET optimizer = off;
+SET
+SET enable_parallel = false;
+SET
+SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled;
+ non_parallel_count
+--------------------
+ 10000
+(1 row)
+
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) AS parallel_count FROM pxf_parallel_enabled;
+ parallel_count
+----------------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query04.ans b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans
new file mode 100644
index 00000000..c3caefbe
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query04.ans
@@ -0,0 +1,22 @@
+-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10;
+ id | val
+----+-------
+ 1 | row_1
+ 2 | row_2
+ 3 | row_3
+ 4 | row_4
+ 5 | row_5
+ 6 | row_6
+ 7 | row_7
+ 8 | row_8
+ 9 | row_9
+ 10 | row_10
+(10 rows)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query05.ans b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans
new file mode 100644
index 00000000..7df9da3f
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query05.ans
@@ -0,0 +1,13 @@
+-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled;
+ min | max | avg
+-----+-------+-----------------------
+ 1 | 10000 | 5000.5000000000000000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query06.ans b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans
new file mode 100644
index 00000000..e37f743a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query06.ans
@@ -0,0 +1,13 @@
+-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000;
+ count
+-------
+ 5000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query07.ans b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans
new file mode 100644
index 00000000..078b7721
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query07.ans
@@ -0,0 +1,17 @@
+-- @description query07 for PXF parallel scan correctness - column projection with WHERE
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val;
+ val
+-------
+ row_1
+ row_2
+ row_3
+ row_4
+ row_5
+(5 rows)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query08.ans b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans
new file mode 100644
index 00000000..3d27ff32
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query08.ans
@@ -0,0 +1,13 @@
+-- @description query08 for PXF parallel scan correctness - empty result edge case
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0;
+ count
+-------
+ 0
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query09.ans b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans
new file mode 100644
index 00000000..250b4b75
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query09.ans
@@ -0,0 +1,13 @@
+-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(DISTINCT id) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/expected/query10.ans b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans
new file mode 100644
index 00000000..d993ffb2
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/expected/query10.ans
@@ -0,0 +1,13 @@
+-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 0;
+SET
+SELECT count(*) FROM pxf_parallel_enabled;
+ count
+-------
+ 10000
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query01.sql b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql
new file mode 100644
index 00000000..5149f3f8
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan correctness - count with parallel
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query02.sql b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql
new file mode 100644
index 00000000..d9ec4dca
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query02.sql
@@ -0,0 +1,6 @@
+-- @description query02 for PXF parallel scan correctness - sum aggregation
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT sum(id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query03.sql b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql
new file mode 100644
index 00000000..536dccd8
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query03.sql
@@ -0,0 +1,9 @@
+-- @description query03 for PXF parallel scan correctness - cross-check parallel vs non-parallel count
+
+SET optimizer = off;
+SET enable_parallel = false;
+SELECT count(*) AS non_parallel_count FROM pxf_parallel_disabled;
+
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) AS parallel_count FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query04.sql b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql
new file mode 100644
index 00000000..251f9a4a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query04.sql
@@ -0,0 +1,6 @@
+-- @description query04 for PXF parallel scan correctness - ORDER BY with LIMIT
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT id, val FROM pxf_parallel_enabled ORDER BY id LIMIT 10;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query05.sql b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql
new file mode 100644
index 00000000..7379f88c
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query05.sql
@@ -0,0 +1,6 @@
+-- @description query05 for PXF parallel scan correctness - MIN/MAX/AVG aggregates
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT min(id), max(id), avg(id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query06.sql b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql
new file mode 100644
index 00000000..2be9bd6a
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query06.sql
@@ -0,0 +1,6 @@
+-- @description query06 for PXF parallel scan correctness - WHERE pushdown with parallel
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled WHERE id > 5000;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query07.sql b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql
new file mode 100644
index 00000000..b7d7ebff
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query07.sql
@@ -0,0 +1,6 @@
+-- @description query07 for PXF parallel scan correctness - column projection with WHERE
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT val FROM pxf_parallel_enabled WHERE id <= 5 ORDER BY val;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query08.sql b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql
new file mode 100644
index 00000000..136a6a33
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query08.sql
@@ -0,0 +1,6 @@
+-- @description query08 for PXF parallel scan correctness - empty result edge case
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_enabled WHERE id < 0;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query09.sql b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql
new file mode 100644
index 00000000..9743ae30
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query09.sql
@@ -0,0 +1,6 @@
+-- @description query09 for PXF parallel scan correctness - COUNT DISTINCT no duplicates
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(DISTINCT id) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/correctness/sql/query10.sql b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql
new file mode 100644
index 00000000..17a89a38
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/correctness/sql/query10.sql
@@ -0,0 +1,6 @@
+-- @description query10 for PXF parallel scan correctness - workers=0 fallback on parallel table
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 0;
+SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query01.ans b/automation/sqlrepo/features/parallel/explain/expected/query01.ans
new file mode 100644
index 00000000..6fb6a328
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query01.ans
@@ -0,0 +1,32 @@
+-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled
+-- start_matchsubs
+--
+-- m/Workers Planned: \d+/
+-- s/Workers Planned: \d+/Workers Planned: N/
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+ -> Finalize Aggregate (cost=XXX rows=NNN width=NN)
+ -> Gather (cost=XXX rows=NNN width=NN)
+ Workers Planned: N
+ -> Partial Aggregate (cost=XXX rows=NNN width=NN)
+ -> Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN)
+(6 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query02.ans b/automation/sqlrepo/features/parallel/explain/expected/query02.ans
new file mode 100644
index 00000000..fd7207d5
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query02.ans
@@ -0,0 +1,24 @@
+-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled
+-- start_matchsubs
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = false;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_disabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+ -> Aggregate (cost=XXX rows=NNN width=NN)
+ -> Foreign Scan on pxf_parallel_disabled (cost=XXX rows=NNN width=NN)
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query03.ans b/automation/sqlrepo/features/parallel/explain/expected/query03.ans
new file mode 100644
index 00000000..cbc76f18
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query03.ans
@@ -0,0 +1,84 @@
+-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present
+-- start_matchsubs
+--
+-- m/Workers Planned: \d+/
+-- s/Workers Planned: \d+/Workers Planned: N/
+--
+-- m/Workers Launched: \d+/
+-- s/Workers Launched: \d+/Workers Launched: N/
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- m/actual time=\d+\.\d+\.\.\d+\.\d+/
+-- s/actual time=\d+\.\d+\.\.\d+\.\d+/actual time=XXX/
+--
+-- m/loops=\d+/
+-- s/loops=\d+/loops=N/
+--
+-- m/Execution Time: \d+\.\d+ ms/
+-- s/Execution Time: \d+\.\d+ ms/Execution Time: XXX ms/
+--
+-- m/Planning Time: \d+\.\d+ ms/
+-- s/Planning Time: \d+\.\d+ ms/Planning Time: XXX ms/
+--
+-- m/Memory Usage: \d+kB/
+-- s/Memory Usage: \d+kB/Memory Usage: NNkB/
+--
+-- m/Memory: \d+kB/
+-- s/Memory: \d+kB/Memory: NNkB/
+--
+-- m/Buckets: \d+/
+-- s/Buckets: \d+/Buckets: NNN/
+--
+-- m/Batches: \d+/
+-- s/Batches: \d+/Batches: NNN/
+--
+-- m/Peak Memory Usage: \d+/
+-- s/Peak Memory Usage: \d+/Peak Memory Usage: NNN/
+--
+-- m/Avg Peak Memory \(per process\): \d+/
+-- s/Avg Peak Memory \(per process\): \d+/Avg Peak Memory (per process): NNN/
+--
+-- m/slice \d+; segments: \d+/
+-- s/slice \d+; segments: \d+/slice N; segments: N/
+--
+-- m/Optimizer: .*/
+-- s/Optimizer: .*/Optimizer: OPT/
+--
+-- end_matchsubs
+-- start_matchignore
+--
+-- m/^\s+Slice statistics:/
+-- m/^\s+\(slice\d+\)/
+-- m/^\s+Statement statistics:/
+-- m/^\s+Settings:/
+-- m/^\s+Query Identifier:/
+--
+-- end_matchignore
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice N; segments: N) (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ -> Finalize Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ -> Gather (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ Workers Planned: N
+ Workers Launched: N
+ -> Partial Aggregate (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ -> Parallel Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN) (actual time=XXX rows=NNN loops=N)
+ Optimizer: OPT
+ Planning Time: XXX ms
+ Execution Time: XXX ms
+(10 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/expected/query04.ans b/automation/sqlrepo/features/parallel/explain/expected/query04.ans
new file mode 100644
index 00000000..14bcc381
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/expected/query04.ans
@@ -0,0 +1,26 @@
+-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table
+-- start_matchsubs
+--
+-- m/cost=\d+\.\d+\.\.\d+\.\d+/
+-- s/cost=\d+\.\d+\.\.\d+\.\d+/cost=XXX/
+--
+-- m/rows=\d+/
+-- s/rows=\d+/rows=NNN/
+--
+-- m/width=\d+/
+-- s/width=\d+/width=NN/
+--
+-- end_matchsubs
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 0;
+SET
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Gather Motion 1:1 (slice1; segments: 1) (cost=XXX rows=NNN width=NN)
+ -> Aggregate (cost=XXX rows=NNN width=NN)
+ -> Foreign Scan on pxf_parallel_enabled (cost=XXX rows=NNN width=NN)
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query01.sql b/automation/sqlrepo/features/parallel/explain/sql/query01.sql
new file mode 100644
index 00000000..17eecc09
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan EXPLAIN - Gather node present when parallel enabled
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query02.sql b/automation/sqlrepo/features/parallel/explain/sql/query02.sql
new file mode 100644
index 00000000..be769b18
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query02.sql
@@ -0,0 +1,5 @@
+-- @description query02 for PXF parallel scan EXPLAIN - no Gather node when parallel disabled
+
+SET optimizer = off;
+SET enable_parallel = false;
+EXPLAIN SELECT count(*) FROM pxf_parallel_disabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query03.sql b/automation/sqlrepo/features/parallel/explain/sql/query03.sql
new file mode 100644
index 00000000..a69394d3
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query03.sql
@@ -0,0 +1,6 @@
+-- @description query03 for PXF parallel scan EXPLAIN ANALYZE - Workers Launched present
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+EXPLAIN ANALYZE SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/explain/sql/query04.sql b/automation/sqlrepo/features/parallel/explain/sql/query04.sql
new file mode 100644
index 00000000..1b783608
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/explain/sql/query04.sql
@@ -0,0 +1,6 @@
+-- @description query04 for PXF parallel scan EXPLAIN - no Gather with workers=0 on parallel table
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 0;
+EXPLAIN SELECT count(*) FROM pxf_parallel_enabled;
diff --git a/automation/sqlrepo/features/parallel/rescan/expected/query01.ans b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans
new file mode 100644
index 00000000..8d00c268
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/rescan/expected/query01.ans
@@ -0,0 +1,18 @@
+-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count
+FROM pxf_parallel_enabled t
+WHERE t.id IN (1, 5000, 10000)
+ORDER BY t.id;
+ id | running_count
+-------+---------------
+ 1 | 1
+ 5000 | 5000
+ 10000 | 10000
+(3 rows)
diff --git a/automation/sqlrepo/features/parallel/rescan/sql/query01.sql b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql
new file mode 100644
index 00000000..19553ad2
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/rescan/sql/query01.sql
@@ -0,0 +1,9 @@
+-- @description query01 for PXF parallel scan rescan - correlated subquery triggers rescan
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT t.id, (SELECT count(*) FROM pxf_parallel_enabled WHERE id <= t.id) AS running_count
+FROM pxf_parallel_enabled t
+WHERE t.id IN (1, 5000, 10000)
+ORDER BY t.id;
diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans
new file mode 100644
index 00000000..757c97f4
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query01.ans
@@ -0,0 +1,13 @@
+-- @description query01 for PXF parallel scan single fragment - count with excess workers
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT count(*) FROM pxf_parallel_single_frag;
+ count
+-------
+ 100
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans
new file mode 100644
index 00000000..2590defc
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/expected/query02.ans
@@ -0,0 +1,13 @@
+-- @description query02 for PXF parallel scan single fragment - sum aggregation
+
+SET optimizer = off;
+SET
+SET enable_parallel = true;
+SET
+SET max_parallel_workers_per_gather = 4;
+SET
+SELECT sum(id) FROM pxf_parallel_single_frag;
+ sum
+------
+ 5050
+(1 row)
diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql
new file mode 100644
index 00000000..7e7758ce
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query01.sql
@@ -0,0 +1,6 @@
+-- @description query01 for PXF parallel scan single fragment - count with excess workers
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT count(*) FROM pxf_parallel_single_frag;
diff --git a/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql
new file mode 100644
index 00000000..06243e54
--- /dev/null
+++ b/automation/sqlrepo/features/parallel/single_fragment/sql/query02.sql
@@ -0,0 +1,6 @@
+-- @description query02 for PXF parallel scan single fragment - sum aggregation
+
+SET optimizer = off;
+SET enable_parallel = true;
+SET max_parallel_workers_per_gather = 4;
+SELECT sum(id) FROM pxf_parallel_single_frag;
diff --git a/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java
new file mode 100644
index 00000000..11815b6f
--- /dev/null
+++ b/automation/src/test/java/org/apache/cloudberry/pxf/automation/features/parallel/ParallelScanTest.java
@@ -0,0 +1,106 @@
+package org.apache.cloudberry.pxf.automation.features.parallel;
+
+import annotations.WorksWithFDW;
+import org.apache.cloudberry.pxf.automation.features.BaseFeature;
+import org.apache.cloudberry.pxf.automation.structures.tables.basic.Table;
+import org.apache.cloudberry.pxf.automation.structures.tables.utils.TableFactory;
+import org.apache.cloudberry.pxf.automation.utils.system.ProtocolEnum;
+import org.apache.cloudberry.pxf.automation.utils.system.ProtocolUtils;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for PG-style Gather parallel scan on PXF FDW foreign tables.
+ *
+ * Verifies:
+ * - Correctness: count, sum, and ordering match between parallel and non-parallel scans
+ * - EXPLAIN: Gather node present when parallel is enabled, absent when disabled
+ * - Single fragment: results stay correct when workers outnumber fragments
+ * - Rescan: correlated subqueries re-execute the parallel scan correctly
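+ *
+ * The foreign tables are created through TableFactory; conceptually (a
+ * sketch only -- the server name and exact option syntax here are assumed,
+ * not taken from TableFactory) the parallel table corresponds to:
+ *
+ * <pre>
+ * CREATE FOREIGN TABLE pxf_parallel_enabled (id integer, val text)
+ *     SERVER pxf_server OPTIONS (resource '...', format 'csv',
+ *                                enable_parallel 'true');
+ * </pre>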
+ */
+@WorksWithFDW
+public class ParallelScanTest extends BaseFeature {
+
+ private static final String[] FIELDS = {"id integer", "val text"};
+ private static final int FILES = 10;
+ private static final int ROWS_PER_FILE = 1000;
+ private static final int SINGLE_FRAG_ROWS = 100;
+
+ private String hdfsDir;
+ private String singleFragDir;
+
+ @Override
+ protected void beforeClass() throws Exception {
+ super.beforeClass();
+
+ hdfsDir = hdfs.getWorkingDirectory() + "/parallel_data";
+ hdfs.createDirectory(hdfsDir);
+
+ // Write 10 CSV files to HDFS (10 x 1000 rows = 10,000 total)
+ for (int fileIdx = 0; fileIdx < FILES; fileIdx++) {
+ Table dataTable = new Table("part_" + String.format("%02d", fileIdx), null);
+ int startId = fileIdx * ROWS_PER_FILE + 1;
+ int endId = (fileIdx + 1) * ROWS_PER_FILE;
+ for (int id = startId; id <= endId; id++) {
+ dataTable.addRow(new String[]{String.valueOf(id), "row_" + id});
+ }
+ hdfs.writeTableToFile(hdfsDir + "/part_" + String.format("%02d", fileIdx) + ".csv",
+ dataTable, ",");
+ }
+
+ ProtocolEnum protocol = ProtocolUtils.getProtocol();
+ String tablePath = protocol.getExternalTablePath(hdfs.getBasePath(), hdfsDir);
+
+ // Create foreign table with enable_parallel=true
+ exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_enabled",
+ FIELDS, tablePath, ",");
+ exTable.setHost(pxfHost);
+ exTable.setPort(pxfPort);
+ exTable.setUserParameters(new String[]{"enable_parallel=true"});
+ gpdb.createTableAndVerify(exTable);
+
+ // Create foreign table without enable_parallel (defaults to false)
+ exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_disabled",
+ FIELDS, tablePath, ",");
+ exTable.setHost(pxfHost);
+ exTable.setPort(pxfPort);
+ gpdb.createTableAndVerify(exTable);
+
+ // Write a single CSV file to HDFS (1 file, 100 rows) for single-fragment tests
+ singleFragDir = hdfs.getWorkingDirectory() + "/parallel_single_frag";
+ hdfs.createDirectory(singleFragDir);
+
+ Table singleTable = new Table("single_frag", null);
+ for (int id = 1; id <= SINGLE_FRAG_ROWS; id++) {
+ singleTable.addRow(new String[]{String.valueOf(id), "row_" + id});
+ }
+ hdfs.writeTableToFile(singleFragDir + "/data.csv", singleTable, ",");
+
+ String singleFragPath = protocol.getExternalTablePath(hdfs.getBasePath(), singleFragDir);
+
+ // Create foreign table for single-fragment with enable_parallel=true
+ exTable = TableFactory.getPxfReadableCSVTable("pxf_parallel_single_frag",
+ FIELDS, singleFragPath, ",");
+ exTable.setHost(pxfHost);
+ exTable.setPort(pxfPort);
+ exTable.setUserParameters(new String[]{"enable_parallel=true"});
+ gpdb.createTableAndVerify(exTable);
+ }
+
+ @Test(groups = {"features", "gpdb", "fdw"})
+ public void testParallelCorrectness() throws Exception {
+ runSqlTest("features/parallel/correctness");
+ }
+
+ @Test(groups = {"features", "gpdb", "fdw"})
+ public void testParallelExplain() throws Exception {
+ runSqlTest("features/parallel/explain");
+ }
+
+ @Test(groups = {"features", "gpdb", "fdw"})
+ public void testParallelSingleFragment() throws Exception {
+ runSqlTest("features/parallel/single_fragment");
+ }
+
+ @Test(groups = {"features", "gpdb", "fdw"})
+ public void testParallelRescan() throws Exception {
+ runSqlTest("features/parallel/rescan");
+ }
+}
diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c
index 567665ac..e55598f3 100644
--- a/fdw/pxf_bridge.c
+++ b/fdw/pxf_bridge.c
@@ -23,6 +23,9 @@
#include "cdb/cdbtm.h"
#include "cdb/cdbvars.h"
+#include "miscadmin.h"
+#include "storage/lock.h"
+#include "utils/builtins.h"
/* helper function declarations */
static void BuildUriForRead(PxfFdwScanState *pxfsstate);
@@ -182,3 +185,275 @@ FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen)
return ptr - start;
}
+
+/*
+ * ============================================================================
+ * Parallel Execution Support
+ * ============================================================================
+ */
+
+/*
+ * Build the URI for fetching the fragment list from the PXF server
+ */
+static void
+BuildUriForFragments(PxfFdwScanState *pxfsstate)
+{
+ PxfOptions *options = pxfsstate->options;
+
+ resetStringInfo(&pxfsstate->uri);
+ appendStringInfo(&pxfsstate->uri, "http://%s:%d/%s/fragments",
+ options->pxf_host, options->pxf_port, PXF_SERVICE_PREFIX);
+ elog(DEBUG2, "pxf_fdw: uri %s for fragments", pxfsstate->uri.data);
+}
+
+/*
+ * PxfBridgeFetchFragments
+ * Fetch the list of fragments from the PXF server.
+ *
+ * This function is called by the leader process to get the complete list
+ * of fragments that need to be processed. The fragments are stored in
+ * the pxfsstate->fragments array.
+ *
+ * Returns the number of fragments fetched.
+ *
+ * Note: this is a partial implementation. The function already calls the
+ * PXF /fragments endpoint and reads the response, but it only derives a
+ * fragment count from it; parsing the JSON and storing per-fragment
+ * metadata in pxfsstate->fragments remains a TODO.
+ */
+int
+PxfBridgeFetchFragments(PxfFdwScanState *pxfsstate)
+{
+ CHURL_HEADERS headers;
+ CHURL_HANDLE handle;
+ StringInfoData response;
+ char buffer[8192];
+ size_t bytes_read;
+ int num_fragments = 0;
+
+ elog(DEBUG3, "pxf_fdw: PxfBridgeFetchFragments starting");
+
+ /* Initialize headers */
+ headers = churl_headers_init();
+
+ /* Build URI for fragments endpoint */
+ BuildUriForFragments(pxfsstate);
+
+ /* Build HTTP headers */
+ BuildHttpHeaders(headers,
+ pxfsstate->options,
+ pxfsstate->relation,
+ pxfsstate->filter_str,
+ pxfsstate->retrieved_attrs,
+ pxfsstate->projectionInfo);
+
+ /* Add header to request JSON response */
+ churl_headers_append(headers, "Accept", "application/json");
+
+ /* Initialize download */
+ handle = churl_init_download(pxfsstate->uri.data, headers);
+
+ /* Read the response */
+ initStringInfo(&response);
+ while ((bytes_read = churl_read(handle, buffer, sizeof(buffer) - 1)) > 0)
+ {
+ buffer[bytes_read] = '\0';
+ appendStringInfoString(&response, buffer);
+ }
+
+ churl_read_check_connectivity(handle);
+
+ elog(DEBUG3, "pxf_fdw: fragments response length=%d", response.len);
+
+ /*
+ * TODO: Parse the JSON response to extract fragment metadata.
+ * For now we derive only the fragment count, by counting occurrences
+ * of the "index" key in the raw response body.
+ *
+ * Expected JSON format:
+ * {
+ * "PXFFragments": [
+ * {"index": 0, "sourceName": "...", "metadata": "..."},
+ * {"index": 1, "sourceName": "...", "metadata": "..."},
+ * ...
+ * ]
+ * }
+ *
+ * A proper implementation would use a real JSON parser and populate
+ * pxfsstate->fragments with the parsed per-fragment metadata.
+ */
+ if (response.len > 0)
+ {
+ /* Simple heuristic: count occurrences of "index" */
+ char *ptr = response.data;
+ while ((ptr = strstr(ptr, "\"index\"")) != NULL)
+ {
+ num_fragments++;
+ ptr++;
+ }
+
+ /* Allocate fragment array if we found any */
+ if (num_fragments > 0)
+ {
+ pxfsstate->fragments = (PxfFragmentData *)
+ palloc0(sizeof(PxfFragmentData) * num_fragments);
+ pxfsstate->num_fragments = num_fragments;
+
+ /* TODO: Actually parse and populate fragment data */
+ elog(DEBUG3, "pxf_fdw: found %d fragments", num_fragments);
+ }
+ }
+
+ /* Cleanup */
+ churl_cleanup(handle, false);
+ churl_headers_cleanup(headers);
+ pfree(response.data);
+
+ return num_fragments;
+}
+
+/*
+ * PxfBridgeGetNextFragment
+ * Get the next fragment index for this worker to process.
+ *
+ * This function atomically increments the next_fragment counter in the
+ * shared parallel state and returns the fragment index for this worker
+ * to process.
+ *
+ * Returns -1 if all fragments have been assigned.
+ */
+int
+PxfBridgeGetNextFragment(PxfParallelScanState *pstate)
+{
+ int logical_idx;
+
+ if (pstate == NULL)
+ return -1;
+
+ SpinLockAcquire(&pstate->mutex);
+
+ if (pstate->next_fragment >= pstate->total_fragments)
+ {
+ SpinLockRelease(&pstate->mutex);
+ return -1;
+ }
+ logical_idx = pstate->next_fragment++;
+
+ SpinLockRelease(&pstate->mutex);
+
+ /* Map logical → actual: the K-th fragment for segment S is at
+ * actual_index = S + K * seg_count (round-robin assignment) */
+ {
+ int seg_id = PXF_SEGMENT_ID;
+ int seg_count = PXF_SEGMENT_COUNT;
+ int actual_idx = seg_id + logical_idx * seg_count;
+
+ elog(DEBUG3, "pxf_fdw: segment %d: GetNextFragment logical=%d actual=%d",
+ seg_id, logical_idx, actual_idx);
+
+ return actual_idx;
+ }
+}
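+
+/*
+ * Worked example (illustrative values, following the mapping above): with
+ * seg_count = 3 and seg_id = 1, logical indices 0, 1, 2 claimed from the
+ * shared counter map to actual fragment indices 1, 4, 7 -- exactly the
+ * fragments that the round-robin rule (fragment % seg_count == seg_id)
+ * assigns to segment 1.
+ */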
+
+/*
+ * PxfBridgeImportStartFragment
+ * Start import for a specific fragment in parallel mode.
+ *
+ * This is similar to PxfBridgeImportStart but includes the fragment
+ * index in the request headers so the PXF server knows which specific
+ * fragment to return data for.
+ */
+void
+PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex)
+{
+ char fragment_idx_str[16];
+
+ elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartFragment starting for fragment %d",
+ fragmentIndex);
+
+ pxfsstate->churl_headers = churl_headers_init();
+
+ BuildUriForRead(pxfsstate);
+ BuildHttpHeaders(pxfsstate->churl_headers,
+ pxfsstate->options,
+ pxfsstate->relation,
+ pxfsstate->filter_str,
+ pxfsstate->retrieved_attrs,
+ pxfsstate->projectionInfo);
+
+ /* Add fragment index header for parallel mode */
+ pg_ltoa(fragmentIndex, fragment_idx_str);
+ churl_headers_append(pxfsstate->churl_headers, "X-GP-FRAGMENT-INDEX", fragment_idx_str);
+
+ pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data, pxfsstate->churl_headers);
+
+ /* read some bytes to make sure the connection is established */
+ churl_read_check_connectivity(pxfsstate->churl_handle);
+
+ /* Update current fragment tracking */
+ pxfsstate->current_fragment = fragmentIndex;
+}
+
+/*
+ * ============================================================================
+ * Cloudberry Gang-Parallel Support (Virtual Segment ID)
+ *
+ * In Cloudberry/Greenplum, parallel execution uses "gang expansion" where
+ * multiple processes share the same physical segment ID. PostgreSQL's DSM
+ * callbacks (InitializeDSMForeignScan, InitializeWorkerForeignScan) are
+ * NOT invoked in this model.
+ *
+ * Instead of fragment-by-fragment coordination, we use "virtual segment IDs":
+ * each gang worker sends a unique virtual segment ID to PXF, so PXF's
+ * existing round-robin fragment distribution splits the data among workers
+ * automatically — no PXF server changes needed.
+ *
+ * Example: 3 physical segments × 4 workers = 12 virtual segments.
+ * Worker i on physical segment S sends virtual_seg_id = S + i * seg_count,
+ * with virtual_seg_count = seg_count * workers.
+ * ============================================================================
+ */
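+
+/*
+ * Worked example (illustrative values, expanding the example above):
+ * 3 physical segments, 4 workers, so virtual_seg_count = 12 and worker i
+ * on physical segment S sends virtual_seg_id = S + i * 3:
+ *
+ *            worker 0   worker 1   worker 2   worker 3
+ *   seg 0       0          3          6          9
+ *   seg 1       1          4          7         10
+ *   seg 2       2          5          8         11
+ *
+ * PXF assigns fragment f to the caller whose virtual_seg_id equals f % 12,
+ * so each fragment is read by exactly one gang worker.
+ */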
+
+/*
+ * PxfBridgeImportStartVirtual
+ * Start import with virtual segment ID for Cloudberry gang-parallel mode.
+ *
+ * Same as PxfBridgeImportStart, but after building the standard HTTP headers,
+ * overrides X-GP-SEGMENT-ID and X-GP-SEGMENT-COUNT with the virtual values.
+ * This makes PXF's round-robin assign a unique subset of fragments to each
+ * gang worker, eliminating data duplication.
+ */
+void
+PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
+ int virtualSegId, int virtualSegCount)
+{
+ char seg_id_str[16];
+ char seg_count_str[16];
+
+ pxfsstate->churl_headers = churl_headers_init();
+
+ BuildUriForRead(pxfsstate);
+ BuildHttpHeaders(pxfsstate->churl_headers,
+ pxfsstate->options,
+ pxfsstate->relation,
+ pxfsstate->filter_str,
+ pxfsstate->retrieved_attrs,
+ pxfsstate->projectionInfo);
+
+ /* Override physical segment ID/count with virtual values */
+ pg_ltoa(virtualSegId, seg_id_str);
+ pg_ltoa(virtualSegCount, seg_count_str);
+ churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-ID", seg_id_str);
+ churl_headers_override(pxfsstate->churl_headers, "X-GP-SEGMENT-COUNT", seg_count_str);
+
+ elog(DEBUG3, "pxf_fdw: PxfBridgeImportStartVirtual physical_seg=%d "
+ "virtual_seg_id=%d virtual_seg_count=%d",
+ PXF_SEGMENT_ID, virtualSegId, virtualSegCount);
+
+ pxfsstate->churl_handle = churl_init_download(pxfsstate->uri.data,
+ pxfsstate->churl_headers);
+
+ /* read some bytes to make sure the connection is established */
+ churl_read_check_connectivity(pxfsstate->churl_handle);
+}
diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h
index e5e53610..8087c682 100644
--- a/fdw/pxf_bridge.h
+++ b/fdw/pxf_bridge.h
@@ -30,10 +30,35 @@
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
+#include "storage/spin.h"
#define PXF_SEGMENT_ID GpIdentity.segindex
#define PXF_SEGMENT_COUNT getgpsegmentCount()
+/*
+ * Fragment metadata for parallel execution.
+ * Stored in local memory, not in shared memory.
+ */
+typedef struct PxfFragmentData
+{
+ char *source_name; /* fragment source name (e.g. file path) */
+ int index; /* fragment index */
+ char *metadata; /* fragment metadata (base64 encoded) */
+ char *profile; /* optional profile override */
+} PxfFragmentData;
+
+/*
+ * Shared state for parallel foreign scan.
+ * This structure is stored in DSM (dynamic shared memory).
+ */
+typedef struct PxfParallelScanState
+{
+ slock_t mutex; /* mutex for accessing shared state */
+ int total_fragments; /* total number of fragments */
+ int next_fragment; /* next fragment index to be assigned */
+ bool finished; /* true if all fragments have been processed */
+} PxfParallelScanState;
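+
+/*
+ * Leader-side initialization sketch (illustrative only -- the actual
+ * pxfInitializeDSMForeignScan callback is defined in pxf_fdw.c and may
+ * differ in detail):
+ *
+ *     PxfParallelScanState *pstate =
+ *         shm_toc_allocate(pcxt->toc, sizeof(PxfParallelScanState));
+ *     SpinLockInit(&pstate->mutex);
+ *     pstate->total_fragments = PxfBridgeFetchFragments(pxfsstate);
+ *     pstate->next_fragment = 0;
+ *     pstate->finished = false;
+ *     shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
+ */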
+
/*
* Execution state of a foreign scan using pxf_fdw.
*/
@@ -49,6 +74,26 @@ typedef struct PxfFdwScanState
PxfOptions *options;
CopyFromState cstate;
ProjectionInfo *projectionInfo;
+
+ /* Parallel execution state (PG DSM-based) */
+ bool is_parallel; /* true if running in DSM parallel mode */
+ PxfParallelScanState *pstate; /* pointer to shared state in DSM */
+ PxfFragmentData *fragments; /* array of fragment metadata */
+ int num_fragments; /* total number of fragments */
+ int current_fragment; /* current fragment being processed */
+
+ /*
+ * Cloudberry gang-parallel state (virtual segment ID based).
+ * Used when Cloudberry's gang expansion creates multiple processes per
+ * segment but PG's DSM callbacks are not invoked. Each gang worker gets
+ * a unique virtual segment ID so PXF's round-robin distributes fragments
+ * evenly without data duplication.
+ */
+ bool gang_parallel; /* true when using virtual segment IDs */
+ bool plan_parallel_aware; /* plan node's parallel_aware flag */
+ int worker_index; /* this worker's index within the segment gang */
+ int virtual_seg_id; /* virtual segment ID sent to PXF */
+ int virtual_seg_count; /* virtual segment count sent to PXF */
} PxfFdwScanState;
/*
@@ -80,4 +125,21 @@ int PxfBridgeRead(void *outbuf, int minlen, int maxlen, void *extra);
/* Writes data from the given buffer of a given size to the PXF server */
int PxfBridgeWrite(PxfFdwModifyState *context, char *databuf, int datalen);
+/* Parallel execution support */
+
+/* Fetch fragment list from PXF server */
+int PxfBridgeFetchFragments(PxfFdwScanState *pxfsstate);
+
+/* Get the next fragment index for this worker (thread-safe) */
+int PxfBridgeGetNextFragment(PxfParallelScanState *pstate);
+
+/* Start import for a specific fragment in parallel mode */
+void PxfBridgeImportStartFragment(PxfFdwScanState *pxfsstate, int fragmentIndex);
+
+/* Cloudberry gang-parallel support (virtual segment ID based) */
+
+/* Start import with virtual segment ID for gang-parallel mode */
+void PxfBridgeImportStartVirtual(PxfFdwScanState *pxfsstate,
+ int virtualSegId, int virtualSegCount);
+
#endif /* _PXFBRIDGE_H */
diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c
index 645a1abc..019ec14c 100644
--- a/fdw/pxf_fdw.c
+++ b/fdw/pxf_fdw.c
@@ -32,11 +32,18 @@
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/guc.h"
+#include "access/parallel.h"
+#include "miscadmin.h"
+#include "storage/lock.h"
+#include "storage/shm_toc.h"
PG_MODULE_MAGIC;
#define DEFAULT_PXF_FDW_STARTUP_COST 50000
+extern int max_parallel_workers_per_gather;
+
/*
* Error token embedded in the data sent by PXF as part of an error row
*/
@@ -87,12 +94,26 @@ static int pxfIsForeignRelUpdatable(Relation rel);
static PxfFdwModifyState *InitForeignModify(Relation relation);
static void FinishForeignModify(PxfFdwModifyState *pxfmstate);
static void InitCopyState(PxfFdwScanState *pxfsstate);
+static void CleanupCurrentFragment(PxfFdwScanState *pxfsstate);
+static void InitCopyStateForFragment(PxfFdwScanState *pxfsstate, int fragmentIndex);
static void InitCopyStateForModify(PxfFdwModifyState *pxfmstate);
static CopyToState BeginCopyToModify(Relation forrel, List *options);
static void EndCopyModify(CopyToState cstate);
static void PxfBeginScanErrorCallback(void *arg);
static void PxfCopyFromErrorCallback(void *arg);
+/* Parallel scan support */
+static bool pxfIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte);
+static Size pxfEstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt);
+static void pxfInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, void *coordinate);
+static void pxfReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt, void *coordinate);
+static void pxfInitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc, void *coordinate);
+static void pxfShutdownForeignScan(ForeignScanState *node);
+
+/* Cloudberry gang-parallel support */
+static void InitGangParallelState(PxfFdwScanState *pxfsstate);
+static void InitCopyStateVirtual(PxfFdwScanState *pxfsstate);
+
/*
* Foreign-data wrapper handler functions:
* returns a struct with pointers to the
@@ -148,6 +169,17 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
fdw_routine->EndForeignModify = pxfEndForeignModify;
fdw_routine->IsForeignRelUpdatable = pxfIsForeignRelUpdatable;
+ /*
+ * Parallel scan support.
+ * These callbacks enable parallel foreign scans when enable_parallel option is set.
+ */
+ fdw_routine->IsForeignScanParallelSafe = pxfIsForeignScanParallelSafe;
+ fdw_routine->EstimateDSMForeignScan = pxfEstimateDSMForeignScan;
+ fdw_routine->InitializeDSMForeignScan = pxfInitializeDSMForeignScan;
+ fdw_routine->ReInitializeDSMForeignScan = pxfReInitializeDSMForeignScan;
+ fdw_routine->InitializeWorkerForeignScan = pxfInitializeWorkerForeignScan;
+ fdw_routine->ShutdownForeignScan = pxfShutdownForeignScan;
+
PG_RETURN_POINTER(fdw_routine);
}
@@ -248,29 +280,47 @@ pxfGetForeignPaths(PlannerInfo *root,
RelOptInfo *baserel,
Oid foreigntableid)
{
- ForeignPath *path = NULL;
- int total_cost = DEFAULT_PXF_FDW_STARTUP_COST;
PxfFdwRelationInfo *fpinfo = (PxfFdwRelationInfo *) baserel->fdw_private;
-
+ PxfOptions *options = PxfGetOptions(foreigntableid);
+ Cost startup = DEFAULT_PXF_FDW_STARTUP_COST;
+ Cost run_cost = 1000; /* data transfer cost estimate */
+ Cost total = startup + run_cost;
elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths starts on segment: %d", PXF_SEGMENT_ID);
- path = create_foreignscan_path(root, baserel,
- NULL, /* default pathtarget */
- baserel->rows,
- DEFAULT_PXF_FDW_STARTUP_COST,
- total_cost,
- NIL, /* no pathkeys */
- NULL, /* no outer rel either */
- NULL, /* no extra plan */
- fpinfo->retrieved_attrs);
-
+ /* Path 1: non-parallel (always, as fallback) */
+ add_path(baserel,
+ (Path *) create_foreignscan_path(root, baserel, NULL,
+ baserel->rows, startup, total,
+ NIL, NULL, NULL,
+ fpinfo->retrieved_attrs),
+ root);
+ /* Path 2: parallel partial (only if enabled and planner allows) */
+ if (options->enable_parallel && baserel->consider_parallel)
+ {
+ int workers = max_parallel_workers_per_gather;
- /*
- * Create a ForeignPath node and add it as only possible path.
- */
- add_path(baserel, (Path *) path, root);
+ if (workers > 0)
+ {
+ ForeignPath *pp;
+
+ pp = create_foreignscan_path(root, baserel, NULL,
+ baserel->rows / workers, /* per-worker rows */
+ startup,
+ startup + run_cost / workers, /* per-worker cost */
+ NIL, NULL, NULL,
+ fpinfo->retrieved_attrs);
+ pp->path.parallel_safe = true;
+ pp->path.parallel_aware = true;
+ pp->path.parallel_workers = workers;
+ pp->path.locus.parallel_workers = workers;
+
+ add_partial_path(baserel, (Path *) pp);
+
+ elog(DEBUG3, "pxf_fdw: parallel partial path added, workers=%d", workers);
+ }
+ }
elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths ends on segment: %d", PXF_SEGMENT_ID);
}
@@ -382,7 +432,7 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
* Save state in node->fdw_state. We must save enough information to call
* BeginCopyFrom() again.
*/
- pxfsstate = (PxfFdwScanState *) palloc(sizeof(PxfFdwScanState));
+ pxfsstate = (PxfFdwScanState *) palloc0(sizeof(PxfFdwScanState));
initStringInfo(&pxfsstate->uri);
pxfsstate->filter_str = filter_str;
@@ -392,18 +442,42 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
pxfsstate->retrieved_attrs = retrieved_attrs;
pxfsstate->projectionInfo = node->ss.ps.ps_ProjInfo;
- /* Set up callback to identify error foreign relation. */
- ErrorContextCallback errcallback;
- errcallback.callback = PxfBeginScanErrorCallback;
- errcallback.arg = (void *) pxfsstate;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
+ /* Initialize parallel scan fields */
+ pxfsstate->is_parallel = false;
+ pxfsstate->pstate = NULL;
+ pxfsstate->fragments = NULL;
+ pxfsstate->num_fragments = 0;
+ pxfsstate->current_fragment = -1;
+
+ /* Initialize gang-parallel fields */
+ pxfsstate->gang_parallel = false;
+ pxfsstate->worker_index = -1;
+ pxfsstate->virtual_seg_id = -1;
+ pxfsstate->virtual_seg_count = -1;
+
+ /* Store plan's parallel_aware flag for gang-parallel detection */
+ pxfsstate->plan_parallel_aware = foreignScan->scan.plan.parallel_aware;
- InitCopyState(pxfsstate);
node->fdw_state = (void *) pxfsstate;
- /* Restore the previous error callback */
- error_context_stack = errcallback.previous;
+ /*
+ * In parallel mode, defer connection setup to IterateForeignScan where
+ * fragments are assigned. In non-parallel mode, initialize immediately.
+ */
+ if (!options->enable_parallel)
+ {
+ /* Set up callback to identify error foreign relation. */
+ ErrorContextCallback errcallback;
+ errcallback.callback = PxfBeginScanErrorCallback;
+ errcallback.arg = (void *) pxfsstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ InitCopyState(pxfsstate);
+
+ /* Restore the previous error callback */
+ error_context_stack = errcallback.previous;
+ }
elog(DEBUG5, "pxf_fdw: pxfBeginForeignScan ends on segment: %d", PXF_SEGMENT_ID);
}
@@ -426,54 +500,148 @@ pxfIterateForeignScan(ForeignScanState *node)
ErrorContextCallback errcallback;
bool found;
- /* Set up callback to identify error line number. */
- errcallback.callback = PxfCopyFromErrorCallback;
- errcallback.arg = (void *) pxfsstate;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
- /*
- * The protocol for loading a virtual tuple into a slot is first
- * ExecClearTuple, then fill the values/isnull arrays, then
- * ExecStoreVirtualTuple. If we don't find another row in the file, we
- * just skip the last step, leaving the slot empty as required.
- *
- * We can pass ExprContext = NULL because we read all columns from the
- * file, so no need to evaluate default expressions.
- *
- * We can also pass tupleOid = NULL because we don't allow oids for
- * foreign tables.
- */
ExecClearTuple(slot);
- found = NextCopyFrom(pxfsstate->cstate,
- NULL,
- slot->tts_values,
- slot->tts_isnull);
-
- if (found)
+ if (pxfsstate->is_parallel)
+ {
+ /*
+ * PG DSM-based parallel mode: iterate over fragments one at a time.
+ * Each worker claims fragments via a spinlock-protected shared counter.
+ */
+ for (;;)
+ {
+ /* If no active connection, claim the next fragment */
+ if (pxfsstate->cstate == NULL)
+ {
+ MemoryContext oldcxt;
+ int fragmentIndex;
+
+ fragmentIndex = PxfBridgeGetNextFragment(pxfsstate->pstate);
+ if (fragmentIndex < 0)
+ return slot; /* EOF — no more fragments */
+
+ oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+ InitCopyStateForFragment(pxfsstate, fragmentIndex);
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ pxfsstate->cstate->cur_attname = NULL;
+ pxfsstate->cstate->cur_attval = NULL;
+
+ errcallback.callback = PxfCopyFromErrorCallback;
+ errcallback.arg = (void *) pxfsstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ found = NextCopyFrom(pxfsstate->cstate,
+ NULL,
+ slot->tts_values,
+ slot->tts_isnull);
+
+ error_context_stack = errcallback.previous;
+
+ if (found)
+ {
+ if (pxfsstate->cstate->cdbsreh)
+ pxfsstate->cstate->cdbsreh->processed++;
+
+ ExecStoreVirtualTuple(slot);
+ return slot;
+ }
+
+ /* Current fragment exhausted, clean up and try next */
+ CleanupCurrentFragment(pxfsstate);
+ }
+ }
+ else
{
- if (pxfsstate->cstate->cdbsreh)
+ /*
+ * Non-parallel or Cloudberry gang-parallel mode.
+ *
+ * Both use a single PXF connection — the difference is only in
+ * the HTTP headers:
+ * - Non-parallel: physical segment ID / count
+ * - Gang-parallel: virtual segment ID / count (splits work among
+ * gang workers so each reads a unique subset of fragments)
+ */
+ if (pxfsstate->cstate == NULL)
{
+ MemoryContext oldcxt;
+ ErrorContextCallback begincb;
+
/*
- * If NextCopyFrom failed, the processed row count will have
- * already been updated, but we need to update it in a successful
- * case.
- *
- * GPDB_91_MERGE_FIXME: this is almost certainly not the right
- * place for this, but row counts are currently scattered all over
- * the place. Consolidate.
+ * Detect Cloudberry gang-parallel case: enable_parallel is set
+ * and the planner chose a parallel path, but PG DSM callbacks
+ * were not invoked (Cloudberry gang expansion).
*/
- pxfsstate->cstate->cdbsreh->processed++;
+ if (!pxfsstate->gang_parallel &&
+ pxfsstate->options->enable_parallel &&
+ pxfsstate->plan_parallel_aware)
+ {
+ elog(DEBUG1, "pxf_fdw: segment %d activating gang-parallel mode "
+ "(virtual segment IDs)",
+ PXF_SEGMENT_ID);
+
+ oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+ InitGangParallelState(pxfsstate);
+ MemoryContextSwitchTo(oldcxt);
+ }
+
+ begincb.callback = PxfBeginScanErrorCallback;
+ begincb.arg = (void *) pxfsstate;
+ begincb.previous = error_context_stack;
+ error_context_stack = &begincb;
+
+ oldcxt = MemoryContextSwitchTo(node->ss.ps.state->es_query_cxt);
+
+ if (pxfsstate->gang_parallel)
+ {
+ if (pxfsstate->virtual_seg_id < 0)
+ {
+ /*
+ * This gang worker could not claim a worker_index
+ * (more processes than expected). Return empty
+ * results to avoid data duplication.
+ */
+ MemoryContextSwitchTo(oldcxt);
+ error_context_stack = begincb.previous;
+ return slot;
+ }
+ InitCopyStateVirtual(pxfsstate);
+ }
+ else
+ InitCopyState(pxfsstate);
+
+ MemoryContextSwitchTo(oldcxt);
+ error_context_stack = begincb.previous;
}
- ExecStoreVirtualTuple(slot);
- }
+ /* Reset error-context fields before each read */
+ pxfsstate->cstate->cur_attname = NULL;
+ pxfsstate->cstate->cur_attval = NULL;
- /* Remove error callback. */
- error_context_stack = errcallback.previous;
+ errcallback.callback = PxfCopyFromErrorCallback;
+ errcallback.arg = (void *) pxfsstate;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
- return slot;
+ found = NextCopyFrom(pxfsstate->cstate,
+ NULL,
+ slot->tts_values,
+ slot->tts_isnull);
+
+ if (found)
+ {
+ if (pxfsstate->cstate->cdbsreh)
+ pxfsstate->cstate->cdbsreh->processed++;
+
+ ExecStoreVirtualTuple(slot);
+ }
+
+ error_context_stack = errcallback.previous;
+
+ return slot;
+ }
}
/*
@@ -487,8 +655,29 @@ pxfReScanForeignScan(ForeignScanState *node)
PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
- EndCopyFrom(pxfsstate->cstate);
- InitCopyState(pxfsstate);
+ if (pxfsstate->is_parallel)
+ {
+ /* DSM parallel: tear down current fragment connection if any.
+ * Shared state reset is handled by ReInitializeDSM. */
+ CleanupCurrentFragment(pxfsstate);
+ }
+ else if (pxfsstate->gang_parallel)
+ {
+ /* Gang-parallel: single connection with virtual segment ID.
+ * If virtual_seg_id < 0, this worker produces no data — nothing to do. */
+ if (pxfsstate->virtual_seg_id < 0)
+ return;
+ if (pxfsstate->cstate != NULL)
+ EndCopyFrom(pxfsstate->cstate);
+ InitCopyStateVirtual(pxfsstate);
+ }
+ else
+ {
+ /* Non-parallel: original code path */
+ if (pxfsstate->cstate != NULL)
+ EndCopyFrom(pxfsstate->cstate);
+ InitCopyState(pxfsstate);
+ }
elog(DEBUG5, "pxf_fdw: pxfReScanForeignScan ends on segment: %d", PXF_SEGMENT_ID);
}
@@ -502,19 +691,86 @@ pxfEndForeignScan(ForeignScanState *node)
{
elog(DEBUG5, "pxf_fdw: pxfEndForeignScan starts on segment: %d", PXF_SEGMENT_ID);
- ForeignScan *foreignScan = (ForeignScan *) node->ss.ps.plan;
PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
- /* Release resources */
- if (foreignScan->fdw_private)
- {
- elog(DEBUG5, "Freeing fdw_private");
- pfree(foreignScan->fdw_private);
- }
-
/* if pxfsstate is NULL, we are in EXPLAIN; nothing to do */
if (pxfsstate)
- EndCopyFrom(pxfsstate->cstate);
+ {
+ if (pxfsstate->is_parallel)
+ {
+ /* DSM parallel: tear down any active fragment connection */
+ CleanupCurrentFragment(pxfsstate);
+
+ /* Free fragment array */
+ if (pxfsstate->fragments != NULL)
+ {
+ pfree(pxfsstate->fragments);
+ pxfsstate->fragments = NULL;
+ }
+ }
+ else if (pxfsstate->gang_parallel)
+ {
+ /* Gang-parallel: same cleanup as non-parallel (single connection),
+ * plus release the worker_index advisory lock. */
+ if (pxfsstate->churl_handle != NULL)
+ {
+ churl_cleanup(pxfsstate->churl_handle, false);
+ pxfsstate->churl_handle = NULL;
+ }
+
+ if (pxfsstate->churl_headers != NULL)
+ {
+ churl_headers_cleanup(pxfsstate->churl_headers);
+ pxfsstate->churl_headers = NULL;
+ }
+
+ if (pxfsstate->cstate != NULL)
+ {
+ EndCopyFrom(pxfsstate->cstate);
+ pxfsstate->cstate = NULL;
+ }
+
+ /* Release advisory lock for worker_index */
+ if (pxfsstate->worker_index >= 0)
+ {
+ LOCKTAG tag;
+
+ SET_LOCKTAG_ADVISORY(tag,
+ MyDatabaseId,
+ (uint32) gp_session_id,
+ (uint32) PXF_SEGMENT_ID,
+ (uint32) pxfsstate->worker_index);
+ LockRelease(&tag, ExclusiveLock, true);
+ }
+ }
+ else
+ {
+ /* Non-parallel: clean up churl resources and CopyFromState */
+ if (pxfsstate->churl_handle != NULL)
+ {
+ churl_cleanup(pxfsstate->churl_handle, false);
+ pxfsstate->churl_handle = NULL;
+ }
+
+ if (pxfsstate->churl_headers != NULL)
+ {
+ churl_headers_cleanup(pxfsstate->churl_headers);
+ pxfsstate->churl_headers = NULL;
+ }
+
+ if (pxfsstate->cstate != NULL)
+ {
+ EndCopyFrom(pxfsstate->cstate);
+ pxfsstate->cstate = NULL;
+ }
+ }
+
+ /* Free the URI buffer */
+ pfree(pxfsstate->uri.data);
+
+ pfree(pxfsstate);
+ node->fdw_state = NULL;
+ }
elog(DEBUG5, "pxf_fdw: pxfEndForeignScan ends on segment: %d", PXF_SEGMENT_ID);
}
@@ -777,6 +1033,264 @@ InitCopyState(PxfFdwScanState *pxfsstate)
pxfsstate->cstate = cstate;
}
+/*
+ * InitCopyStateVirtual
+ * Gang-parallel equivalent of InitCopyState().
+ *
+ * Identical to InitCopyState except it calls PxfBridgeImportStartVirtual()
+ * with the pre-computed virtual_seg_id and virtual_seg_count so that PXF
+ * distributes fragments to this gang worker using the virtual segment scheme.
+ */
+static void
+InitCopyStateVirtual(PxfFdwScanState *pxfsstate)
+{
+ CopyFromState cstate;
+
+ PxfBridgeImportStartVirtual(pxfsstate,
+ pxfsstate->virtual_seg_id,
+ pxfsstate->virtual_seg_count);
+
+ cstate = BeginCopyFrom(
+ NULL,
+ pxfsstate->relation,
+ NULL,
+ NULL,
+ false, /* is_program */
+ &PxfBridgeRead, /* data_source_cb */
+ pxfsstate, /* data_source_cb_extra */
+ NIL, /* attnamelist */
+ pxfsstate->options->copy_options /* copy options */
+ );
+
+ if (pxfsstate->options->reject_limit == -1)
+ {
+ cstate->cdbsreh = NULL;
+ cstate->errMode = ALL_OR_NOTHING;
+ }
+ else
+ {
+ cstate->errMode = SREH_IGNORE;
+
+ if (pxfsstate->options->log_errors)
+ cstate->errMode = SREH_LOG;
+
+ cstate->cdbsreh = makeCdbSreh(pxfsstate->options->reject_limit,
+ pxfsstate->options->is_reject_limit_rows,
+ pxfsstate->options->resource,
+ (char *) cstate->cur_relname,
+ pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
+
+ cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
+ }
+
+ cstate->fe_msgbuf = makeStringInfo();
+
+ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "PxfFdwMemCxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ pxfsstate->cstate = cstate;
+}
+
+/*
+ * CleanupCurrentFragment
+ * Safely tears down the churl connection and CopyFromState for the
+ * current fragment in parallel mode. Handles all NULL cases gracefully.
+ */
+static void
+CleanupCurrentFragment(PxfFdwScanState *pxfsstate)
+{
+ if (pxfsstate == NULL)
+ return;
+
+ if (pxfsstate->churl_handle != NULL)
+ {
+ churl_cleanup(pxfsstate->churl_handle, false);
+ pxfsstate->churl_handle = NULL;
+ }
+
+ if (pxfsstate->churl_headers != NULL)
+ {
+ churl_headers_cleanup(pxfsstate->churl_headers);
+ pxfsstate->churl_headers = NULL;
+ }
+
+ if (pxfsstate->cstate != NULL)
+ {
+ EndCopyFrom(pxfsstate->cstate);
+ pxfsstate->cstate = NULL;
+ }
+
+ pxfsstate->current_fragment = -1;
+}
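+
+/*
+ * Usage sketch (the per-fragment advance loop lives in IterateForeignScan
+ * and is not shown in this hunk): before opening the next fragment, the
+ * scan is expected to call CleanupCurrentFragment(pxfsstate) and then
+ * InitCopyStateForFragment(pxfsstate, next), so at most one churl
+ * connection is live per process at a time.
+ */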
+
+/*
+ * InitGangParallelState
+ * Initialize gang-parallel state for Cloudberry's gang expansion model.
+ *
+ * Called on first IterateForeignScan when we detect that enable_parallel=true
+ * and the plan is parallel_aware, but PG's DSM callbacks were not invoked
+ * (which is the case in Cloudberry's gang-based parallelism).
+ *
+ * Uses advisory locks to claim a unique worker_index (0..workers-1) within
+ * the same physical segment, then computes virtual segment IDs:
+ * virtual_seg_id = physical_seg_id + worker_index * physical_seg_count
+ * virtual_seg_count = physical_seg_count * num_workers
+ *
+ * PXF's existing round-robin (fragment % seg_count == seg_id) then
+ * automatically distributes a unique subset of fragments to each gang worker,
+ * eliminating data duplication without any PXF server changes.
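+ *
+ * Worked example (illustrative numbers): with 3 physical segments and
+ * 2 workers, worker_index 1 on physical segment 2 gets
+ * virtual_seg_id = 2 + 1 * 3 = 5 and virtual_seg_count = 6, so the
+ * round-robin hands it fragments 5, 11, 17, ...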
+ */
+static void
+InitGangParallelState(PxfFdwScanState *pxfsstate)
+{
+ int seg_id = PXF_SEGMENT_ID;
+ int seg_count = PXF_SEGMENT_COUNT;
+ int num_workers = max_parallel_workers_per_gather;
+ int worker_index = -1;
+ int i;
+
+ elog(DEBUG3, "pxf_fdw: InitGangParallelState segment %d "
+ "(total segments=%d, max_parallel_workers=%d)",
+ seg_id, seg_count, num_workers);
+
+ if (num_workers <= 0)
+ num_workers = 1;
+
+ /*
+ * Claim a unique worker_index using advisory locks.
+ *
+ * Lock key: (gp_session_id, PXF_SEGMENT_ID, worker_index)
+ * All gang workers on the same physical segment in the same session
+ * compete for indices 0..num_workers-1. LockAcquire() with
+ * dontWait = true returns LOCKACQUIRE_NOT_AVAIL when another worker
+ * already holds an index, so each worker ends up with the lowest
+ * free one.
+ */
+ for (i = 0; i < num_workers; i++)
+ {
+ LOCKTAG tag;
+ LockAcquireResult res;
+
+ SET_LOCKTAG_ADVISORY(tag,
+ MyDatabaseId,
+ (uint32) gp_session_id,
+ (uint32) seg_id,
+ (uint32) i);
+
+ res = LockAcquire(&tag, ExclusiveLock, true, true);
+ if (res != LOCKACQUIRE_NOT_AVAIL)
+ {
+ worker_index = i;
+ break;
+ }
+ }
+
+ /*
+ * Mark gang_parallel regardless of whether we claimed an index.
+ * This prevents falling back to non-parallel mode (which would read
+ * duplicate data using physical segment IDs).
+ */
+ pxfsstate->gang_parallel = true;
+
+ if (worker_index < 0)
+ {
+ /*
+ * Could not claim any worker_index: there are more gang members
+ * than expected (e.g., the leader participates in addition to
+ * num_workers workers). Mark virtual_seg_id = -1 so
+ * IterateForeignScan produces zero rows for this process,
+ * avoiding data duplication.
+ */
+ pxfsstate->worker_index = -1;
+ pxfsstate->virtual_seg_id = -1;
+ pxfsstate->virtual_seg_count = -1;
+
+ elog(DEBUG1, "pxf_fdw: segment %d gang-parallel: no worker_index "
+ "available (workers=%d), will produce zero rows",
+ seg_id, num_workers);
+ return;
+ }
+
+ pxfsstate->worker_index = worker_index;
+ pxfsstate->virtual_seg_id = seg_id + worker_index * seg_count;
+ pxfsstate->virtual_seg_count = seg_count * num_workers;
+
+ elog(DEBUG1, "pxf_fdw: segment %d gang-parallel: worker_index=%d "
+ "virtual_seg_id=%d virtual_seg_count=%d",
+ seg_id, worker_index,
+ pxfsstate->virtual_seg_id, pxfsstate->virtual_seg_count);
+}
+
+/*
+ * InitCopyStateForFragment
+ * Parallel equivalent of InitCopyState(). Opens a connection to PXF
+ * for a specific fragment and sets up BeginCopyFrom + SREH + rowcontext.
+ */
+static void
+InitCopyStateForFragment(PxfFdwScanState *pxfsstate, int fragmentIndex)
+{
+ CopyFromState cstate;
+
+ PxfBridgeImportStartFragment(pxfsstate, fragmentIndex);
+
+ /*
+ * Create CopyState from FDW options. We always acquire all columns, so
+ * as to match the expected ScanTupleSlot signature.
+ */
+ cstate = BeginCopyFrom(
+ NULL,
+ pxfsstate->relation,
+ NULL,
+ NULL,
+ false, /* is_program */
+ &PxfBridgeRead, /* data_source_cb */
+ pxfsstate, /* data_source_cb_extra */
+ NIL, /* attnamelist */
+ pxfsstate->options->copy_options /* copy options */
+ );
+
+ if (pxfsstate->options->reject_limit == -1)
+ {
+ /* Default error handling - "all-or-nothing" */
+ cstate->cdbsreh = NULL;
+ cstate->errMode = ALL_OR_NOTHING;
+ }
+ else
+ {
+ /* no error log by default */
+ cstate->errMode = SREH_IGNORE;
+
+ /* select the SREH mode */
+ if (pxfsstate->options->log_errors)
+ cstate->errMode = SREH_LOG;
+
+ cstate->cdbsreh = makeCdbSreh(pxfsstate->options->reject_limit,
+ pxfsstate->options->is_reject_limit_rows,
+ pxfsstate->options->resource,
+ (char *) cstate->cur_relname,
+ pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
+
+ cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
+ }
+
+ /* and 'fe_msgbuf' */
+ cstate->fe_msgbuf = makeStringInfo();
+
+ /*
+ * Create a temporary memory context that we can reset once per row to
+ * recover palloc'd memory. This avoids any problems with leaks inside
+ * datatype input or output routines, and should be faster than retail
+ * pfree's anyway.
+ */
+ cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "PxfFdwMemCxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ pxfsstate->cstate = cstate;
+}
+
/*
* Initiates a copy state for pxfBeginForeignModify()
*/
@@ -915,22 +1429,43 @@ void
PxfCopyFromErrorCallback(void *arg)
{
PxfFdwScanState *pxfsstate = (PxfFdwScanState *) arg;
- CopyFromState cstate = pxfsstate->cstate;
+ CopyFromState cstate;
+ const char *relname;
+ const char *resource;
char curlineno_str[32];
+ if (pxfsstate == NULL)
+ return;
+
+ /*
+ * Derive relname and resource from pxfsstate fields that are set once
+ * in BeginForeignScan and maintained by the executor, so they are
+ * always valid. We deliberately avoid cstate->cur_relname because
+ * cstate internals can be unreliable when PXF returns error data that
+ * corrupts the COPY parser state.
+ */
+ relname = pxfsstate->relation
+ ? RelationGetRelationName(pxfsstate->relation)
+ : "(unknown)";
+ resource = (pxfsstate->options && pxfsstate->options->resource)
+ ? pxfsstate->options->resource : "(unknown)";
+
+ cstate = pxfsstate->cstate;
+ if (cstate == NULL)
+ {
+ errcontext("Foreign table %s, resource %s", relname, resource);
+ return;
+ }
+
snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
cstate->cur_lineno);
if (cstate->opts.binary)
{
- /* can't usefully display the data */
- if (cstate->cur_attname)
- errcontext("Foreign table %s, record %s of %s, column %s",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource,
- cstate->cur_attname);
- else
- errcontext("Foreign table %s, record %s of %s",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource);
+ /*
+ * PXF does not support binary format. If we land here, cstate
+ * internals may be unreliable — report only safe information.
+ */
+ errcontext("Foreign table %s, record %s of %s",
+ relname, curlineno_str, resource);
}
else
{
@@ -941,7 +1476,7 @@ PxfCopyFromErrorCallback(void *arg)
attval = limit_printout_length(cstate->cur_attval);
errcontext("Foreign table %s, record %s of %s, column %s: \"%s\"",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource,
+ relname, curlineno_str, resource,
cstate->cur_attname, attval);
pfree(attval);
}
@@ -949,7 +1484,7 @@ PxfCopyFromErrorCallback(void *arg)
{
/* error is relevant to a particular column, value is NULL */
errcontext("Foreign table %s, record %s of %s, column %s: null input",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource,
+ relname, curlineno_str, resource,
cstate->cur_attname);
}
else
@@ -969,7 +1504,7 @@ PxfCopyFromErrorCallback(void *arg)
/* token was found, get the actual message and set it as the main error message */
errmsg("%s", token_index + PXF_ERROR_TOKEN_SIZE);
errcontext("Foreign table %s, record %s of %s",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource);
+ relname, curlineno_str, resource);
}
/*
* Error is relevant to a particular line.
@@ -988,7 +1523,7 @@ PxfCopyFromErrorCallback(void *arg)
lineval = limit_printout_length(cstate->line_buf.data);
//truncateEolStr(line_buf, cstate->eol_type); <-- this is done in GP6, but not in GP7 ?
errcontext("Foreign table %s, record %s of %s: \"%s\"",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource, lineval);
+ relname, curlineno_str, resource, lineval);
pfree(lineval);
}
else
@@ -1002,8 +1537,206 @@ PxfCopyFromErrorCallback(void *arg)
* and just report the line number.
*/
errcontext("Foreign table %s, record %s of %s",
- cstate->cur_relname, curlineno_str, pxfsstate->options->resource);
+ relname, curlineno_str, resource);
}
}
}
}
+
+/*
+ * ============================================================================
+ * Parallel Foreign Scan Support
+ * ============================================================================
+ */
+
+/*
+ * pxfIsForeignScanParallelSafe
+ * Determine whether a foreign scan is parallel safe.
+ *
+ * Returns true if enable_parallel option is set for the foreign table,
+ * allowing the planner to consider parallel execution.
+ */
+static bool
+pxfIsForeignScanParallelSafe(PlannerInfo *root,
+ RelOptInfo *rel,
+ RangeTblEntry *rte)
+{
+ PxfOptions *options = PxfGetOptions(rte->relid);
+
+ elog(DEBUG3, "pxf_fdw: pxfIsForeignScanParallelSafe called, enable_parallel=%d",
+ options->enable_parallel);
+
+ /*
+ * Only return true if parallel execution is explicitly enabled.
+ * This ensures backward compatibility - existing queries continue
+ * to work without parallel execution unless explicitly enabled.
+ */
+ return options->enable_parallel;
+}
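+
+/*
+ * Illustrative usage (sketch; table, server, and resource names are
+ * hypothetical):
+ *
+ * CREATE FOREIGN TABLE pxf_example (id int, val text)
+ * SERVER pxf_server
+ * OPTIONS (resource '/data/example', format 'csv',
+ * enable_parallel 'true');
+ *
+ * SET max_parallel_workers_per_gather = 4;
+ * SELECT count(*) FROM pxf_example;
+ */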
+
+/*
+ * pxfEstimateDSMForeignScan
+ * Estimate the amount of dynamic shared memory required for parallel scan.
+ *
+ * We need space for the PxfParallelScanState structure which tracks
+ * fragment distribution among parallel workers.
+ */
+static Size
+pxfEstimateDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt)
+{
+ Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+ PxfOptions *options = PxfGetOptions(foreigntableid);
+
+ if (!options->enable_parallel)
+ return 0;
+
+ elog(DEBUG3, "pxf_fdw: pxfEstimateDSMForeignScan returning %zu bytes",
+ sizeof(PxfParallelScanState));
+
+ return sizeof(PxfParallelScanState);
+}
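+
+/*
+ * For reference, the shared state sized above is expected to look roughly
+ * like this (sketch; the authoritative definition lives in the FDW header,
+ * not in this hunk):
+ *
+ * typedef struct PxfParallelScanState
+ * {
+ * slock_t mutex; -- protects the fields below
+ * int total_fragments; -- fragments owned by this segment
+ * int next_fragment; -- next fragment index to hand out
+ * bool finished; -- no more fragments remain
+ * } PxfParallelScanState;
+ */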
+
+/*
+ * pxfInitializeDSMForeignScan
+ * Initialize dynamic shared memory for parallel scan (leader process).
+ *
+ * The leader process fetches the list of fragments from PXF server and
+ * initializes the shared state for fragment distribution.
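+ *
+ * Illustrative numbers: with 8 fragments and 4 segments, segment 1 owns
+ * fragments 1 and 5, so total_fragments = 2 for that segment's workers.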
+ */
+static void
+pxfInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate)
+{
+ PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate;
+ PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
+ Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+ PxfOptions *options = PxfGetOptions(foreigntableid);
+
+ if (!options->enable_parallel)
+ return;
+
+ elog(DEBUG3, "pxf_fdw: pxfInitializeDSMForeignScan initializing parallel state");
+
+ /* Initialize the spinlock */
+ SpinLockInit(&pstate->mutex);
+
+ /*
+ * Fetch fragment list from PXF server.
+ * This populates pxfsstate->fragments and pxfsstate->num_fragments.
+ */
+ if (pxfsstate != NULL)
+ {
+ int all_fragments = PxfBridgeFetchFragments(pxfsstate);
+ int seg_id = PXF_SEGMENT_ID;
+ int seg_count = PXF_SEGMENT_COUNT;
+
+ /*
+ * Count the fragments assigned to this segment; PXF's round-robin
+ * maps fragment i to segment (i % seg_count).
+ */
+ int my_fragments = 0;
+ for (int i = 0; i < all_fragments; i++)
+ {
+ if (i % seg_count == seg_id)
+ my_fragments++;
+ }
+
+ pstate->total_fragments = my_fragments;
+ pstate->next_fragment = 0;
+ pstate->finished = (my_fragments == 0);
+
+ /* Store reference to shared state */
+ pxfsstate->pstate = pstate;
+ pxfsstate->is_parallel = true;
+
+ elog(DEBUG3, "pxf_fdw: leader seg %d initialized with %d/%d fragments",
+ seg_id, my_fragments, all_fragments);
+ }
+ else
+ {
+ /* No scan state — mark finished to prevent workers from spinning */
+ pstate->total_fragments = 0;
+ pstate->next_fragment = 0;
+ pstate->finished = true;
+ }
+}
+
+/*
+ * pxfReInitializeDSMForeignScan
+ * Reinitialize parallel scan state for rescan.
+ *
+ * Reset the fragment counter so workers can process fragments again.
+ */
+static void
+pxfReInitializeDSMForeignScan(ForeignScanState *node,
+ ParallelContext *pcxt,
+ void *coordinate)
+{
+ PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate;
+ Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+ PxfOptions *options = PxfGetOptions(foreigntableid);
+
+ if (!options->enable_parallel)
+ return;
+
+ elog(DEBUG3, "pxf_fdw: pxfReInitializeDSMForeignScan resetting parallel state");
+
+ SpinLockAcquire(&pstate->mutex);
+ pstate->next_fragment = 0;
+ pstate->finished = false;
+ SpinLockRelease(&pstate->mutex);
+}
+
+/*
+ * pxfInitializeWorkerForeignScan
+ * Initialize a parallel worker's foreign scan state.
+ *
+ * Each worker attaches to the shared state and prepares to process fragments.
+ */
+static void
+pxfInitializeWorkerForeignScan(ForeignScanState *node,
+ shm_toc *toc,
+ void *coordinate)
+{
+ PxfParallelScanState *pstate = (PxfParallelScanState *) coordinate;
+ PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
+ Oid foreigntableid = RelationGetRelid(node->ss.ss_currentRelation);
+ PxfOptions *options = PxfGetOptions(foreigntableid);
+
+ if (!options->enable_parallel)
+ return;
+
+ elog(DEBUG3, "pxf_fdw: pxfInitializeWorkerForeignScan worker attaching to parallel state");
+
+ if (pxfsstate != NULL)
+ {
+ pxfsstate->pstate = pstate;
+ pxfsstate->is_parallel = true;
+ pxfsstate->num_fragments = pstate->total_fragments;
+ pxfsstate->current_fragment = -1; /* Will be assigned on first iteration */
+ }
+}
+
+/*
+ * pxfShutdownForeignScan
+ * Shutdown parallel foreign scan, cleanup shared resources.
+ */
+static void
+pxfShutdownForeignScan(ForeignScanState *node)
+{
+ PxfFdwScanState *pxfsstate = (PxfFdwScanState *) node->fdw_state;
+
+ elog(DEBUG3, "pxf_fdw: pxfShutdownForeignScan called");
+
+ if (pxfsstate != NULL && pxfsstate->is_parallel)
+ {
+ /*
+ * Signal completion in shared state only. All resource cleanup
+ * belongs in EndForeignScan (which runs after Shutdown), avoiding
+ * a double-free.
+ */
+ if (pxfsstate->pstate != NULL)
+ {
+ SpinLockAcquire(&pxfsstate->pstate->mutex);
+ pxfsstate->pstate->finished = true;
+ SpinLockRelease(&pxfsstate->pstate->mutex);
+ }
+ }
+}
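+
+/*
+ * Wiring note (sketch): PostgreSQL invokes the callbacks above through the
+ * FdwRoutine returned by the FDW handler, which is expected to set:
+ *
+ * routine->IsForeignScanParallelSafe = pxfIsForeignScanParallelSafe;
+ * routine->EstimateDSMForeignScan = pxfEstimateDSMForeignScan;
+ * routine->InitializeDSMForeignScan = pxfInitializeDSMForeignScan;
+ * routine->ReInitializeDSMForeignScan = pxfReInitializeDSMForeignScan;
+ * routine->InitializeWorkerForeignScan = pxfInitializeWorkerForeignScan;
+ * routine->ShutdownForeignScan = pxfShutdownForeignScan;
+ *
+ * The handler itself is elsewhere in this file and not shown in this hunk.
+ */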
diff --git a/fdw/pxf_option.c b/fdw/pxf_option.c
index 13ccfe92..0d347031 100644
--- a/fdw/pxf_option.c
+++ b/fdw/pxf_option.c
@@ -39,6 +39,7 @@
#define FDW_OPTION_REJECT_LIMIT "reject_limit"
#define FDW_OPTION_REJECT_LIMIT_TYPE "reject_limit_type"
#define FDW_OPTION_RESOURCE "resource"
+#define FDW_OPTION_ENABLE_PARALLEL "enable_parallel"
#define FDW_COPY_OPTION_FORMAT "format"
#define FDW_COPY_OPTION_HEADER "header"
@@ -72,6 +73,10 @@ static const struct PxfFdwOption valid_options[] = {
{FDW_OPTION_REJECT_LIMIT_TYPE, ForeignTableRelationId},
{FDW_OPTION_LOG_ERRORS, ForeignTableRelationId},
+ /* Parallel execution */
+ {FDW_OPTION_ENABLE_PARALLEL, ForeignTableRelationId},
+ {FDW_OPTION_ENABLE_PARALLEL, ForeignServerRelationId},
+
/* Sentinel */
{NULL, InvalidOid}
};
@@ -454,6 +459,8 @@ PxfGetOptions(Oid foreigntableid)
opt->log_errors = defGetBoolean(def);
else if (strcmp(def->defname, FDW_OPTION_DISABLE_PPD) == 0)
opt->disable_ppd = defGetBoolean(def);
+ else if (strcmp(def->defname, FDW_OPTION_ENABLE_PARALLEL) == 0)
+ opt->enable_parallel = defGetBoolean(def);
else if (strcmp(def->defname, FDW_OPTION_FORMAT) == 0)
{
opt->format = defGetString(def);
@@ -565,20 +572,37 @@ static void
ValidateOption(char *option, Oid catalog)
{
const struct PxfFdwOption *entry;
+ bool found = false;
for (entry = valid_options; entry->optname; entry++)
{
- /* option can only be defined at its catalog level */
- if (strcmp(entry->optname, option) == 0 && catalog != entry->optcontext)
+ if (strcmp(entry->optname, option) == 0)
{
- Relation rel = RelationIdGetRelation(entry->optcontext);
+ /* option is recognized; check if it's allowed at this catalog level */
+ if (catalog == entry->optcontext)
+ return; /* valid — exact match */
+ found = true; /* name matches but at a different level */
+ }
+ }
- ereport(ERROR,
- (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
- errmsg(
- "the %s option can only be defined at the %s level",
- option,
- RelationGetRelationName(rel))));
+
+ if (found)
+ {
+ /*
+ * The option exists but is not valid at this catalog level.
+ * Report the first matching level for the error message.
+ */
+ for (entry = valid_options; entry->optname; entry++)
+ {
+ if (strcmp(entry->optname, option) == 0)
+ {
+ Relation rel = RelationIdGetRelation(entry->optcontext);
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
+ errmsg("the %s option can only be defined at the %s level",
+ option,
+ RelationGetRelationName(rel))));
+ }
}
}
}
diff --git a/fdw/pxf_option.h b/fdw/pxf_option.h
index 15d27781..317139fc 100644
--- a/fdw/pxf_option.h
+++ b/fdw/pxf_option.h
@@ -55,6 +55,9 @@ typedef struct PxfOptions
/* Encoding options */
char *data_encoding; /* The encoding of the data on the external system */
const char *database_encoding; /* The database encoding */
+
+ /* Parallel execution options */
+ bool enable_parallel; /* whether to enable parallel foreign scan */
} PxfOptions;
/* Functions prototypes for pxf_option.c file */
diff --git a/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java b/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java
index 4866744b..b9bdd8d6 100644
--- a/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java
+++ b/server/pxf-api/src/main/java/org/apache/cloudberry/pxf/api/model/RequestContext.java
@@ -116,6 +116,14 @@ public enum RequestType {
@Getter(AccessLevel.NONE)
private FragmentMetadata fragmentMetadata;
+ /**
+ * The specific fragment index requested for parallel execution.
+ * When set (not null), indicates that only this specific fragment
+ * should be processed, bypassing the normal segment-based fragment
+ * distribution. This is used for PostgreSQL parallel workers.
+ */
+ private Integer specificFragmentIndex;
+
/**
* The filter string, null if #hasFilter is false.
*/
@@ -480,6 +488,16 @@ public boolean hasFilter() {
return filterString != null;
}
+ /**
+ * Returns true if a specific fragment index was requested for parallel execution.
+ * When true, only the fragment at the specified index should be processed.
+ *
+ * @return true if a specific fragment index was requested
+ */
+ public boolean hasSpecificFragment() {
+ return specificFragmentIndex != null;
+ }
+
/**
* Returns true if there is column projection.
*
diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java
index 590bb659..0f4e6d64 100644
--- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java
+++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/FragmenterService.java
@@ -95,6 +95,67 @@ public List<Fragment> getFragmentsForSegment(RequestContext context) throws IOEx
return filteredFragments;
}
+ /**
+ * Returns all fragments without segment-based filtering.
+ * This method is used for parallel execution mode where fragment
+ * distribution is handled by the FDW client rather than the server.
+ *
+ * @param context the request context
+ * @return the complete list of fragments for the request
+ * @throws IOException when an exception occurs
+ */
+ public List<Fragment> getAllFragments(RequestContext context) throws IOException {
+ LOG.trace("Received FRAGMENTER call for all fragments (parallel mode)");
+ Instant startTime = Instant.now();
+
+ List<Fragment> fragments = getFragmentsFromCache(context, startTime);
+
+ if (LOG.isDebugEnabled()) {
+ int numberOfFragments = fragments.size();
+ long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis();
+
+ LOG.debug("Returning all {} fragment{} for path {} in {} ms (parallel mode)",
+ numberOfFragments, numberOfFragments == 1 ? "" : "s",
+ context.getDataSource(), elapsedMillis);
+ }
+
+ return fragments;
+ }
+
+ /**
+ * Returns a single fragment at the specified index.
+ * This method is used for parallel execution mode where each worker
+ * requests a specific fragment by index.
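+ *
+ * <p>For example (illustrative), with 8 cached fragments, index 3 returns
+ * the fourth fragment, while index 8 throws IllegalArgumentException.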
+ *
+ * @param context the request context
+ * @param fragmentIndex the index of the fragment to return
+ * @return the fragment at the specified index
+ * @throws IOException when an exception occurs
+ * @throws IllegalArgumentException when the fragment index is out of bounds
+ */
+ public Fragment getFragmentByIndex(RequestContext context, int fragmentIndex) throws IOException {
+ LOG.debug("Received request for specific fragment index {} (parallel mode)", fragmentIndex);
+ Instant startTime = Instant.now();
+
+ List<Fragment> allFragments = getFragmentsFromCache(context, startTime);
+
+ if (fragmentIndex < 0 || fragmentIndex >= allFragments.size()) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid fragment index %d, valid range is 0 to %d",
+ fragmentIndex, allFragments.size() - 1));
+ }
+
+ Fragment fragment = allFragments.get(fragmentIndex);
+
+ if (LOG.isDebugEnabled()) {
+ long elapsedMillis = Duration.between(startTime, Instant.now()).toMillis();
+ LOG.debug("Returning fragment {} of {} for path {} in {} ms (parallel mode)",
+ fragmentIndex, allFragments.size(), context.getDataSource(), elapsedMillis);
+ }
+
+ return fragment;
+ }
+
/**
* Returns the list of fragments from the fragmenter cache. If the cache is
* empty, it populates the cache with the list of fragments. When
diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
index 5de31a7a..12b318a4 100644
--- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
+++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/HttpRequestParser.java
@@ -135,6 +135,13 @@ public RequestContext parseRequest(MultiValueMap<String, String> requestHeaders,
context.setFragmenter(params.removeUserProperty("FRAGMENTER"));
+ // Parse specific fragment index for parallel execution
+ String fragmentIndexStr = params.removeOptionalProperty("FRAGMENT-INDEX");
+ if (StringUtils.isNotBlank(fragmentIndexStr)) {
+ context.setSpecificFragmentIndex(Integer.parseInt(fragmentIndexStr));
+ LOG.debug("Parallel mode: specific fragment index {} requested", fragmentIndexStr);
+ }
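+ // Illustrative flow: a parallel worker asks for exactly one fragment by
+ // sending FRAGMENT-INDEX as a request property; when the property is
+ // absent, removeOptionalProperty() returns null and the request follows
+ // the normal segment-based fragment distribution.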
+
context.setHost(params.removeProperty("URL-HOST"));
context.setMetadata(params.removeUserProperty("METADATA"));
context.setPort(params.removeIntProperty("URL-PORT"));
diff --git a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
index e4558afe..921254fd 100644
--- a/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
+++ b/server/pxf-service/src/main/java/org/apache/cloudberry/pxf/service/controller/ReadServiceImpl.java
@@ -42,10 +42,10 @@ public class ReadServiceImpl extends BaseServiceImpl implements
* @param metricsReporter metrics reporter service
*/
public ReadServiceImpl(ConfigurationFactory configurationFactory,
- BridgeFactory bridgeFactory,
- SecurityService securityService,
- FragmenterService fragmenterService,
- MetricsReporter metricsReporter) {
+ BridgeFactory bridgeFactory,
+ SecurityService securityService,
+ FragmenterService fragmenterService,
+ MetricsReporter metricsReporter) {
super("Read", configurationFactory, bridgeFactory, securityService, metricsReporter);
this.fragmenterService = fragmenterService;
}
@@ -53,14 +53,18 @@ public ReadServiceImpl(ConfigurationFactory configurationFactory,
@Override
public void readData(RequestContext context, OutputStream outputStream) {
// wrapping the invocation of processData(..) with the error reporting logic
- // since any exception thrown from it must be logged, as this method is called asynchronously
- // and is the last opportunity to log the exception while having MDC logging context defined
+ // since any exception thrown from it must be logged, as this method is called
+ // asynchronously and is the last opportunity to log the exception while the
+ // MDC logging context is still defined
invokeWithErrorHandling(() -> processData(context, () -> writeStream(context, outputStream)));
}
/**
- * Calls Fragmenter service to get a list of fragments for the resource, then reads records for each fragment
- * and writes them to the output stream. Maintains the satistics about the progress of the query and reports
+ * Calls the Fragmenter service to get a list of fragments for the resource,
+ * then reads records for each fragment and writes them to the output stream.
+ * Maintains statistics about the progress of the query and reports
* it to the caller even if the operation failed or aborted.
*
* @param context request context
@@ -78,11 +82,26 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
OperationStats queryStats = new OperationStats(OperationStats.Operation.READ, metricsReporter, context);
OperationResult queryResult = new OperationResult();
- // dataStream (and outputStream as the result) will close automatically at the end of the try block
+ // dataStream (and outputStream as the result) will close automatically at the
+ // end of the try block
CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
String sourceName = null;
try {
- List<Fragment> fragments = fragmenterService.getFragmentsForSegment(context);
+ List<Fragment> fragments;
+
+ // Check if a specific fragment index was requested (parallel execution mode)
+ if (context.hasSpecificFragment()) {
+ // Parallel mode: only process the specified fragment
+ Fragment specificFragment = fragmenterService.getFragmentByIndex(
+ context, context.getSpecificFragmentIndex());
+ fragments = java.util.Collections.singletonList(specificFragment);
+ log.debug("Parallel mode: processing only fragment {} of resource {}",
+ context.getSpecificFragmentIndex(), context.getDataSource());
+ } else {
+ // Normal mode: get fragments filtered by segment
+ fragments = fragmenterService.getFragmentsForSegment(context);
+ }
+
for (int i = 0; i < fragments.size(); i++) {
Fragment fragment = fragments.get(i);
sourceName = fragment.getSourceName();
@@ -116,8 +135,10 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
}
}
} catch (Exception e) {
- // the exception is not re-thrown but passed to the caller in the queryResult so that
- // the caller has a chance to inspect / report query stats before re-throwing the exception
+ // the exception is not re-thrown but passed to the caller in the queryResult
+ // so that the caller has a chance to inspect / report query stats before
+ // re-throwing the exception
queryResult.setException(e);
queryResult.setSourceName(sourceName);
} finally {
@@ -128,7 +149,8 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
}
/**
- * Processes a single fragment identified in the RequestContext and updates query statistics.
+ * Processes a single fragment identified in the RequestContext and updates
+ * query statistics.
*
* @param countingOutputStream output stream to write data to
* @param context request context
@@ -136,8 +158,8 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS
* @throws Exception if operation fails
*/
private void processFragment(CountingOutputStream countingOutputStream,
- RequestContext context,
- OperationStats queryStats) throws Exception {
+ RequestContext context,
+ OperationStats queryStats) throws Exception {
Writable record;
DataOutputStream dos = new DataOutputStream(countingOutputStream);
@@ -177,12 +199,14 @@ private void processFragment(CountingOutputStream countingOutputStream,
fragmentStats.setByteCount(countingOutputStream.getCount() - previousStreamByteCount);
fragmentStats.flushStats();
- // update query stats even if there was an exception so that they can be properly reported by the
+ // update query stats even if there was an exception so that they can be
+ // properly reported by the
// error reporter
queryStats.update(fragmentStats);
log.debug("Finished processing fragment {} of resource {} in {} ms, wrote {} records and {} bytes.",
- context.getFragmentIndex(), context.getDataSource(), duration.toMillis(), fragmentStats.getRecordCount(), fragmentStats.getByteCount());
+ context.getFragmentIndex(), context.getDataSource(), duration.toMillis(),
+ fragmentStats.getRecordCount(), fragmentStats.getByteCount());
metricsReporter.reportTimer(MetricsReporter.PxfMetric.FRAGMENTS_SENT, duration, context, success);
}
}