Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 127 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ quic = ["packet-dissector/quic"]
stun = ["packet-dissector/stun"]

[dependencies]
packet-dissector = { version = "0.3.1", default-features = false }
packet-dissector-core = "0.3.1"
packet-dissector-pcap = "0.3.1"
packet-dissector = { version = "0.3.3", default-features = false }
packet-dissector-core = "0.3.3"
packet-dissector-pcap = "0.3.3"
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1", features = ["preserve_order"] }
Expand All @@ -149,12 +149,13 @@ nucleo-matcher = { version = "0.3", optional = true }
tempfile = { version = "3", optional = true }
rustix = { version = "1", features = ["stdio"], optional = true }
lru = { version = "0.16", optional = true }
num_cpus = "1"
sqlparser = "0.61.0"

[dev-dependencies]
assert_cmd = "2"
criterion = { version = "0.8", features = ["html_reports"] }
packet-dissector-test-alloc = "0.3.1"
packet-dissector-test-alloc = "0.3.3"
predicates = "3"
tempfile = "3"
nix = { version = "0.31", default-features = false, features = [
Expand All @@ -173,6 +174,10 @@ name = "tui_index"
harness = false
required-features = ["tui"]

[[bench]]
name = "parallel_read"
harness = false

# The profile that 'dist' will build with
[profile.dist]
inherits = "release"
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ Include the original packet bytes (link-layer included) as a hex string under
dsct read capture.pcap --raw-bytes --count 1
```

Speed up filter evaluation on large files with `--threads`:

```bash
dsct read capture.pcap -f "udp" --no-limit --threads 4
DSCT_THREADS=4 dsct read capture.pcap -f "tcp.dst_port > 1024" --no-limit
```

`--threads` distributes dissection and filter evaluation across N worker
threads when the filter is stateless (L2–L4 protocols: `tcp`, `udp`, `ipv4`,
etc.). Filters that require TCP reassembly such as `http`, `dns`, `tls`, and
`tcp.stream_id` automatically fall back to sequential processing regardless of
`--threads`. Stdin input always uses the sequential path.

Inspect available fields and schemas:

```bash
Expand Down Expand Up @@ -258,6 +271,7 @@ Resource limits can be tuned via environment variables:
| `DSCT_MCP_TIMEOUT` | 300 | Timeout per tool execution in seconds |
| `DSCT_MCP_WRITE_BUFFER_SIZE` | 65536 | Stdout write buffer size in bytes |
| `DSCT_MCP_MAX_FILE_SIZE` | 10737418240 | Maximum capture file size in bytes |
| `DSCT_THREADS` | physical CPU count | Worker threads for `dsct read --filter` (see `--threads`) |

## Output

Expand Down
152 changes: 152 additions & 0 deletions benches/parallel_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! Benchmark for parallel filter evaluation (`dsct read --threads`).
//!
//! Generates a synthetic pcap with mixed UDP and TCP packets and measures
//! the throughput of the parallel read engine at threads=1 vs threads=4,
//! writing matched records to [`io::sink()`].

use std::io;
use std::path::Path;

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};

use dsct::parallel_read::{ParallelReadOptions, run};

// ---------------------------------------------------------------------------
// Pcap generation
// ---------------------------------------------------------------------------

/// Build a synthetic pcap with `n` packets alternating UDP and TCP.
///
/// Even-indexed packets: Ethernet + IPv4 + UDP (42 bytes).
/// Odd-indexed packets: Ethernet + IPv4 + TCP (54 bytes).
fn build_bench_pcap(n: usize) -> Vec<u8> {
let mut buf = Vec::with_capacity(24 + n * 60);

// Global header
buf.extend_from_slice(&0xA1B2C3D4u32.to_le_bytes());
buf.extend_from_slice(&2u16.to_le_bytes());
buf.extend_from_slice(&4u16.to_le_bytes());
buf.extend_from_slice(&0i32.to_le_bytes());
buf.extend_from_slice(&0u32.to_le_bytes());
buf.extend_from_slice(&65535u32.to_le_bytes());
buf.extend_from_slice(&1u32.to_le_bytes()); // Ethernet

// UDP packet template (42 bytes): Eth + IPv4 (UDP, 10.0.0.1→10.0.0.2)
let udp: &[u8] = &[
// Ethernet (14 bytes)
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // dst mac
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, // src mac
0x08, 0x00, // ethertype IPv4
// IPv4 (20 bytes)
0x45, 0x00, 0x00, 0x1c, // version/IHL, DSCP, total length (28)
0x00, 0x00, 0x00, 0x00, // id, flags+frag
0x40, 0x11, 0x00, 0x00, // TTL=64, proto=UDP, checksum=0
0x0a, 0x00, 0x00, 0x01, // src 10.0.0.1
0x0a, 0x00, 0x00, 0x02, // dst 10.0.0.2
// UDP (8 bytes)
0x10, 0x00, // src port 4096
0x10, 0x01, // dst port 4097
0x00, 0x08, // length 8
0x00, 0x00, // checksum
];

// TCP SYN packet template (54 bytes): Eth + IPv4 (TCP, 10.0.0.3→10.0.0.4)
let tcp: &[u8] = &[
// Ethernet (14 bytes)
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x00,
// IPv4 (20 bytes)
0x45, 0x00, 0x00, 0x28, // total length = 40
0x00, 0x00, 0x00, 0x00, 0x40, 0x06, 0x00, 0x00, // TTL=64, proto=TCP
0x0a, 0x00, 0x00, 0x03, // src 10.0.0.3
0x0a, 0x00, 0x00, 0x04, // dst 10.0.0.4
// TCP (20 bytes)
0x30, 0x39, // src port 12345
0x07, 0xd0, // dst port 2000
0x00, 0x00, 0x00, 0x01, // seq
0x00, 0x00, 0x00, 0x00, // ack
0x50, 0x02, // data offset=5, SYN
0xff, 0xff, // window
0x00, 0x00, // checksum
0x00, 0x00, // urgent
];

for i in 0..n {
let pkt = if i % 2 == 0 { udp } else { tcp };
let ts_sec = (i / 1_000_000) as u32;
let ts_usec = (i % 1_000_000) as u32;
buf.extend_from_slice(&ts_sec.to_le_bytes());
buf.extend_from_slice(&ts_usec.to_le_bytes());
buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes());
buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes());
buf.extend_from_slice(pkt);
}

