feat: parallelize filter evaluation#33
Conversation
Add --threads flag and DSCT_THREADS env var to dsct read. When the filter is parallel-safe (stateless L2-L4 protocols) and input is a file, packets are distributed round-robin to N worker threads each with their own DissectorRegistry. The merger re-assembles results in strict round-robin order so output is byte-identical to the sequential path. Reassembly-dependent filters (HTTP, DNS, TLS, tcp.stream_id, etc.) and stdin input always fall back to sequential processing. - src/parallel_read.rs: new library module with reader/worker/merger pipeline - src/field_config.rs: derive Clone on FieldConfig and internal types - src/main.rs: add --threads to ReadOptions; gate parallel path on is_parallel_safe + esp_sa_args.is_empty + resolved_threads > 1 - tests/cli_parallel_read_test.rs: TDD integration tests (12 tests) - README.md: document --threads and DSCT_THREADS
Add benches/parallel_read.rs comparing parallel_read::run with threads=1 vs threads=4 on a 20k-packet mixed UDP/TCP pcap, writing to io::sink(). sample_size=10 keeps cargo bench fast. Verified via cargo bench --bench parallel_read -- --test.
|
| Branch | feat/parallel-filter-eval |
| Testbed | ubuntu-latest |
Click to view all benchmark results
| Benchmark | Latency | Benchmark Result milliseconds (ms) (Result Δ%) | Upper Boundary milliseconds (ms) (Limit %) |
|---|---|---|---|
| target/release/dsct read bench_input.pcap | 📈 view plot 🚷 view threshold | 6.11 ms(+1.04%)Baseline: 6.05 ms | 6.89 ms (88.77%) |
| target/release/dsct stats bench_input.pcap | 📈 view plot 🚷 view threshold | 71.29 ms(+2.84%)Baseline: 69.32 ms | 85.42 ms (83.46%) |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #33 +/- ##
==========================================
+ Coverage 91.03% 91.05% +0.01%
==========================================
Files 76 79 +3
Lines 16804 17600 +796
==========================================
+ Hits 15298 16026 +728
- Misses 1506 1574 +68 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Greptile SummaryThis PR adds parallel filter evaluation across worker threads for both the CLI
Confidence Score: 4/5The core parallel output path is safe to merge — byte-identical JSONL ordering is verified by integration tests, thread shutdown and stop-flag logic is clean, and the sequential fallback path is unchanged. The round-robin merger correctly preserves packet order, the src/parallel_read.rs — the merger's progress-reporting path and the worker's serialization error handling. Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller as cmd_read (calling thread)
participant Reader as dsct-reader thread
participant W0 as dsct-worker-0
participant W1 as dsct-worker-N
participant Merger as Merger (calling thread)
Caller->>Reader: spawn (path, pn_filter, input_txs, stop)
Caller->>W0: spawn (irx_0, otx_0, filter_str, ...)
Caller->>W1: spawn (irx_N, otx_N, filter_str, ...)
Reader->>W0: send InputBatch (round 0)
Reader->>W1: send InputBatch (round 1)
Reader->>W0: send InputBatch (round 2)
W0->>Merger: send OutputBatch (results for batch 0)
W1->>Merger: send OutputBatch (results for batch 1)
W0->>Merger: send OutputBatch (results for batch 2)
Merger->>Merger: recv round-robin (W0→W1→W0…)
Merger->>Merger: apply sample_rate / offset / count
alt count limit reached
Merger->>Reader: stop.store(true)
Merger-->>W0: drop output_rxs → workers exit
end
Merger->>Caller: ReadOutcome
Caller->>Reader: join
Caller->>W0: join (check panic)
Caller->>W1: join (check panic)
Reviews (1): Last reviewed commit: "bench: add parallel filter scan benchmar..." | Re-trigger Greptile |
| let mut packets_processed = 0u64; | ||
| let mut packets_written = 0u64; | ||
| let mut filter_matches = 0u64; | ||
| let mut results_matched = 0u64; | ||
| let mut truncated_by_limit = false; | ||
| let mut worker_idx = 0usize; | ||
|
|
||
| // Receive from workers in strict round-robin order (same order the reader | ||
| // sent batches to them), preserving global packet order. | ||
| 'outer: loop { | ||
| match output_rxs[worker_idx].recv() { | ||
| Err(_) => { | ||
| // Channel closed: either stop flag is set (expected EOF / limit) | ||
| // or all workers finished (EOF). Either way, we are done. | ||
| break 'outer; | ||
| } | ||
| Ok(batch) => { | ||
| // Count packets represented in this batch for progress reporting. | ||
| let batch_count = batch.len() as u64; | ||
| packets_processed = packets_processed.saturating_add(batch_count); | ||
|
|
||
| for entry in batch { | ||
| match entry { | ||
| WorkerEntry::Warning { number, message } => { | ||
| warn(number, &message); | ||
| } | ||
| WorkerEntry::Match(json_bytes) => { | ||
| filter_matches += 1; | ||
| if sample_rate > 1 && !(filter_matches - 1).is_multiple_of(sample_rate) | ||
| { | ||
| continue; | ||
| } | ||
| results_matched += 1; | ||
| if results_matched <= offset { | ||
| continue; | ||
| } | ||
| writer.write_all(&json_bytes)?; | ||
| writer.write_all(b"\n")?; | ||
| packets_written += 1; | ||
|
|
||
| if let Some(max) = count | ||
| && packets_written >= max | ||
| { | ||
| truncated_by_limit = true; | ||
| stop.store(true, Ordering::Relaxed); | ||
| break 'outer; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Progress reporting at batch granularity. | ||
| if progress_interval > 0 | ||
| && packets_written > 0 | ||
| && packets_written.is_multiple_of(progress_interval) | ||
| { | ||
| progress(packets_processed, packets_written); | ||
| } | ||
|
|
||
| worker_idx = (worker_idx + 1) % n; | ||
| } | ||
| } |
There was a problem hiding this comment.
Progress callback fires on
packets_written instead of packets_processed
The sequential path emits progress every interval total-packets-processed (packets_processed.is_multiple_of(interval)), but the merger fires every interval written (matched) packets. For a sparse filter (e.g., 0.1% match rate) with --progress 10000, sequential would emit ~10 progress events on a 100 K-packet file; parallel would emit 0 (only 100 packets match). Additionally, the packets_processed value passed to the callback only counts WorkerEntry items (matches + warnings), so progress JSON will report a far lower packets_processed than the sequential path does for the same file, breaking any tool that monitors total throughput via --progress.
| json_buf.clear(); | ||
| if write_packet_json( | ||
| &mut json_buf, | ||
| meta, | ||
| dbuf, | ||
| data.as_slice(), | ||
| field_config.as_ref(), | ||
| raw_bytes, | ||
| ) | ||
| .is_ok() | ||
| { | ||
| results.push(WorkerEntry::Match(json_buf.clone())); | ||
| } |
There was a problem hiding this comment.
Matched packet silently dropped when serialization returns
Err
If write_packet_json returns an error, the packet passes the filter but is never pushed as WorkerEntry::Match and is silently dropped — no warning, no error, no output line. The sequential path propagates the same error with ?, which would surface it as a structured error. Writing to Vec<u8> cannot fail today, but the inconsistent contract means a future refactor that allows I/O errors on the JSON path would silently lose matched packets in parallel mode while erroring in sequential mode.
| json_buf.clear(); | |
| if write_packet_json( | |
| &mut json_buf, | |
| meta, | |
| dbuf, | |
| data.as_slice(), | |
| field_config.as_ref(), | |
| raw_bytes, | |
| ) | |
| .is_ok() | |
| { | |
| results.push(WorkerEntry::Match(json_buf.clone())); | |
| } | |
| json_buf.clear(); | |
| match write_packet_json( | |
| &mut json_buf, | |
| meta, | |
| dbuf, | |
| data.as_slice(), | |
| field_config.as_ref(), | |
| raw_bytes, | |
| ) { | |
| Ok(()) => results.push(WorkerEntry::Match(json_buf.clone())), | |
| Err(e) => results.push(WorkerEntry::Warning { | |
| number: meta.number, | |
| message: format!("serialization error: {e}"), | |
| }), | |
| } |
| FilterExpr::Where(clause) => { | ||
| if !SAFE_PROTOCOLS.iter().any(|&s| s == clause.protocol) { | ||
| return false; | ||
| } | ||
| // TCP stateful fields are unsafe even though TCP itself is safe. | ||
| if clause.protocol == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field) | ||
| { | ||
| return false; | ||
| } | ||
| true | ||
| } |
There was a problem hiding this comment.
FilterExpr::Where skips normalize_protocol_name unlike FilterExpr::Protocol
The Protocol arm explicitly normalises the name before comparing against SAFE_PROTOCOLS, but the Where arm compares clause.protocol directly. The tests pass because the SQL parser appears to lower-case protocol names, but if the two code paths are ever decoupled (e.g., WhereClause is constructed directly in tests or serialised/deserialised), a protocol stored with any casing other than lowercase ("TCP", "UDP") would not match the whitelist and would conservatively fall through to sequential scanning without any warning.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
…ential path Workers now report true per-batch packet counts so --progress reflects all processed packets, and serialization failures abort the run instead of silently dropping matched packets. Also normalize the protocol name in the Where arm of is_parallel_safe for symmetry with the Protocol arm.
Parallelize filter evaluation across worker threads, each with its own
DissectorRegistry(theDissectortrait isSendbut notSync).Changes
FilterExpr::is_parallel_safe: only filters over stateless L2-L4 protocols are parallelized. Filters depending on TCP reassembly state (http,dns,tls,tcp.stream_id,tcp.reassembly_in_progress, etc.) fall back to sequential scanning because per-worker registries would change stream ID assignment and reassembled dissection.src/tui/parallel_scan.rs). Workers reopen the capture file and mmap it independently (same pattern asBgIndexer, no newunsafe), pull 8192-packet chunks via a work-stealing cursor, and results are merged in chunk order. Live mode stays sequential. If all workers die early (e.g. the file disappeared), the scan reports failure and falls back to sequential instead of hanging.read: pipeline-parallel engine (src/parallel_read.rs) — reader thread batches packets round-robin to N workers over bounded channels; the merger reassembles output in strict round-robin order, so JSONL output is byte-identical to the sequential path. Used only for file input with a parallel-safe--filter; stdin streaming and the no-filter fast path are untouched.--count/--offset/--sample-rate/warnings keep their exact sequential semantics.--threadsflag >DSCT_THREADSenv > physical core count. Zero or unparsable values are structured errors (exit 2).benches/parallel_read.rs).On a 1M-packet pcap with
-f udp, wall time drops from 1.67s to 0.62s with 4 threads, with byte-identical output.Generated by Claude Code