feat: parallelize filter evaluation #33

Merged
higebu merged 6 commits into main from feat/parallel-filter-eval on Jun 12, 2026

@higebu higebu commented Jun 12, 2026

Owner

Parallelize filter evaluation across worker threads, each with its own DissectorRegistry (the Dissector trait is Send but not Sync).
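
The Send-but-not-Sync constraint can be illustrated with a minimal sketch. The `Registry` type below is a hypothetical stand-in, not the real DissectorRegistry; it uses `Cell` only to obtain a type that can be moved to a thread but not shared by reference, which is the property that forces one registry per worker.

```rust
use std::cell::Cell;
use std::thread;

// Hypothetical stand-in for a per-worker registry. `Cell` makes the type
// Send (it can be moved to another thread) but not Sync (it cannot be
// shared by reference between threads).
struct Registry {
    packets_seen: Cell<u64>,
}

impl Registry {
    fn new() -> Self {
        Registry { packets_seen: Cell::new(0) }
    }

    // Interior mutation through &self is what rules out sharing.
    fn dissect(&self, _packet: &[u8]) {
        self.packets_seen.set(self.packets_seen.get() + 1);
    }
}

// Each worker builds its own registry inside its thread, so no
// non-Sync value is ever shared.
fn count_per_worker(chunks: Vec<Vec<Vec<u8>>>) -> Vec<u64> {
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| {
            thread::spawn(move || {
                let registry = Registry::new();
                for packet in &chunk {
                    registry.dissect(packet);
                }
                registry.packets_seen.get()
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}

fn main() {
    let chunks = vec![vec![vec![0u8; 4]; 3], vec![vec![0u8; 4]; 5]];
    println!("{:?}", count_per_worker(chunks)); // prints [3, 5]
}
```

Wrapping a single `Registry` in an `Arc` and handing clones to the workers would be rejected by the compiler in this sketch, because `Arc<T>` is only Send when `T` is Sync.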

Changes

  • Update packet-dissector crates to 0.3.3.
  • Add FilterExpr::is_parallel_safe: only filters over stateless L2-L4 protocols are parallelized. Filters depending on TCP reassembly state (http, dns, tls, tcp.stream_id, tcp.reassembly_in_progress, etc.) fall back to sequential scanning because per-worker registries would change stream ID assignment and reassembled dissection.
  • TUI: parallel filter scan (src/tui/parallel_scan.rs). Workers reopen the capture file and mmap it independently (same pattern as BgIndexer, no new unsafe), pull 8192-packet chunks via a work-stealing cursor, and results are merged in chunk order. Live mode stays sequential. If all workers die early (e.g. the file disappeared), the scan reports failure and falls back to sequential instead of hanging.
  • CLI read: pipeline-parallel engine (src/parallel_read.rs) — reader thread batches packets round-robin to N workers over bounded channels; the merger reassembles output in strict round-robin order, so JSONL output is byte-identical to the sequential path. Used only for file input with a parallel-safe --filter; stdin streaming and the no-filter fast path are untouched. --count/--offset/--sample-rate/warnings keep their exact sequential semantics.
  • Thread count: --threads flag > DSCT_THREADS env > physical core count. Zero or unparsable values are structured errors (exit 2).
  • Tests: parallel/sequential equivalence on synthetic mixed pcaps (including an HTTP-over-TCP stream for the fallback case), limit/offset/sample interplay, error cases, stdin behavior; plus a Criterion benchmark (benches/parallel_read.rs).
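
The thread-count precedence described in the list above can be sketched as follows. This is an illustration under assumptions, not the resolver in the PR: `ThreadsError`, `parse_threads`, and `resolve_threads` are hypothetical names, and the fallback in `main` uses the standard library's logical parallelism because std does not expose a physical core count.

```rust
// Hypothetical error type; the PR's structured error format is not shown.
#[derive(Debug, PartialEq)]
enum ThreadsError {
    Zero { source: &'static str },
    Unparsable { source: &'static str, value: String },
}

// Parse one candidate value, rejecting zero and anything non-numeric.
fn parse_threads(source: &'static str, raw: &str) -> Result<usize, ThreadsError> {
    match raw.trim().parse::<usize>() {
        Ok(0) => Err(ThreadsError::Zero { source }),
        Ok(n) => Ok(n),
        Err(_) => Err(ThreadsError::Unparsable { source, value: raw.to_string() }),
    }
}

// Precedence: flag, then environment variable, then the core-count default.
// Assumption: a bad flag value is an error even if the env value is valid.
fn resolve_threads(
    flag: Option<&str>,
    env: Option<&str>,
    default_cores: usize,
) -> Result<usize, ThreadsError> {
    if let Some(raw) = flag {
        return parse_threads("--threads", raw);
    }
    if let Some(raw) = env {
        return parse_threads("DSCT_THREADS", raw);
    }
    Ok(default_cores.max(1))
}

fn main() {
    let env = std::env::var("DSCT_THREADS").ok();
    // Logical parallelism as a stand-in for the physical core count.
    let fallback = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    println!("{:?}", resolve_threads(None, env.as_deref(), fallback));
}
```

A caller would map either error variant to the structured error and exit code 2 mentioned above.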

On a 1M-packet pcap with -f udp, wall time drops from 1.67s to 0.62s with 4 threads, with byte-identical output.


Generated by Claude Code

higebu added 5 commits June 12, 2026 00:27
Add --threads flag and DSCT_THREADS env var to dsct read. When the
filter is parallel-safe (stateless L2-L4 protocols) and input is a
file, packets are distributed round-robin to N worker threads each
with their own DissectorRegistry. The merger re-assembles results in
strict round-robin order so output is byte-identical to the sequential
path. Reassembly-dependent filters (HTTP, DNS, TLS, tcp.stream_id, etc.)
and stdin input always fall back to sequential processing.

- src/parallel_read.rs: new library module with reader/worker/merger pipeline
- src/field_config.rs: derive Clone on FieldConfig and internal types
- src/main.rs: add --threads to ReadOptions; gate parallel path on
  is_parallel_safe + esp_sa_args.is_empty + resolved_threads > 1
- tests/cli_parallel_read_test.rs: TDD integration tests (12 tests)
- README.md: document --threads and DSCT_THREADS

Add benches/parallel_read.rs comparing parallel_read::run with
threads=1 vs threads=4 on a 20k-packet mixed UDP/TCP pcap,
writing to io::sink(). sample_size=10 keeps cargo bench fast.
Verified via cargo bench --bench parallel_read -- --test.

github-actions Bot commented Jun 12, 2026

Contributor

🐰 Bencher Report

Branch: feat/parallel-filter-eval
Testbed: ubuntu-latest

| Benchmark | Latency: Benchmark Result, ms (Result Δ%) | Upper Boundary, ms (Limit %) |
| --- | --- | --- |
| target/release/dsct read bench_input.pcap | 6.11 ms (+1.04%); Baseline: 6.05 ms | 6.89 ms (88.77%) |
| target/release/dsct stats bench_input.pcap | 71.29 ms (+2.84%); Baseline: 69.32 ms | 85.42 ms (83.46%) |


codecov Bot commented Jun 12, 2026


Codecov Report

❌ Patch coverage is 91.55447% with 69 lines in your changes missing coverage. Please review.
✅ Project coverage is 91.05%. Comparing base (b3471a0) to head (4654b29).

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/parallel_read.rs | 88.42% | 25 Missing ⚠️ |
| src/tui/parallel_scan.rs | 93.43% | 17 Missing ⚠️ |
| src/tui/filter_apply.rs | 92.10% | 12 Missing ⚠️ |
| src/main.rs | 86.66% | 6 Missing ⚠️ |
| src/tui/event.rs | 20.00% | 4 Missing ⚠️ |
| src/tui/ui.rs | 25.00% | 3 Missing ⚠️ |
| src/filter_expr.rs | 98.27% | 1 Missing ⚠️ |
| src/tui/mod.rs | 50.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #33      +/-   ##
==========================================
+ Coverage   91.03%   91.05%   +0.01%     
==========================================
  Files          76       79       +3     
  Lines       16804    17600     +796     
==========================================
+ Hits        15298    16026     +728     
- Misses       1506     1574      +68     



greptile-apps Bot commented Jun 12, 2026


Greptile Summary

This PR adds parallel filter evaluation across worker threads for both the CLI read command and the TUI filter scan, using a conservative is_parallel_safe whitelist to restrict parallelism to stateless L2–L4 protocols while safely falling back to sequential for TCP-reassembly-dependent filters.

  • CLI read: a reader/worker/merger pipeline where the reader distributes batches round-robin, N workers dissect and filter independently with their own DissectorRegistry, and the merger reassembles results in strict round-robin order to guarantee byte-identical JSONL output; thread count is controlled via --threads / DSCT_THREADS / physical-CPU-count with zero/invalid values as structured errors (exit 2).
  • TUI: a work-stealing parallel scan that splits the packet index into 8192-packet chunks; on worker failure (e.g. file disappeared) it detects ScanPoll::Failed and falls back to sequential scanning so the scan always terminates.
  • Benchmarks and tests: Criterion benchmark in benches/parallel_read.rs and integration tests covering parallel/sequential equivalence, limit/offset/sample interplay, error cases, and stdin passthrough.
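
The chunked TUI scan summarized above can be sketched with a shared atomic cursor, which is one plausible reading of "work-stealing cursor". Everything here is an assumption for illustration: `parallel_scan`, the `u32` packet stand-ins, and the in-memory `Vec` replace the real mmap-backed index, and cancellation and failure handling are omitted.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

// Chunk size taken from the PR description.
const CHUNK: usize = 8192;

// Returns the indices of matching packets in ascending order.
fn parallel_scan<F>(packets: Arc<Vec<u32>>, workers: usize, pred: F) -> Vec<usize>
where
    F: Fn(u32) -> bool + Send + Sync + 'static,
{
    assert!(workers > 0, "at least one worker is required");
    let total_chunks = (packets.len() + CHUNK - 1) / CHUNK;
    let cursor = Arc::new(AtomicUsize::new(0));
    let pred = Arc::new(pred);
    let (tx, rx) = mpsc::channel::<(usize, Vec<usize>)>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let packets = Arc::clone(&packets);
            let cursor = Arc::clone(&cursor);
            let pred = Arc::clone(&pred);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Claim the next unprocessed chunk; fast workers claim more.
                let chunk_idx = cursor.fetch_add(1, Ordering::Relaxed);
                if chunk_idx >= total_chunks {
                    break;
                }
                let start = chunk_idx * CHUNK;
                let end = (start + CHUNK).min(packets.len());
                let hits: Vec<usize> =
                    (start..end).filter(|&i| (*pred)(packets[i])).collect();
                if tx.send((chunk_idx, hits)).is_err() {
                    break;
                }
            })
        })
        .collect();
    drop(tx); // the receive loop ends once every worker has finished

    // Chunks arrive in any order; slot them by index to merge in chunk order.
    let mut per_chunk: Vec<Vec<usize>> = vec![Vec::new(); total_chunks];
    for (idx, hits) in rx {
        per_chunk[idx] = hits;
    }
    for h in handles {
        h.join().expect("worker panicked");
    }
    per_chunk.into_iter().flatten().collect()
}

fn main() {
    let packets = Arc::new((0..20_000u32).collect::<Vec<_>>());
    let hits = parallel_scan(packets, 4, |p| p % 5000 == 0);
    println!("{:?}", hits); // prints [0, 5000, 10000, 15000]
}
```

Because results are slotted by chunk index rather than arrival order, the merged output does not depend on which worker handled which chunk.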

Confidence Score: 4/5

The core parallel output path is safe to merge — byte-identical JSONL ordering is verified by integration tests, thread shutdown and stop-flag logic is clean, and the sequential fallback path is unchanged.

The round-robin merger correctly preserves packet order, the is_parallel_safe whitelist is conservative, and the fallback to sequential for unsafe filters (HTTP, DNS, TLS, tcp.stream_id) is tested. Two areas in src/parallel_read.rs warrant attention: the --progress callback fires on a different metric in parallel mode (written packets vs total processed), meaning users monitoring throughput will see inconsistent packets_processed values; and a serialization failure in a worker silently drops the matched packet rather than surfacing a warning. Neither affects the correctness of the primary output stream.

src/parallel_read.rs — the merger's progress-reporting path and the worker's serialization error handling.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/parallel_read.rs | New pipeline-parallel engine for dsct read; output ordering via strict round-robin merger is correct, stop/shutdown logic is clean, but progress-callback semantics differ from the sequential path and serialization errors silently drop matched packets. |
| src/tui/parallel_scan.rs | Work-stealing parallel filter scan for the TUI; cancel/drain/failure paths look correct, Failed→sequential fallback is safe, tests cover key edge cases including missing-file regression. |
| src/filter_expr.rs | Adds is_parallel_safe with a conservative whitelist; Protocol arm normalises names, Where arm does not, relying implicitly on the SQL parser always lowercasing protocol names in WhereClause. |
| src/parallel.rs | Clean thread-count resolver with flag > env > physical-CPU precedence; zero and unparsable values are structured errors; well-tested. |
| src/main.rs | Parallel path gate in cmd_read is correct: validates decode-as/esp-sa up front, checks is_parallel_safe && !is_packet_number_only, excludes stdin and esp-sa paths, falls through to sequential cleanly. |
| src/tui/filter_apply.rs | Parallel/sequential dispatch and fallback in the TUI filter tick are correct; ScanPoll::Failed re-parses the applied filter and restarts a sequential scan; filter_fraction exposes both paths uniformly. |
| tests/cli_parallel_read_test.rs | Comprehensive integration tests: parallel/sequential byte-identity, fallback for unsafe filters, count/offset/sample-rate interplay, error cases (threads=0, bad env), and stdin passthrough. |
| src/tui/app.rs | Adds parallel_scan, capture_path, and decode_as_args fields to App; initialization is clean and the new fields are correctly used by filter_apply.rs. |

Sequence Diagram

sequenceDiagram
    participant Caller as cmd_read (calling thread)
    participant Reader as dsct-reader thread
    participant W0 as dsct-worker-0
    participant W1 as dsct-worker-N
    participant Merger as Merger (calling thread)

    Caller->>Reader: spawn (path, pn_filter, input_txs, stop)
    Caller->>W0: spawn (irx_0, otx_0, filter_str, ...)
    Caller->>W1: spawn (irx_N, otx_N, filter_str, ...)

    Reader->>W0: send InputBatch (round 0)
    Reader->>W1: send InputBatch (round 1)
    Reader->>W0: send InputBatch (round 2)

    W0->>Merger: send OutputBatch (results for batch 0)
    W1->>Merger: send OutputBatch (results for batch 1)
    W0->>Merger: send OutputBatch (results for batch 2)

    Merger->>Merger: recv round-robin (W0→W1→W0…)
    Merger->>Merger: apply sample_rate / offset / count

    alt count limit reached
        Merger->>Reader: stop.store(true)
        Merger-->>W0: drop output_rxs → workers exit
    end

    Merger->>Caller: ReadOutcome
    Caller->>Reader: join
    Caller->>W0: join (check panic)
    Caller->>W1: join (check panic)
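
The reader/worker/merger flow in the diagram can be sketched with bounded std channels. This is a simplified illustration, not the code in src/parallel_read.rs: `pipeline`, the `BATCH` size, the channel capacity of 2, and the `u32` packets are all hypothetical, and the stop flag, warnings, and sample/offset/count handling are left out.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical batch size; the real value is not stated in the PR.
const BATCH: usize = 4;

// Filters `packets` with `keep` across `n` workers, preserving input order.
fn pipeline(packets: Vec<u32>, n: usize, keep: fn(u32) -> bool) -> Vec<u32> {
    assert!(n > 0, "at least one worker is required");
    let mut input_txs: Vec<SyncSender<Vec<u32>>> = Vec::new();
    let mut output_rxs: Vec<Receiver<Vec<u32>>> = Vec::new();
    let mut workers = Vec::new();

    for _ in 0..n {
        let (itx, irx) = sync_channel::<Vec<u32>>(2);
        let (otx, orx) = sync_channel::<Vec<u32>>(2);
        input_txs.push(itx);
        output_rxs.push(orx);
        workers.push(thread::spawn(move || {
            for batch in irx {
                let kept: Vec<u32> = batch.into_iter().filter(|&p| keep(p)).collect();
                // Send even when empty so the merger's rotation stays aligned.
                if otx.send(kept).is_err() {
                    break;
                }
            }
        }));
    }

    // Reader: batch i goes to worker i % n.
    let reader = thread::spawn(move || {
        for (i, batch) in packets.chunks(BATCH).enumerate() {
            if input_txs[i % n].send(batch.to_vec()).is_err() {
                break;
            }
        }
        // input_txs is dropped here, which closes every worker's input.
    });

    // Merger: receive in the same rotation the reader used. The first closed
    // channel is the worker that would have received the next batch.
    let mut merged = Vec::new();
    let mut idx = 0;
    while let Ok(batch) = output_rxs[idx].recv() {
        merged.extend(batch);
        idx = (idx + 1) % n;
    }

    reader.join().expect("reader panicked");
    for w in workers {
        w.join().expect("worker panicked");
    }
    merged
}

fn main() {
    let out = pipeline((0..20).collect(), 3, |p| p % 3 == 0);
    println!("{:?}", out); // prints [0, 3, 6, 9, 12, 15, 18]
}
```

The ordering guarantee comes from the fact that every batch produces exactly one output message, so the merger's rotation mirrors the reader's rotation one for one.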

Reviews (1): Last reviewed commit: "bench: add parallel filter scan benchmar..."

Comment thread src/parallel_read.rs
Comment on lines +405 to +466
let mut packets_processed = 0u64;
let mut packets_written = 0u64;
let mut filter_matches = 0u64;
let mut results_matched = 0u64;
let mut truncated_by_limit = false;
let mut worker_idx = 0usize;

// Receive from workers in strict round-robin order (same order the reader
// sent batches to them), preserving global packet order.
'outer: loop {
    match output_rxs[worker_idx].recv() {
        Err(_) => {
            // Channel closed: either stop flag is set (expected EOF / limit)
            // or all workers finished (EOF). Either way, we are done.
            break 'outer;
        }
        Ok(batch) => {
            // Count packets represented in this batch for progress reporting.
            let batch_count = batch.len() as u64;
            packets_processed = packets_processed.saturating_add(batch_count);

            for entry in batch {
                match entry {
                    WorkerEntry::Warning { number, message } => {
                        warn(number, &message);
                    }
                    WorkerEntry::Match(json_bytes) => {
                        filter_matches += 1;
                        if sample_rate > 1 && !(filter_matches - 1).is_multiple_of(sample_rate)
                        {
                            continue;
                        }
                        results_matched += 1;
                        if results_matched <= offset {
                            continue;
                        }
                        writer.write_all(&json_bytes)?;
                        writer.write_all(b"\n")?;
                        packets_written += 1;

                        if let Some(max) = count
                            && packets_written >= max
                        {
                            truncated_by_limit = true;
                            stop.store(true, Ordering::Relaxed);
                            break 'outer;
                        }
                    }
                }
            }

            // Progress reporting at batch granularity.
            if progress_interval > 0
                && packets_written > 0
                && packets_written.is_multiple_of(progress_interval)
            {
                progress(packets_processed, packets_written);
            }

            worker_idx = (worker_idx + 1) % n;
        }
    }

P2 Progress callback fires on packets_written instead of packets_processed

The sequential path emits a progress event every interval packets processed (packets_processed.is_multiple_of(interval)), but the merger fires every interval packets written (matched). For a sparse filter (e.g., a 0.1% match rate) with --progress 10000, sequential would emit ~10 progress events on a 100K-packet file; parallel would emit 0, because only 100 packets match. Additionally, the packets_processed value passed to the callback counts only WorkerEntry items (matches + warnings), so progress JSON will report a far lower packets_processed than the sequential path does for the same file, breaking any tool that monitors total throughput via --progress.
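
The arithmetic in this comment can be checked with a small simulation. It is a simplified per-packet model, assuming one match every 1,000 packets to get the 0.1% rate; `progress_events` is a hypothetical helper, and the real merger checks at batch granularity rather than per packet.

```rust
// Counts how many progress events fire over `total` packets when one packet
// in every `match_every` passes the filter.
fn progress_events(total: u64, match_every: u64, interval: u64, key_on_written: bool) -> u64 {
    let mut processed = 0u64;
    let mut written = 0u64;
    let mut events = 0u64;
    for number in 1..=total {
        processed += 1;
        let matched = number % match_every == 0;
        if matched {
            written += 1;
        }
        let fire = if key_on_written {
            // Merger-style trigger: keyed on written (matched) packets.
            matched && written % interval == 0
        } else {
            // Sequential-style trigger: keyed on all processed packets.
            processed % interval == 0
        };
        if fire {
            events += 1;
        }
    }
    events
}

fn main() {
    let sequential = progress_events(100_000, 1_000, 10_000, false);
    let parallel = progress_events(100_000, 1_000, 10_000, true);
    println!("sequential={sequential} parallel={parallel}"); // sequential=10 parallel=0
}
```

With only 100 matches, the written counter never reaches the 10,000 interval, which is why the parallel-style trigger stays silent.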

Comment thread src/parallel_read.rs
Comment on lines +365 to +377
json_buf.clear();
if write_packet_json(
    &mut json_buf,
    meta,
    dbuf,
    data.as_slice(),
    field_config.as_ref(),
    raw_bytes,
)
.is_ok()
{
    results.push(WorkerEntry::Match(json_buf.clone()));
}

P2 Matched packet silently dropped when serialization returns Err

If write_packet_json returns an error, the packet passes the filter but is never pushed as WorkerEntry::Match and is silently dropped — no warning, no error, no output line. The sequential path propagates the same error with ?, which would surface it as a structured error. Writing to Vec<u8> cannot fail today, but the inconsistent contract means a future refactor that allows I/O errors on the JSON path would silently lose matched packets in parallel mode while erroring in sequential mode.

Suggested change
- json_buf.clear();
- if write_packet_json(
-     &mut json_buf,
-     meta,
-     dbuf,
-     data.as_slice(),
-     field_config.as_ref(),
-     raw_bytes,
- )
- .is_ok()
- {
-     results.push(WorkerEntry::Match(json_buf.clone()));
- }
+ json_buf.clear();
+ match write_packet_json(
+     &mut json_buf,
+     meta,
+     dbuf,
+     data.as_slice(),
+     field_config.as_ref(),
+     raw_bytes,
+ ) {
+     Ok(()) => results.push(WorkerEntry::Match(json_buf.clone())),
+     Err(e) => results.push(WorkerEntry::Warning {
+         number: meta.number,
+         message: format!("serialization error: {e}"),
+     }),
+ }

Comment thread src/filter_expr.rs
Comment on lines +135 to +145
FilterExpr::Where(clause) => {
    if !SAFE_PROTOCOLS.iter().any(|&s| s == clause.protocol) {
        return false;
    }
    // TCP stateful fields are unsafe even though TCP itself is safe.
    if clause.protocol == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field)
    {
        return false;
    }
    true
}

P2 FilterExpr::Where skips normalize_protocol_name unlike FilterExpr::Protocol

The Protocol arm explicitly normalises the name before comparing against SAFE_PROTOCOLS, but the Where arm compares clause.protocol directly. The tests pass because the SQL parser appears to lower-case protocol names, but if the two code paths are ever decoupled (e.g., WhereClause is constructed directly in tests or serialised/deserialised), a protocol stored with any casing other than lowercase ("TCP", "UDP") would not match the whitelist and would conservatively fall through to sequential scanning without any warning.
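
A minimal sketch of the symmetric version, with both arms normalising before the whitelist lookup, might look like the following. The list contents, the simplified `FilterExpr` shape, the bare field names, and the lowercasing body of `normalize_protocol_name` are all assumptions for illustration; only the identifiers themselves appear in the PR.

```rust
// Assumed list contents; the real whitelist and field list are not shown.
const SAFE_PROTOCOLS: &[&str] = &["ethernet", "ipv4", "ipv6", "udp", "tcp"];
const UNSAFE_TCP_FIELDS: &[&str] = &["stream_id", "reassembly_in_progress"];

// Simplified stand-in for the real expression type.
enum FilterExpr {
    Protocol(String),
    Where { protocol: String, field: String },
}

// Assumed behaviour: trim and lowercase.
fn normalize_protocol_name(name: &str) -> String {
    name.trim().to_ascii_lowercase()
}

fn is_parallel_safe(expr: &FilterExpr) -> bool {
    match expr {
        FilterExpr::Protocol(name) => {
            SAFE_PROTOCOLS.contains(&normalize_protocol_name(name).as_str())
        }
        FilterExpr::Where { protocol, field } => {
            // Normalise here too, so "TCP" and "tcp" take the same path.
            let protocol = normalize_protocol_name(protocol);
            if !SAFE_PROTOCOLS.contains(&protocol.as_str()) {
                return false;
            }
            // TCP stateful fields are unsafe even though TCP itself is safe.
            !(protocol == "tcp" && UNSAFE_TCP_FIELDS.contains(&field.as_str()))
        }
    }
}

fn main() {
    let upper = FilterExpr::Where { protocol: "TCP".into(), field: "stream_id".into() };
    println!("{}", is_parallel_safe(&upper)); // prints false
}
```

Without the normalisation step in the Where arm, an uppercase protocol name would still be treated as unsafe, which is conservative but would silently disable parallelism.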

…ential path

Workers now report true per-batch packet counts so --progress reflects
all processed packets, and serialization failures abort the run instead
of silently dropping matched packets. Also normalize the protocol name
in the Where arm of is_parallel_safe for symmetry with the Protocol arm.
@higebu higebu merged commit c912482 into main Jun 12, 2026
16 checks passed
@higebu higebu deleted the feat/parallel-filter-eval branch June 12, 2026 03:33