buf
}

/// Write a bench pcap to `path` and return the file size in bytes.
fn write_bench_pcap(path: &Path, n: usize) -> u64 {
let data = build_bench_pcap(n);
std::fs::write(path, &data).expect("write bench pcap");
data.len() as u64
}

// ---------------------------------------------------------------------------
// Benchmark
// ---------------------------------------------------------------------------

fn bench_parallel_read(c: &mut Criterion) {
const N: usize = 20_000; // ~20 k packets; each is 42 or 54 bytes

let path = std::env::temp_dir().join(format!(
"dsct_bench_parallel_{}_{}.pcap",
N,
std::process::id()
));
let file_size = write_bench_pcap(&path, N);

let mut group = c.benchmark_group("parallel_read");
group.sample_size(10);
group.throughput(Throughput::Bytes(file_size));

// Filter that is parallel-safe: match all UDP packets.
let filter = "udp";

for &threads in &[1usize, 4usize] {
group.bench_with_input(
BenchmarkId::new("udp_filter", format!("threads={threads}")),
&threads,
|b, &t| {
b.iter(|| {
let mut sink = io::sink();
run(
&ParallelReadOptions {
path: &path,
filter_str: filter,
decode_as_args: &[],
threads: t,
sample_rate: 1,
offset: 0,
count: None,
pn_filter: None,
field_config: None, // verbose mode
raw_bytes: false,
progress_interval: 0,
},
&mut sink,
&mut |_, _| {},
&mut |_, _| {},
)
.expect("parallel_read::run failed in benchmark");
});
},
);
}

group.finish();

let _ = std::fs::remove_file(&path);
}

criterion_group!(benches, bench_parallel_read);
criterion_main!(benches);
6 changes: 3 additions & 3 deletions src/field_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ const DEFAULT_CONFIG: &str = include_str!("default_fields.toml");
///
/// When a container field is included but has no nested patterns defined,
/// all of its sub-fields are shown (e.g., SRv6 `segments_structure`).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct FieldConfig {
protocols: HashMap<String, FieldFilter>,
}

