mongodb parallel snapshotting support #4097

Merged
jgao54 merged 4 commits into main from parallel-snapshotting
Apr 14, 2026

Conversation

@jgao54
Contributor

@jgao54 jgao54 commented Mar 26, 2026

Introduce parallel snapshotting for MongoDB.

The partitioning strategy uses min-max range partitioning: we get the smallest and largest ObjectID, extract the timestamp from each, compute the timestamp delta, and divide it uniformly to create partitions. This is prone to data skew if the document insertion rate varies significantly over time (e.g. a huge backfill of data at once), but for most production workloads it should provide a reasonable distribution.
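
The min-max strategy described above can be sketched in Go roughly as follows. This is a minimal illustration, not the code from this PR: `objectIDTimestamp`, `tsRange`, and `partitionByTimestamp` are hypothetical names, and the half-open `[Start, End)` convention and the remainder handling are assumptions. The only fact relied on is that the first 4 bytes of an ObjectID hold a big-endian Unix timestamp in seconds.

```go
package main

import "encoding/binary"

// objectIDTimestamp returns the creation time, in Unix seconds, stored
// big-endian in the first 4 bytes of a 12-byte ObjectID.
func objectIDTimestamp(id [12]byte) uint32 {
	return binary.BigEndian.Uint32(id[:4])
}

// tsRange is a hypothetical half-open partition [Start, End) in Unix seconds.
type tsRange struct {
	Start, End uint64
}

// partitionByTimestamp splits the inclusive range [minTS, maxTS] into at most
// n contiguous ranges of near-equal width. The partition count is capped to
// the number of distinct seconds so that no range is empty.
func partitionByTimestamp(minTS, maxTS uint32, n int) []tsRange {
	if n < 1 || maxTS < minTS {
		return nil
	}
	lo := uint64(minTS)
	span := uint64(maxTS) - lo + 1 // number of distinct seconds covered
	if uint64(n) > span {
		n = int(span)
	}
	q, r := span/uint64(n), span%uint64(n)
	parts := make([]tsRange, 0, n)
	start := lo
	for i := 0; i < n; i++ {
		size := q
		if uint64(i) < r {
			size++ // spread the remainder over the first r partitions
		}
		parts = append(parts, tsRange{start, start + size})
		start += size
	}
	return parts
}
```

Each range would then become a filter on `_id` between the ObjectIDs built from `Start` and `End`. The ranges are uniform in time, not in document count, which is where the skew mentioned above comes from.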

Sometimes users supply their own custom _id values instead of using the default auto-generated ObjectID. We detect this by querying the smallest and largest _id (conveniently, this is already part of the min-max partitioning strategy). If both are ObjectIDs, every document in the collection is guaranteed to have an ObjectID as its _id, since MongoDB orders values by BSON type before comparing them within a type. If either one is not an ObjectID, we fall back to a full table partition.
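
A minimal sketch of that check, under stated assumptions: `objectID` is a local stand-in for the MongoDB driver's ObjectID type, and `canPartitionByObjectID` is a hypothetical helper, not a function from this PR. It only inspects the two extreme `_id` values, relying on the BSON type ordering described above to cover everything in between.

```go
package main

// objectID is a local stand-in for the MongoDB driver's 12-byte ObjectID type
// (an assumption made so the sketch compiles without the driver).
type objectID [12]byte

// canPartitionByObjectID reports whether both the smallest and the largest
// _id in a collection are ObjectIDs. Because values of different BSON types
// sort into separate, contiguous groups, two ObjectID extremes imply that no
// _id of another type exists in the collection.
func canPartitionByObjectID(minID, maxID any) bool {
	_, minOK := minID.(objectID)
	_, maxOK := maxID.(objectID)
	return minOK && maxOK
}
```

A caller would fall back to a single full table partition whenever this returns false, which also covers an empty collection where no minimum or maximum `_id` exists (both arguments `nil`).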

For now, this feature is behind a feature flag.

Testing

Added e2e tests to make sure that (1) a collection with ObjectIDs runs parallel snapshots, (2) a collection with non-ObjectID or mixed _id types uses a full table partition, and (3) an empty collection uses a full table partition.

@codecov

codecov bot commented Mar 26, 2026

❌ 2 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 2189 | 2 | 2187 | 196 |
View the top 3 failed test(s) by shortest run time
github.com/PeerDB-io/peerdb/flow/e2e::TestApiPg
Stack Traces | 0s run time
=== RUN   TestApiPg
=== PAUSE TestApiPg
=== CONT  TestApiPg
--- FAIL: TestApiPg (0.00s)
github.com/PeerDB-io/peerdb/flow/e2e::TestGenericBQ
Stack Traces | 0s run time
=== RUN   TestGenericBQ
=== PAUSE TestGenericBQ
=== CONT  TestGenericBQ
--- FAIL: TestGenericBQ (0.00s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH
Stack Traces | 0.01s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH (0.01s)
github.com/PeerDB-io/peerdb/flow/e2e::TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns
Stack Traces | 18.3s run time
=== RUN   TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns
=== PAUSE TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns
=== CONT  TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns
2026/04/14 21:56:27 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2026/04/14 21:56:27 INFO Received AWS credentials from peer for connector: clickhouse x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2026/04/14 21:56:27 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2572: WaitFor waiting on initial 2026-04-14 21:56:31.907528347 +0000 UTC m=+427.289016387
2026/04/14 21:56:31 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_p2rhi5bv.test_extra_ch_cols
    clickhouse_test.go:2575: WaitFor waiting on cdc 2026-04-14 21:56:31.92288904 +0000 UTC m=+427.304377079
2026/04/14 21:56:31 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_p2rhi5bv.test_extra_ch_cols
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:31 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:32 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:33 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:35 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:36 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:37 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:38 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:39 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_api_5ytgsfhu.t1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:42 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_nozwmlsq.test_mysql_ghost_schema
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:44 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_nozwmlsq.test_mysql_ghost_schema
    clickhouse_test.go:2575: q.NumRecords: 2
    clickhouse_test.go:2575: other.NumRecords: 1
2026/04/14 21:56:45 INFO fetched schema x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} table=e2e_test_mych_nozwmlsq.test_mysql_ghost_schema
    clickhouse.go:115: 
        	Error Trace:	.../flow/e2e/congen.go:43
        	            				.../flow/e2e/clickhouse.go:115
        	            				.../flow/e2e/clickhouse.go:97
        	            				.../flow/e2e/clickhouse.go:172
        	            				.../flow/e2e/test_utils.go:181
        	            				.../flow/e2e/test_utils.go:852
        	            				.../flow/e2e/test_utils.go:172
        	            				.../flow/e2e/clickhouse_test.go:2575
        	Error:      	Received unexpected error:
        	            	unable to establish connection with catalog: FATAL: terminating connection due to administrator command (SQLSTATE 57P01)
        	Test:       	TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns
--- FAIL: TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns (18.26s)
github.com/PeerDB-io/peerdb/flow/e2e::TestApiPg/TestCancelAddCancel
Stack Traces | 24.1s run time
=== RUN   TestApiPg/TestCancelAddCancel
=== PAUSE TestApiPg/TestCancelAddCancel
=== CONT  TestApiPg/TestCancelAddCancel
2026/04/14 22:08:28 INFO Received AWS credentials from peer for connector: ci x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
2026/04/14 22:08:28 INFO Received AWS credentials from peer for connector: clickhouse x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN}
    cancel_table_addition_test.go:788: WaitFor wait for initial load to finish 2026-04-14 22:08:32.595496518 +0000 UTC m=+10.080694658
    cancel_table_addition_test.go:792: WaitFor t1 2026-04-14 22:08:32.595843953 +0000 UTC m=+10.081042083
2026/04/14 22:08:32 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id"
2026/04/14 22:08:32 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id"
2026/04/14 22:08:32 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_735318052786857562 CURSOR FOR SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id" args=[]
2026/04/14 22:08:32 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id" channelLen=0
2026/04/14 22:08:32 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_735318052786857562
2026/04/14 22:08:32 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_735318052786857562 records=1 bytes=9 channelLen=0
2026/04/14 22:08:32 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id" rows=1 bytes=9 channelLen=0
2026/04/14 22:08:32 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_735318052786857562
2026/04/14 22:08:32 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_735318052786857562 records=0 bytes=0 channelLen=0
2026/04/14 22:08:32 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id" rows=0 bytes=0 channelLen=0
2026/04/14 22:08:32 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2026/04/14 22:08:32 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_ose6p2ju.\"t1\" ORDER BY id" rows=1 bytes=9 channelLen=0
    cancel_table_addition_test.go:127: WaitFor wait for pause for add e2e_test_api_ose6p2ju.t2 2026-04-14 22:08:32.619895965 +0000 UTC m=+10.105094105
    cancel_table_addition_test.go:826: WaitFor wait for stuck snapshot of t2 add table 2026-04-14 22:08:34.627771694 +0000 UTC m=+12.112969834
    cancel_table_addition_test.go:829: WaitFor waiting for initial load MV error messages for t2 2026-04-14 22:08:35.629304452 +0000 UTC m=+13.114502592
    cancel_table_addition_test.go:860: WaitFor wait for running after t2 cancel 2026-04-14 22:08:37.487165404 +0000 UTC m=+14.972363544
    cancel_table_addition_test.go:127: WaitFor wait for pause for add e2e_test_api_ose6p2ju.t2 2026-04-14 22:08:37.497797485 +0000 UTC m=+14.982995625
2026/04/14 22:08:37 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id"
2026/04/14 22:08:37 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id"
2026/04/14 22:08:37 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_17122840393523900505 CURSOR FOR SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id" args=[]
2026/04/14 22:08:37 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id" channelLen=0
2026/04/14 22:08:37 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17122840393523900505
2026/04/14 22:08:37 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17122840393523900505 records=2 bytes=19 channelLen=1
2026/04/14 22:08:37 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id" rows=2 bytes=19 channelLen=1
2026/04/14 22:08:37 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17122840393523900505
2026/04/14 22:08:37 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17122840393523900505 records=0 bytes=0 channelLen=0
2026/04/14 22:08:37 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id" rows=0 bytes=0 channelLen=0
2026/04/14 22:08:37 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2026/04/14 22:08:37 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,val FROM e2e_test_api_qufvaxq2.\"table1\" ORDER BY id" rows=2 bytes=19 channelLen=0
    cancel_table_addition_test.go:128: UNEXPECTED ERROR unable to establish connection with catalog: FATAL: terminating connection due to administrator command (SQLSTATE 57P01)
    api_test.go:48: begin tearing down postgres schema api_ose6p2ju
--- FAIL: TestApiPg/TestCancelAddCancel (24.08s)
github.com/PeerDB-io/peerdb/flow/e2e::TestGenericBQ/Test_Simple_Flow
Stack Traces | 33.6s run time
=== RUN   TestGenericBQ/Test_Simple_Flow
=== PAUSE TestGenericBQ/Test_Simple_Flow
=== CONT  TestGenericBQ/Test_Simple_Flow
2026/04/14 21:51:27 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id"
2026/04/14 21:51:27 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id"
2026/04/14 21:51:27 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_5232279510531424340 CURSOR FOR SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id" args=[]
2026/04/14 21:51:27 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id" channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_5232279510531424340
2026/04/14 21:51:27 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_5232279510531424340 records=4 bytes=48 channelLen=3
2026/04/14 21:51:27 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id" rows=4 bytes=48 channelLen=3
2026/04/14 21:51:27 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_5232279510531424340
2026/04/14 21:51:27 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_5232279510531424340 records=0 bytes=0 channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id" rows=0 bytes=0 channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2026/04/14 21:51:27 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes\" ORDER BY id" rows=4 bytes=48 channelLen=0
2026/04/14 21:51:27 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id"
2026/04/14 21:51:27 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id"
2026/04/14 21:51:27 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_17488672075498793538 CURSOR FOR SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id" args=[]
2026/04/14 21:51:27 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id" channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17488672075498793538
2026/04/14 21:51:27 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17488672075498793538 records=3 bytes=36 channelLen=2
2026/04/14 21:51:27 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id" rows=3 bytes=36 channelLen=2
2026/04/14 21:51:27 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17488672075498793538
2026/04/14 21:51:27 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_17488672075498793538 records=0 bytes=0 channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id" rows=0 bytes=0 channelLen=0
2026/04/14 21:51:27 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2026/04/14 21:51:27 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id,c1 FROM e2e_test_pg_pxxyu4yh.\"test_simple_schema_changes_dst\" ORDER BY id" rows=3 bytes=36 channelLen=0
    generic_test.go:124: UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT
    bigquery.go:86: begin tearing down postgres schema bq_mszdpyjs_20260414215127
2026/04/14 21:52:01 INFO Executing and processing query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id"
2026/04/14 21:52:01 INFO Executing and processing query stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id"
2026/04/14 21:52:01 INFO [pg_query_executor] declared cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursorQuery="DECLARE peerdb_cursor_4364632986707296074 CURSOR FOR SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id" args=[]
2026/04/14 21:52:01 INFO [pg_query_executor] fetching rows start x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id" channelLen=0
2026/04/14 21:52:01 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_4364632986707296074
2026/04/14 21:52:01 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_4364632986707296074 records=2 bytes=12 channelLen=1
2026/04/14 21:52:01 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id" rows=2 bytes=12 channelLen=1
2026/04/14 21:52:01 INFO [pg_query_executor] fetching from cursor x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_4364632986707296074
2026/04/14 21:52:01 INFO processed row stream x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart cursor=peerdb_cursor_4364632986707296074 records=0 bytes=0 channelLen=0
2026/04/14 21:52:01 INFO [pg_query_executor] fetched rows x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id" rows=0 bytes=0 channelLen=0
2026/04/14 21:52:01 INFO [pg_query_executor] committing transaction x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart
2026/04/14 21:52:01 INFO [pg_query_executor] committed transaction for query x-peerdb-additional-metadata={Operation:FLOW_OPERATION_UNKNOWN} partitionId=testpart query="SELECT id, col_to_add FROM e2e_test_bq_mqjdpkrz_20260414215126.\"test_ddl_drop_column\" ORDER BY id" rows=2 bytes=12 channelLen=0
--- FAIL: TestGenericBQ/Test_Simple_Flow (33.60s)


@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: The test TestGenericBQ/Test_Inheritance_Table_With_Dynamic_Setting failed with UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT, a timing-related failure in a BigQuery e2e test that the team has explicitly flagged as flaky under high concurrency.
Confidence: 0.9

✅ Automatically retrying the workflow

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch from 87b8245 to 1452607 Compare April 8, 2026 08:49
@github-actions
Contributor

github-actions bot commented Apr 8, 2026

❌ Test Failure

Analysis: All 3 TestMongoClickhouseSuite tests fail identically across all matrix variants with deterministic errors (UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT and ClickHouse unknown table identifier), indicating a real regression in MongoDB snapshot handling rather than a flaky failure.
Confidence: 0.92

⚠️ This appears to be a real bug - manual intervention needed

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch from 1452607 to 7654168 Compare April 8, 2026 20:16
@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Flaky Test Detected

Analysis: Two MongoDB-ClickHouse e2e tests failed with "UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT", indicating they timed out waiting for snapshot workflow completion — a classic flaky pattern in distributed systems testing.
Confidence: 0.9

✅ Automatically retrying the workflow

View workflow run

@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Flaky Test Detected

Analysis: All three matrix variants failed with "UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT" in TestMongoClickhouseSuite at the same wall-clock time, indicating a transient timeout waiting for the MongoDB snapshot — not a code regression.
Confidence: 0.92

✅ Automatically retrying the workflow

View workflow run

@github-actions
Contributor

github-actions bot commented Apr 8, 2026

❌ Test Failure

Analysis: TestMongoClickhouseSuite/Test_CDC and Test_Mongo_Can_Resume_After_Delete_Table fail deterministically across all three CI matrix configurations with identical ~31s durations, indicating a real bug rather than a flaky test.
Confidence: 0.92

⚠️ This appears to be a real bug - manual intervention needed

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch from 7654168 to af08f20 Compare April 8, 2026 21:39
@jgao54 jgao54 changed the title from "[WIP] mongodb parallel snapshotting" to "mongodb parallel snapshotting support" Apr 8, 2026
@jgao54 jgao54 marked this pull request as ready for review April 8, 2026 21:56
@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Flaky Test Detected

Analysis: Only one of three matrix combinations (PG18 + MariaDB 8.0) failed in the e2e integration test suite while identical code passed on PG16 and PG17, with no deterministic assertion failure visible in logs, strongly suggesting a flaky e2e test rather than a real regression.
Confidence: 0.75

✅ Automatically retrying the workflow

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch from af08f20 to f1801ff Compare April 8, 2026 22:27
@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Flaky Test Detected

Analysis: Test teardown race condition: the replication slot was still held by an active background PID when cleanup tried to drop it, causing a false failure unrelated to the test's actual logic.
Confidence: 0.9

✅ Automatically retrying the workflow

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch from a1c4da6 to dae8cf8 Compare April 8, 2026 22:59
@jgao54 jgao54 requested a review from ilidemi April 8, 2026 23:01
@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Flaky Test Detected

Analysis: The e2e test TestGenericBQ/Test_Inheritance_Table_With_Dynamic_Setting failed with UNEXPECTED STATUS TIMEOUT STATUS_SNAPSHOT, indicating the snapshot workflow didn't complete within the polling deadline — a classic transient CI timing/resource contention failure, not a code bug.
Confidence: 0.9

✅ Automatically retrying the workflow

View workflow run

@github-actions
Contributor

github-actions bot commented Apr 8, 2026

🔄 Possible Flaky Test

Analysis: The MySQL GTID e2e test suite failed after ~720s with no specific test failure message visible, suggesting a timing/infrastructure issue in the distributed test environment rather than a code regression.
Confidence: 0.6

⚠️ Confidence too low (0.6) to retry automatically - manual review recommended

View workflow run

@jgao54 jgao54 requested a review from pfcoperez April 9, 2026 17:11
Comment thread flow/connectors/mongo/qrep.go
Comment thread flow/connectors/mongo/qrep.go
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/e2e/mongo.go
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
}
// Cap partitions to the timestamp range in seconds so we don't create additional
// empty partitions unnecessarily when docs span fewer seconds than partitions
numPartitions = min(numPartitions, tsRange)
Contributor

Maybe a stupid question, but why do we need to treat the time part in any special way here? If a ton of records are all inserted in the same second, it seems like they could be partitioned by the bottom 8 bytes just as well, so why not do integer math?

Contributor Author

not a stupid question at all...

The ObjectID consists of a 4-byte timestamp, a 5-byte random value unique per process per machine, and a 3-byte incrementing counter.

Given that the timestamp occupies the most significant bytes, you are right that integer math is a strictly better partitioning mechanism, even if all the documents have the same random bytes or if the random bytes are heavily skewed.

This will simplify the partitioning logic as well. Thanks for bringing this up.
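
As a rough illustration of the integer-math idea discussed in this thread (a sketch only, not the code that was merged): treat the whole 12-byte ObjectID as one unsigned big-endian integer and split the min-max range evenly. `splitObjectIDRange` is a hypothetical name, and using `math/big` is an assumption made because 96 bits do not fit in a `uint64`.

```go
package main

import "math/big"

// splitObjectIDRange returns n+1 boundary ObjectIDs that divide the inclusive
// range [minID, maxID] into n slices of near-equal integer width. Partition i
// would cover [bounds[i], bounds[i+1]), with the last one also including maxID.
func splitObjectIDRange(minID, maxID [12]byte, n int) [][12]byte {
	lo := new(big.Int).SetBytes(minID[:])
	hi := new(big.Int).SetBytes(maxID[:])
	if n < 1 || lo.Cmp(hi) > 0 {
		return nil
	}
	span := new(big.Int).Sub(hi, lo)
	bounds := make([][12]byte, 0, n+1)
	for i := 0; i <= n; i++ {
		// boundary = lo + span*i/n, computed with arbitrary precision
		b := new(big.Int).Mul(span, big.NewInt(int64(i)))
		b.Quo(b, big.NewInt(int64(n)))
		b.Add(b, lo)
		var id [12]byte
		b.FillBytes(id[:]) // never exceeds maxID, so it always fits in 12 bytes
		bounds = append(bounds, id)
	}
	return bounds
}
```

Because the timestamp sits in the most significant bytes, this behaves like timestamp partitioning when documents span a long period, and it still splits the range when every document was inserted within the same second. If the range holds fewer than `n` distinct integers, adjacent boundaries repeat, so a caller would want to drop the resulting empty partitions.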

Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go Outdated
Comment thread flow/connectors/mongo/qrep_partition.go
@github-actions
Contributor

❌ Test Failure

Analysis: The test TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range fails consistently across all 3 matrix variants with the same assertion (expected: 5, actual: 100), indicating a real bug in the MongoDB parallel snapshotting PR where the timestamp range cap is not filtering snapshot partitions correctly.
Confidence: 0.92

⚠️ This appears to be a real bug - manual intervention needed

View workflow run

@jgao54 jgao54 force-pushed the parallel-snapshotting branch 3 times, most recently from 6a2dc7a to acefd3c Compare April 14, 2026 20:08
@github-actions
Contributor

❌ Test Failure

Analysis: TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range consistently fails across all CI matrix configurations with expected 5 records but got 100, indicating a real bug in the MongoDB snapshot partition timestamp-capping logic.
Confidence: 0.85

⚠️ This appears to be a real bug - manual intervention needed

View workflow run

@github-actions
Contributor

❌ Test Failure

Analysis: TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range fails deterministically across all 3 CI matrix variants with expected 5 but got 100 records, indicating a real bug in the snapshot timestamp-range partition capping logic introduced in the parallel-snapshotting PR.
Confidence: 0.85

⚠️ This appears to be a real bug - manual intervention needed

View workflow run

@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range failed due to a race condition where ClickHouse returned "Unknown table expression identifier" during the WaitFor loop (table not yet created), then the snapshot completed with all 100 records instead of the expected 5 within the timestamp range cap.
Confidence: 0.75

✅ Automatically retrying the workflow

@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: Test_Snapshot_Partition_Capped_To_Timestamp_Range fails with a ClickHouse "Unknown table expression identifier" race condition (table queried before creation completes), unrelated to the last Avro-chunking commit, and compounded by a transient port-binding infrastructure failure in the maria matrix.
Confidence: 0.82

✅ Automatically retrying the workflow

@github-actions
Contributor

❌ Test Failure

Analysis: TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range fails deterministically across all matrix variants with expected: 5, actual: 100, indicating a real regression in timestamp-range partition capping rather than a flaky timing issue.
Confidence: 0.82

⚠️ This appears to be a real bug - manual intervention needed

@github-actions
Contributor

❌ Test Failure

Analysis: The primary failure TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range fails consistently across all 3 CI matrix variants with an identical deterministic assertion (expected 5, actual 100), indicating a real functional regression in snapshot timestamp-range partition capping rather than a flaky test.
Confidence: 0.82

⚠️ This appears to be a real bug - manual intervention needed

@github-actions
Contributor

❌ Test Failure

Analysis: TestMongoClickhouseSuite/Test_Snapshot_Partition_Capped_To_Timestamp_Range fails deterministically across all CI matrix builds with a consistent count mismatch (expected 5 rows, got 100), indicating a real regression — likely from the recent "Always chunk Avro on uncompressed bytes" commit breaking MongoDB snapshot partition timestamp-range capping.
Confidence: 0.85

⚠️ This appears to be a real bug - manual intervention needed

@jgao54 force-pushed the parallel-snapshotting branch 2 times, most recently from 9afdc6b to 36d16d2 on April 14, 2026 21:44
@jgao54 force-pushed the parallel-snapshotting branch from 36d16d2 to b106d5e on April 14, 2026 21:46
@jgao54 enabled auto-merge (squash) on April 14, 2026 21:47
@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: Two e2e tests failed due to flaky infrastructure issues: a snapshot status timeout in TestGenericBQ/Test_Simple_Flow and a transient catalog DB connection termination (SQLSTATE 57P01 admin_shutdown) in TestPeerFlowE2ETestSuiteMySQL_CH/Test_Extra_CH_Columns, neither of which indicates a real code bug.
Confidence: 0.93

✅ Automatically retrying the workflow

@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: Three e2e tests failed across different matrix jobs due to flaky infrastructure issues: a Temporal server connection timeout (context deadline exceeded), a schema teardown failure, and a CDC record-count race condition — none related to the retry transient tcp error code change.
Confidence: 0.93

✅ Automatically retrying the workflow

@github-actions
Contributor

🔄 Flaky Test Detected

Analysis: TestApiPg/TestCancelAddCancel timed out in a WaitFor polling loop waiting for MV error messages, a timing-sensitive operation that passed in both other matrix configurations (PG16 and PG17), indicating a flaky e2e race condition rather than a code regression.
Confidence: 0.85

✅ Automatically retrying the workflow

@jgao54 merged commit 6bec7d0 into main on Apr 14, 2026
14 of 17 checks passed
@jgao54 deleted the parallel-snapshotting branch on April 14, 2026 22:24

3 participants