/// Per-protocol field filter with top-level and nested patterns.
#[derive(Debug)]
#[derive(Debug, Clone)]
struct FieldFilter {
/// Patterns for top-level field names (no dots).
top_level: PatternSet,
Expand All @@ -43,7 +43,7 @@ struct FieldFilter {
}

/// A set of patterns for matching field names.
#[derive(Debug)]
#[derive(Debug, Clone)]
struct PatternSet {
/// When `true`, all names match (used for `"parent.*"` patterns).
match_all: bool,
Expand Down
118 changes: 118 additions & 0 deletions src/filter_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,64 @@ impl FilterExpr {
FilterExpr::Protocol(_) | FilterExpr::Where(_) => false,
}
}

/// Returns `true` if this expression is safe to evaluate in parallel across
/// independently opened capture file chunks.
///
/// # Why some filters are unsafe for parallel evaluation
///
/// The `DissectorRegistry` contains a `TcpReassemblyService` (behind a
/// `Mutex`) that tracks TCP stream state across packets. Fields like
/// `tcp.stream_id` and `tcp.reassembly_in_progress` are assigned
/// sequentially in encounter order. Upper-layer protocols over TCP (HTTP,
/// TLS, SIP, DNS-over-TCP, etc.) are dissected from reassembled stream data,
/// which means their fields are only meaningful when packets are processed in
/// order. Splitting the capture into chunks and dissecting each chunk with
/// an independent registry changes `stream_id` assignment and may miss
/// reassembled data entirely.
///
/// The conservative whitelist below contains only protocols whose dissection
/// is deterministic per-packet and never depends on reassembled TCP payload
/// or other cross-packet state: link layers (ethernet, sll, sll2), ARP,
/// LACP, IPv4, IPv6, ICMP, ICMPv6, IGMP, UDP, TCP itself (excluding its
/// stateful fields), and SCTP.
pub fn is_parallel_safe(&self) -> bool {
/// Protocols whose per-packet dissection is state-free and therefore
/// safe to run in any order across parallel chunks.
const SAFE_PROTOCOLS: &[&str] = &[
"ethernet", "sll", "sll2", "arp", "lacp", "ipv4", "ipv6", "icmp", "icmpv6", "igmp",
"udp", "tcp", "sctp",
];
/// TCP fields that reflect cross-packet reassembly state; filtering on
/// these requires sequential processing.
const UNSAFE_TCP_FIELDS: &[&str] = &["stream_id", "reassembly_in_progress"];

match self {
FilterExpr::PacketNumber(_) => true,
FilterExpr::Not(e) => e.is_parallel_safe(),
FilterExpr::And(a, b) | FilterExpr::Or(a, b) => {
a.is_parallel_safe() && b.is_parallel_safe()
}
FilterExpr::Protocol(name) => {
let norm = crate::filter::normalize_protocol_name(name);
SAFE_PROTOCOLS.iter().any(|&s| s == norm)
}
FilterExpr::Where(clause) => {
// WhereClause::new already normalises the protocol name, but
// normalise again so this check does not silently depend on
// that constructor invariant (mirrors the Protocol arm).
let norm = crate::filter::normalize_protocol_name(&clause.protocol);
if !SAFE_PROTOCOLS.iter().any(|&s| s == norm) {
return false;
}
// TCP stateful fields are unsafe even though TCP itself is safe.
if norm == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field) {
return false;
}
true
}
Comment on lines +135 to +148

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 FilterExpr::Where skips normalize_protocol_name unlike FilterExpr::Protocol

The Protocol arm explicitly normalises the name before comparing against SAFE_PROTOCOLS, but the Where arm compares clause.protocol directly. The tests pass because the SQL parser appears to lower-case protocol names, but if the two code paths are ever decoupled (e.g., WhereClause is constructed directly in tests or serialised/deserialised), a protocol stored with any casing other than lowercase ("TCP", "UDP") would not match the whitelist and would conservatively fall through to sequential scanning without any warning.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -463,4 +521,64 @@ mod tests {
let pkt = make_tcp_packet_ref(&buf);
assert!(expr.matches(&pkt));
}

// --- is_parallel_safe ---

#[test]
fn parallel_safe_tcp() {
let expr = FilterExpr::parse("tcp").unwrap().unwrap();
assert!(expr.is_parallel_safe());
}

#[test]
fn parallel_safe_udp_and_ipv4() {
let expr = FilterExpr::parse("udp AND ipv4.src = '10.0.0.1'")
.unwrap()
.unwrap();
assert!(expr.is_parallel_safe());
}

#[test]
fn parallel_unsafe_http() {
let expr = FilterExpr::parse("http").unwrap().unwrap();
assert!(!expr.is_parallel_safe());
}

#[test]
fn parallel_unsafe_dns() {
let expr = FilterExpr::parse("dns").unwrap().unwrap();
assert!(!expr.is_parallel_safe());
}

#[test]
fn parallel_unsafe_tcp_stream_id() {
let expr = FilterExpr::parse("tcp.stream_id = 1").unwrap().unwrap();
assert!(!expr.is_parallel_safe());
}

#[test]
fn parallel_safe_not_tcp() {
let expr = FilterExpr::parse("NOT tcp").unwrap().unwrap();
assert!(expr.is_parallel_safe());
}

#[test]
fn parallel_unsafe_tcp_or_http() {
let expr = FilterExpr::parse("tcp OR http").unwrap().unwrap();
assert!(!expr.is_parallel_safe());
}

#[test]
fn parallel_safe_packet_number_between() {
let expr = FilterExpr::parse("packet_number BETWEEN 1 AND 10")
.unwrap()
.unwrap();
assert!(expr.is_parallel_safe());
}

#[test]
fn parallel_safe_tcp_dst_port() {
let expr = FilterExpr::parse("tcp.dst_port > 1024").unwrap().unwrap();
assert!(expr.is_parallel_safe());
}
}
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub mod mcp;
#[doc(hidden)]
pub mod output;
#[doc(hidden)]
pub mod parallel;
#[doc(hidden)]
pub mod parallel_read;
#[doc(hidden)]
pub mod schema;
#[doc(hidden)]
pub mod serialize;
Expand Down
Loading
Loading