From 0dab88fb9e0da41cfca5b5a42b60e848286af59d Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 00:27:17 +0000 Subject: [PATCH 1/6] chore: update packet-dissector crates to 0.3.3 --- Cargo.lock | 220 ++++++++++++++++++++++++++--------------------------- Cargo.toml | 8 +- 2 files changed, 114 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a408461..ae56266 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1067,9 +1067,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "packet-dissector" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9c41271b1ce5ce1b19858bf4246748165bda26b4215d26b96639f598081ca90" +checksum = "7aff481313c7d5de03560c724b3119dad9a2c1ecef6a903ddc6845d91ea0f0da" dependencies = [ "packet-dissector-ah", "packet-dissector-arp", @@ -1127,90 +1127,90 @@ dependencies = [ [[package]] name = "packet-dissector-ah" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77d3b2e2f02176268396bfbd07f1908c9c523bf7a2fc043e0030294924d4d97" +checksum = "4baf14545c0cb5719de713871b7b625efa3705e6f9d256dafe9c9e3cf3f1fe99" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-arp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8978194aa8ce5e5ccfd93a839f8224579e1d9030a06195772daa880cf72a1e" +checksum = "18bbe811d07d2399217e3a70aaededb27e9456e074962a1f772674e425f8e00e" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-bfd" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584da89fad456bf9694605c38d1ec3eb5e8e40f8c8e1bb2ebf9e2b91737e9a0e" +checksum = "90997c85608db6857bf251d9b8d10ad944673ffd807fad8d05cedf9278d6862a" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-bgp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb973b5100601a847af0d68e69d0812e007ebec98bf0d21421d7300c24031cd6" +checksum = "cb60b7370e8a97ae5b3be6ad48477ec1dc2cf2944d1946430a3f20bec8928505" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-core" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbe2393b32434970ed28832463652bfa9095b9502ce309904de1d38f15260b79" +checksum = "e47d5d81ba2275f91e9e04de25d243a2b9093f8dadccb6ab82b613deb7f83945" dependencies = [ "thiserror 2.0.18", ] [[package]] name = "packet-dissector-dhcp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "779aac11aef58ce703d32b9ab9658262006ac24d02d31da81cfef0025a10a0b8" +checksum = "b7158fdf52a30cdf3fe7c5ae1ada19c6815fdfb7e0a2b6b141d48d1d37fd3881" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-dhcpv6" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f15b8787d38985631c8088837f3669663eb8949251ece5f3665f16411c1f1f42" +checksum = "f6270ea698d46c7acd226a20545df36868a4da01d2cafa51e3c2841dcb7b4d9e" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-diameter" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c257f8ec78f1aaae89453afa45d18c0df894db0f0682aa4f20b34fc093356e69" +checksum = "461ab63b1b149c407b5d8f0523b58f61ce3f3da3a485a4404fca0985a1fa56a9" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-dns" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04b82a48cfda98adb9eaaba80360c84ea9665dc74911c43b27d38d9a77105a80" +checksum = "ab3aa0748b6e59c366974169456bdbab56bcabfebf83ad783444671f4f4a0abd" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-esp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56b7e9f1c04a9fab4860ca289cc36f520b71e7fdb6f49c3140d5cdf371ad198f" +checksum = "36e7517029dee571e19549c347189db020f68da770f3eb06fa55fc12a241e334" dependencies = [ "aes 0.9.0", "aes-gcm", @@ -1220,45 +1220,45 @@ dependencies = [ [[package]] name = "packet-dissector-ethernet" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ade85bfbaab129c1d3b14ff25fc02a52c0d4267825f23e1bb8f699431121c833" +checksum = "fed2dfdee62f9fafd07a57705aee55edd6e2437439d406d26b53ffe27031ca53" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-geneve" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b96763af2254197b75bc6cd46aa25248067f89cc776fed1ede7bd9fc712b7497" +checksum = "3f2a5261588d14d7a7c0b36071989ee2af4421eaa0ab5190df1720fc369a7992" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-gre" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b31604e8138dabf1204e2e80745c45ade9b5288ea45b18c7803aafc644238f3d" +checksum = "87cba09a0d17d492ef82344aaf65cf97fd7d481150eb6e15de78ea278798e2db" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-gtpv1u" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a732218c83b9d4eba3f772a7f730176e0e64734f0eeda813fda558a07b5d2c3e" +checksum = "6652730f3b1d801d7571f077fe211cb090e38fe16f513e7318f6751f050af727" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-gtpv2c" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ded13b16dd5b20947b041e0f46bba3d9c700c6ecad5f490cce466612c7d3585" +checksum = "f00d4de22e17aaaf36c44c033ffa0df89316365d9cb9c13a29df93207202ecc0" dependencies = [ "packet-dissector-core", "packet-dissector-ppp", @@ -1266,9 +1266,9 @@ dependencies = [ [[package]] name = "packet-dissector-http" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac338723b914ef3bf9cb414d9b7a351a09dbba0f314314c5df40b9417d2a4804" +checksum = "0bd7ceb17c0314a7d1d650586a64f095f51819c073a2bdb9357dcfe69d44caa0" dependencies = [ "httparse", "packet-dissector-core", @@ -1276,135 +1276,135 @@ dependencies = [ [[package]] name = "packet-dissector-http2" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "187f38bb089dcac8ced23a0787fa218c7e53a5d526b3b3c57fc9c15ac61c9ff9" +checksum = "f4d8a78bbc6732d9632acf007fafd5d58f54601e9cab40238d063f97faecd87c" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-icmp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c211786a5e10ca3e7a8615986df20131c0501f0fa87eb42f6880c800022d2f93" +checksum = "1ab1e9de3717eb9c65c77be3e9735659915993343a950af8edf0c0663a4c156d" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-icmpv6" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "204be4b49a164f3decc8073909c85a9c5e39c24c01256486fcc6f89e4324a282" +checksum = "6c8f6cc20840507687ceb32d60a01951ce14414bb30c3432d9e764e35d422ba3" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-igmp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9078152a15530693a661eb854c9ec1c93ebb6ec644ab4f8455d6e8c096a3e5ef" +checksum = "95c216ebdfd106a3313c68cbcfbf22a04aa2aa630529d8d5ddd8334cb11263e6" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ike" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "336925c6c17294ae35dea5e5fce18aae7bf08610e5b6e287b17d2101c804ff34" +checksum = "c6fb88a4f2262015fdefa0dec3a8f210a4338f9571889b552496395e4884106b" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ipv4" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0ca2ba66fe82af9ea3d78ce3d96f46a5e0efaa1c3a7a4ffe04d61f655b700be" +checksum = "adb8d1dc6b6b569981edb1165e9d61ba2a51846b65447d38296fe70fae2f6901" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ipv6" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ffd21462d8601e4d504be74b350532fdf28859bb57da096754be7e8b347022a" +checksum = "08a5f92088854e53dba10fb6f8d06e34cca3136ae17573514ff714e943953a2f" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-isis" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f07713a8de56cef9fb3addca0d2e28b1607b0a192a37e0f2df6bda481a881d1f" +checksum = "5acf10c4fec5535de953abe435cac05911fd433fa5359a590b3aa042ed349fbd" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-l2tp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d281f1534ddb842bd977bb665d2fabd44eb44265c6f5d53c84982f678d2d12ed" +checksum = "eb1519c4abaa4992b8978df731a8e7b9480449f5a9411cb801bdb4ec8763cc5d" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-l2tpv3" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a65e58d306742ceb137d45cc8893f922f57aedb9f1f5bf6c2a03cddacd8705f3" +checksum = "9d7245ff07d15b59b40bc4e36022acd8ee903fa7a821b5e5f6743bfcf0687447" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-lacp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da000cbc219df22b558a3f3388d783987e2418eaaf4452a5dde92ca1758fddc0" +checksum = "d872fb634a74e32b15c0e793af59106c128e69e3df5147f46fdf7b880b27e873" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-linux-sll" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f727c62ee9eb1a2a7f7dc26598d6a7a2b68dc4a4da55a32a2f34d0d089cd69f" +checksum = "5724ad0b407506d7688e6a57bdb486c8246c68f0ab6135f540c4b9e5a62b12bb" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-linux-sll2" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55b02587f4d8e15acfac5a0308b9f8bfc90618e9d50d58acb0468935da040a28" +checksum = "ebd7286d170d44e427ce4f922fa6df3f902d5b1eca5c58483e305a7357c3685d" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-lldp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22255697ef38c769cdc78e7957ea6ce7eb177e400ce52ace37a9ff1c742bc43d" +checksum = "9d57a76a1f028b3ad2e58a8e83322ca16837d833b5076bec2c88b56f01c1c228" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-mdns" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8a0b4804590485b73f07b811e81d033c45fc3455645abcb2059d17fff620ea7" +checksum = "a6e8b465c8687310e589d0ae0b841a3c0ab20174dda2dbb58ca777652890c777" dependencies = [ "packet-dissector-core", "packet-dissector-dns", @@ -1412,27 +1412,27 @@ dependencies = [ [[package]] name = "packet-dissector-mpls" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d9308e0ebb0b634d3dfbbff5a2dfa9fbc39fe5a63a466aaa15bd51c41d62a4b" +checksum = "29ee24abec2eab08ce342d572e7ae07db5441b7b33e679f7c3cff6ad622f6333" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-nas5g" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf77d53cf54e2487baefd756433284f0d82f26f7f6c9a0c9a1bd89e550dd238" +checksum = "1ec2d1b9590123b6618e445c59ce736a2c8e8218e3115556b86c8a2f437bfc01" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ngap" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8728339486182f55814f0b28bd08080b351f6bd626b60623404246264f6d44d3" +checksum = "c09c809bdf5fd8c7ad8eee98ab7046e509b38a528bdb836fedfbb9453b46a88a" dependencies = [ "packet-dissector-core", "packet-dissector-nas5g", @@ -1440,99 +1440,99 @@ dependencies = [ [[package]] name = "packet-dissector-ntp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e982787c413d374c49a6cfdf3382113b2e2d6af7334515cfe103a7c9a10c190b" +checksum = "58c31223031adf516f19584cf6595ab7ebffc718dad0c847252d92a8a7b8ba35" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ospf" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fc99faa16605ef0a88b64e970f75e5b6f5d6efc69f51836079f244bb99e104f" +checksum = "9039421d40facd58d0c5a0b12a60f3191c4f40633e174c31f6a0e51b93c1c3b3" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-pcap" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e10140863afc934a46b4bbfb24108588247e468d4961fae80989d0177c428c35" +checksum = "dd8d97497c0dfdbeb998e91bdcf4839406c132858b501e6b7c4172ef109d054e" dependencies = [ "thiserror 2.0.18", ] [[package]] name = "packet-dissector-pfcp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04679fff6b97d9532463dc656b38ecee46685faab3466c59e9536ee2e3503256" +checksum = "809adc63d1447a694fcd954c96e82719c497b2d044e6e80162a90a50dca86d0b" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-ppp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a89373a7590157ef6adbf2a5c153fffe72fb8e18b9ef362cf55cdd0dbb2c8d15" +checksum = "941e28b396194c78460ac4478e3211c0126b25e72fd131288cc0b1474574c710" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-quic" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efcbdb19b32263aa672d7e5693e9d9599287bb61f3f7e16a38ca2750b13248d8" +checksum = "23ace00801507595c11e7236fb3697e87554be79d4de0b70f2f14b728677e3a1" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-radius" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8a669d5e28175ef2d11a5e9d9818233b5449e6b06bb8d43118e8fc061c8de6" +checksum = "88deaa62ba6563a1c4b5ca67c177a11a84fbe319bb8c762fc61922f5314ac236" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-reassembly" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3561ea5b8dd66e6aa0f8ea49e1825b0f25b7fcfefb5e684676ec352c06dd06a" +checksum = "7cb8e1cd76a499998f85a1f92727609636b9927d1d9c3fb8f0bdc3c8e9598dbf" dependencies = [ "thiserror 2.0.18", ] [[package]] name = "packet-dissector-rtp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b56e81b1928fd61da1e8514af9e12807f78b12914b0cc2202e94668d86ac391a" +checksum = "bcfe86e6a11cba4f5db363efe5020cbce12accd59a34d8460ea55d3ce4b6b432" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-sctp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ed23e5b3d1d5a19aa477bcf1ed6c344bcb2cb1084df5f6af738ccdb28f9766a" +checksum = "0ec8ab5ab0e32772050dc7f9e22681b1f18dca816a95c201a8b7659477e9a5ad" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-sip" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d8c35dc5b2379e162157735094208bb1d3a999c22c70477cd7b3533ec8de12" +checksum = "c37a3f2f04121d139d8982698a3539bf40f3c1e078e38ed05f13a4467cdf3594" dependencies = [ "httparse", "packet-dissector-core", @@ -1540,81 +1540,81 @@ dependencies = [ [[package]] name = "packet-dissector-srv6" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "256048548f319f875ee309d3305991bea5bf5d236743d289b0da695ee7087cad" +checksum = "cc0d519e9939289df40c9077caee46452fdccf8890dac3d0a02195d6142caa7f" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-stp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fdbcb74d01a043b5732176bce07ee32c536b58ba33cbce92fca35ed8da6576" +checksum = "a6b818a3b752914c113f8a4478a429b9e4c23e4ab9a46c1d1b80332df81b3132" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-stun" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a53c370aec7c05bf3f9561ef84aa9241ce30fdec67f1974588d71aaf3e0ed5f9" +checksum = "18e8a343c1e90e6064b4575bf7ca6395e7e1d17c6db9552df68e714ad6240539" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-tcp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af61f36a12c4fa2e8ffdf361752ccbf2251590bad13daf22f7dd24e36e9534d2" +checksum = "7b95c4364cb4d6c5c4386db05982f62a810a0feb56f03a1439881d786b0bd1f2" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-test-alloc" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fa9295f5419b5dc62ae3d2551efa6c4b85d4c9e49f646ecc54f4e86357639c4" +checksum = "9c006aa4f5f416d4612ade5ea3aeb60335b58caea46b7ebbda60365438767858" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-tls" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33db1e21b7fc4ff9bdd837ec0801a2822e153c1a7b6750f37b0f91907816e10d" +checksum = "9b16d9166d0a749b0406e3e538a97b4dad1ea42198439c3fcdf533fa3b32a1cb" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-udp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4cb5390b37ba19c9c08d2b4c6eda0c176acc64b20268863d67de48b15644e20" +checksum = "18224e55d3a21709288bbc35b53b76004b6691d27a21c1f2dcacb528f8b8383c" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-vrrp" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "421963fb20e54faf9622e131f8ac191c0baed4c70b83ccd15dd0bb806f017fa7" +checksum = "30c1b7486df10a9f0d6fa139a2180d9e874b1cd1ecfc3d075c9bd08053625b33" dependencies = [ "packet-dissector-core", ] [[package]] name = "packet-dissector-vxlan" -version = "0.3.1" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2ed26d458603bde93bdd0293a1b06936daab53134bea68559c32145b27265d" +checksum = "0e28f6daa229fb67a544e4ac6a82af89e93b598a33e33231545d61c5636454cd" dependencies = [ "packet-dissector-core", ] diff --git a/Cargo.toml b/Cargo.toml index 664420c..7407774 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,9 +132,9 @@ quic = ["packet-dissector/quic"] stun = ["packet-dissector/stun"] [dependencies] -packet-dissector = { version = "0.3.1", default-features = false } -packet-dissector-core = "0.3.1" -packet-dissector-pcap = "0.3.1" +packet-dissector = { version = "0.3.3", default-features = false } +packet-dissector-core = "0.3.3" +packet-dissector-pcap = "0.3.3" clap = { version = "4", features = ["derive"] } serde = { version = "1", features = ["derive"] } serde_json = { version = "1", features = ["preserve_order"] } @@ -154,7 +154,7 @@ sqlparser = "0.61.0" [dev-dependencies] assert_cmd = "2" criterion = { version = "0.8", features = ["html_reports"] } -packet-dissector-test-alloc = "0.3.1" +packet-dissector-test-alloc = "0.3.3" predicates = "3" tempfile = "3" nix = { version = "0.31", default-features = false, features = [ From 8e7ec9743adb6477cbac6f347a0dc2ae21809af7 Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 00:30:49 +0000 Subject: [PATCH 2/6] feat: add thread pool sizing and filter parallel-safety check --- Cargo.lock | 17 +++++++ Cargo.toml | 1 + src/filter_expr.rs | 115 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 + src/parallel.rs | 91 +++++++++++++++++++++++++++++++++++ 5 files changed, 226 insertions(+) create mode 100644 src/parallel.rs diff --git a/Cargo.lock b/Cargo.lock index ae56266..f362637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,6 +597,7 @@ dependencies = [ "memmap2", "nix", "nucleo-matcher", + "num_cpus", "packet-dissector", "packet-dissector-core", "packet-dissector-pcap", @@ -765,6 +766,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "httparse" version = "1.10.1" @@ -1023,6 +1030,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_threads" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 7407774..199b505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ nucleo-matcher = { version = "0.3", optional = true } tempfile = { version = "3", optional = true } rustix = { version = "1", features = ["stdio"], optional = true } lru = { version = "0.16", optional = true } +num_cpus = "1" sqlparser = "0.61.0" [dev-dependencies] diff --git a/src/filter_expr.rs b/src/filter_expr.rs index 4e8c7a8..f0e9daf 100644 --- a/src/filter_expr.rs +++ b/src/filter_expr.rs @@ -90,6 +90,61 @@ impl FilterExpr { FilterExpr::Protocol(_) | FilterExpr::Where(_) => false, } } + + /// Returns `true` if this expression is safe to evaluate in parallel across + /// independently opened capture file chunks. + /// + /// # Why some filters are unsafe for parallel evaluation + /// + /// The `DissectorRegistry` contains a `TcpReassemblyService` (behind a + /// `Mutex`) that tracks TCP stream state across packets. Fields like + /// `tcp.stream_id` and `tcp.reassembly_in_progress` are assigned + /// sequentially in encounter order. Upper-layer protocols over TCP (HTTP, + /// TLS, SIP, DNS-over-TCP, etc.) are dissected from reassembled stream data, + /// which means their fields are only meaningful when packets are processed in + /// order. Splitting the capture into chunks and dissecting each chunk with + /// an independent registry changes `stream_id` assignment and may miss + /// reassembled data entirely. + /// + /// The conservative whitelist below contains only protocols whose dissection + /// is deterministic per-packet and never depends on reassembled TCP payload + /// or other cross-packet state: link layers (ethernet, sll, sll2), ARP, + /// LACP, IPv4, IPv6, ICMP, ICMPv6, IGMP, UDP, TCP itself (excluding its + /// stateful fields), and SCTP. + pub fn is_parallel_safe(&self) -> bool { + /// Protocols whose per-packet dissection is state-free and therefore + /// safe to run in any order across parallel chunks. + const SAFE_PROTOCOLS: &[&str] = &[ + "ethernet", "sll", "sll2", "arp", "lacp", "ipv4", "ipv6", "icmp", "icmpv6", "igmp", + "udp", "tcp", "sctp", + ]; + /// TCP fields that reflect cross-packet reassembly state; filtering on + /// these requires sequential processing. + const UNSAFE_TCP_FIELDS: &[&str] = &["stream_id", "reassembly_in_progress"]; + + match self { + FilterExpr::PacketNumber(_) => true, + FilterExpr::Not(e) => e.is_parallel_safe(), + FilterExpr::And(a, b) | FilterExpr::Or(a, b) => { + a.is_parallel_safe() && b.is_parallel_safe() + } + FilterExpr::Protocol(name) => { + let norm = crate::filter::normalize_protocol_name(name); + SAFE_PROTOCOLS.iter().any(|&s| s == norm) + } + FilterExpr::Where(clause) => { + if !SAFE_PROTOCOLS.iter().any(|&s| s == clause.protocol) { + return false; + } + // TCP stateful fields are unsafe even though TCP itself is safe. + if clause.protocol == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field) + { + return false; + } + true + } + } + } } #[cfg(test)] @@ -463,4 +518,64 @@ mod tests { let pkt = make_tcp_packet_ref(&buf); assert!(expr.matches(&pkt)); } + + // --- is_parallel_safe --- + + #[test] + fn parallel_safe_tcp() { + let expr = FilterExpr::parse("tcp").unwrap().unwrap(); + assert!(expr.is_parallel_safe()); + } + + #[test] + fn parallel_safe_udp_and_ipv4() { + let expr = FilterExpr::parse("udp AND ipv4.src = '10.0.0.1'") + .unwrap() + .unwrap(); + assert!(expr.is_parallel_safe()); + } + + #[test] + fn parallel_unsafe_http() { + let expr = FilterExpr::parse("http").unwrap().unwrap(); + assert!(!expr.is_parallel_safe()); + } + + #[test] + fn parallel_unsafe_dns() { + let expr = FilterExpr::parse("dns").unwrap().unwrap(); + assert!(!expr.is_parallel_safe()); + } + + #[test] + fn parallel_unsafe_tcp_stream_id() { + let expr = FilterExpr::parse("tcp.stream_id = 1").unwrap().unwrap(); + assert!(!expr.is_parallel_safe()); + } + + #[test] + fn parallel_safe_not_tcp() { + let expr = FilterExpr::parse("NOT tcp").unwrap().unwrap(); + assert!(expr.is_parallel_safe()); + } + + #[test] + fn parallel_unsafe_tcp_or_http() { + let expr = FilterExpr::parse("tcp OR http").unwrap().unwrap(); + assert!(!expr.is_parallel_safe()); + } + + #[test] + fn parallel_safe_packet_number_between() { + let expr = FilterExpr::parse("packet_number BETWEEN 1 AND 10") + .unwrap() + .unwrap(); + assert!(expr.is_parallel_safe()); + } + + #[test] + fn parallel_safe_tcp_dst_port() { + let expr = FilterExpr::parse("tcp.dst_port > 1024").unwrap().unwrap(); + assert!(expr.is_parallel_safe()); + } } diff --git a/src/lib.rs b/src/lib.rs index 31e1c13..0e6ca46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ pub mod mcp; #[doc(hidden)] pub mod output; #[doc(hidden)] +pub mod parallel; +#[doc(hidden)] pub mod schema; #[doc(hidden)] pub mod serialize; diff --git a/src/parallel.rs b/src/parallel.rs new file mode 100644 index 0000000..1f03420 --- /dev/null +++ b/src/parallel.rs @@ -0,0 +1,91 @@ +//! Thread pool sizing helpers for parallel filter evaluation. + +/// Environment variable name for overriding the thread count. +pub const THREADS_ENV: &str = "DSCT_THREADS"; + +/// Resolve the number of worker threads to use for parallel operations. +/// +/// Precedence: explicit `flag` argument > [`THREADS_ENV`] environment variable > +/// physical CPU count (via `num_cpus::get_physical()`). +/// +/// Returns `Err` if: +/// - `flag` is `Some(0)` (zero is not a valid thread count) +/// - The environment variable is set but cannot be parsed as `usize` +/// - The environment variable is set to `"0"` +pub fn resolve_thread_count(flag: Option) -> Result { + resolve_thread_count_from(flag, std::env::var(THREADS_ENV).ok().as_deref()) +} + +/// Testable inner implementation that accepts the env value as a parameter. +/// +/// This avoids `std::env::set_var` in tests (which is not safe to call from +/// multi-threaded test processes). +pub fn resolve_thread_count_from( + flag: Option, + env_value: Option<&str>, +) -> Result { + if let Some(n) = flag { + if n == 0 { + return Err("thread count must be at least 1 (got 0)".to_string()); + } + return Ok(n); + } + if let Some(val) = env_value { + match val.parse::() { + Ok(0) => { + return Err(format!( + "{THREADS_ENV}=0 is not valid; thread count must be at least 1" + )); + } + Ok(n) => return Ok(n), + Err(_) => { + return Err(format!( + "cannot parse {THREADS_ENV}={val:?} as a thread count" + )); + } + } + } + Ok(num_cpus::get_physical()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn explicit_flag_takes_precedence() { + assert_eq!(resolve_thread_count_from(Some(4), Some("2")), Ok(4)); + } + + #[test] + fn env_used_when_no_flag() { + assert_eq!(resolve_thread_count_from(None, Some("3")), Ok(3)); + } + + #[test] + fn default_is_physical_cores() { + let n = resolve_thread_count_from(None, None).unwrap(); + assert!(n >= 1); + } + + #[test] + fn flag_zero_is_error() { + assert!(resolve_thread_count_from(Some(0), None).is_err()); + } + + #[test] + fn env_zero_is_error() { + assert!(resolve_thread_count_from(None, Some("0")).is_err()); + } + + #[test] + fn env_unparsable_is_error() { + assert!(resolve_thread_count_from(None, Some("abc")).is_err()); + } + + #[test] + fn env_negative_is_error() { + // "-1" cannot be parsed as usize + assert!(resolve_thread_count_from(None, Some("-1")).is_err()); + } +} From 6a645a85d87778991f74b3d356f20ffbc4e6a62f Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 02:20:54 +0000 Subject: [PATCH 3/6] feat(tui): parallelize filter scan with per-worker registries --- src/tui/app.rs | 39 +++- src/tui/event.rs | 8 +- src/tui/filter_apply.rs | 277 +++++++++++++++++++++-- src/tui/keys.rs | 1 + src/tui/mod.rs | 5 +- src/tui/parallel_scan.rs | 474 +++++++++++++++++++++++++++++++++++++++ src/tui/test_util.rs | 1 + src/tui/ui.rs | 6 +- 8 files changed, 789 insertions(+), 22 deletions(-) create mode 100644 src/tui/parallel_scan.rs diff --git a/src/tui/app.rs b/src/tui/app.rs index a9e857c..12b5938 100644 --- a/src/tui/app.rs +++ b/src/tui/app.rs @@ -49,6 +49,12 @@ pub struct App { pub filter: FilterState, /// In-progress filter scan (None = idle). pub filter_progress: Option, + /// In-progress parallel filter scan (None = idle). + pub parallel_scan: Option, + /// Path of the capture file (None in live mode). + pub capture_path: Option, + /// Decode-as arguments for rebuilding registries in parallel workers. + pub decode_as_args: Vec, /// Maximized pane (None = normal layout). pub maximized_pane: Option, /// Per-pane height weights [packet_list, detail_tree, hex_dump]. @@ -92,6 +98,7 @@ impl App { indices: Vec, registry: DissectorRegistry, file_path: &std::path::Path, + decode_as_args: Vec, ) -> Self { let file_name = file_path .file_name() @@ -115,6 +122,9 @@ impl App { hex_dump: HexDumpState::default(), filter: FilterState::default(), filter_progress: None, + parallel_scan: None, + capture_path: Some(file_path.to_path_buf()), + decode_as_args, maximized_pane: None, pane_weights: DEFAULT_PANE_WEIGHTS, pending_count: String::new(), @@ -144,6 +154,7 @@ impl App { indices: Vec, registry: DissectorRegistry, copier: StdinCopier, + decode_as_args: Vec, ) -> Self { let completion_engine = CompletionEngine::from_registry(®istry); // Start at 0 so the first live_tick() ingests any data that was @@ -166,6 +177,9 @@ impl App { hex_dump: HexDumpState::default(), filter: FilterState::default(), filter_progress: None, + parallel_scan: None, + capture_path: None, + decode_as_args, maximized_pane: None, pane_weights: DEFAULT_PANE_WEIGHTS, pending_count: String::new(), @@ -531,6 +545,7 @@ mod tests { indices, DissectorRegistry::default(), std::path::Path::new("test.pcap"), + vec![], ); let _ = std::fs::remove_file(&path); app @@ -1823,7 +1838,13 @@ mod tests { handle: None, }; - let app = App::new_live(capture, indices, DissectorRegistry::default(), copier); + let app = App::new_live( + capture, + indices, + DissectorRegistry::default(), + copier, + vec![], + ); (app, tmp) } @@ -2185,7 +2206,13 @@ mod tests { }; let indices = Vec::new(); - let mut app = App::new_live(capture, indices, DissectorRegistry::default(), copier); + let mut app = App::new_live( + capture, + indices, + DissectorRegistry::default(), + copier, + vec![], + ); assert_eq!(app.total_count(), 0); assert_eq!(app.displayed_count(), 0); @@ -2278,7 +2305,13 @@ mod tests { handle: None, }; - let mut app = App::new_live(capture, Vec::new(), DissectorRegistry::default(), copier); + let mut app = App::new_live( + capture, + Vec::new(), + DissectorRegistry::default(), + copier, + vec![], + ); assert_eq!(app.total_count(), 0); // 2. live_tick should ingest the pre-existing data. diff --git a/src/tui/event.rs b/src/tui/event.rs index 236af98..62b610d 100644 --- a/src/tui/event.rs +++ b/src/tui/event.rs @@ -83,12 +83,16 @@ fn event_loop(terminal: &mut Terminal>, app: &mut A continue; } - // If a filter scan is in progress, drive it in chunks. - if app.filter_progress.is_some() { + // If a filter scan is in progress (sequential or parallel), drive it. + if app.filter_progress.is_some() || app.parallel_scan.is_some() { if event::poll(std::time::Duration::from_millis(0))? && let Event::Key(key) = event::read()? && key.code == crossterm::event::KeyCode::Esc { + if let Some(scan) = &app.parallel_scan { + scan.cancel(); + } + app.parallel_scan = None; app.filter_progress = None; continue; } diff --git a/src/tui/filter_apply.rs b/src/tui/filter_apply.rs index bca0dd0..c6d3447 100644 --- a/src/tui/filter_apply.rs +++ b/src/tui/filter_apply.rs @@ -1,8 +1,11 @@ //! Filter application and chunked filter scanning. +use std::sync::Arc; + use packet_dissector_core::packet::{DissectBuffer, Packet}; use super::app::App; +use super::parallel_scan::{ParallelFilterScan, ScanPoll}; use super::state::FilterProgress; use crate::filter_expr::FilterExpr; @@ -21,7 +24,7 @@ impl App { self.filter.applied = self.filter.buf.input.clone(); if expr.is_none() { - // Empty filter — show all packets immediately. + // Empty filter ��� show all packets immediately. self.filtered_indices = (0..self.indices.len()).collect(); self.summary_cache.clear(); self.packet_list.selected = 0; @@ -31,12 +34,83 @@ impl App { return; } - // Start a chunked filter scan. - self.filter_progress = Some(FilterProgress { - expr, - cursor: 0, - results: Vec::new(), - }); + // Decide whether to use parallel or sequential scanning. + let use_parallel = self.try_start_parallel_scan(expr.as_ref()); + + if !use_parallel { + // Sequential path. + self.parallel_scan = None; + self.filter_progress = Some(FilterProgress { + expr, + cursor: 0, + results: Vec::new(), + }); + } + } + + /// Attempt to start a parallel filter scan. + /// + /// Returns `true` if parallel scanning was started, `false` if the filter + /// is ineligible or parallel scanning could not be initialised (caller + /// should fall back to sequential). + fn try_start_parallel_scan(&mut self, expr: Option<&FilterExpr>) -> bool { + let expr = match expr { + Some(e) => e, + None => return false, + }; + + // Conditions required for parallel scanning: + // 1. Filter is not packet-number-only (those don't need dissection at all). + if expr.is_packet_number_only() { + return false; + } + // 2. Filter expression is parallel-safe (no cross-packet state). + if !expr.is_parallel_safe() { + return false; + } + // 3. Static file mode only (live mode uses a growing file that workers + // cannot safely mmap independently). + let capture_path = match &self.capture_path { + Some(p) => p.clone(), + None => return false, + }; + if self.live_mode.is_some() { + return false; + } + // 4. At least one packet to scan. + if self.indices.is_empty() { + return false; + } + + // 5. Resolve thread count; fall back to sequential on error. + let thread_count = match crate::parallel::resolve_thread_count(None) { + Ok(n) => n, + Err(_) => return false, + }; + // With only one thread the overhead is not worth it. + if thread_count <= 1 { + return false; + } + + // Build index snapshot as an Arc<[PacketIndex]>. + let indices_arc: Arc<[super::state::PacketIndex]> = self.indices.as_slice().into(); + let filter_str = self.filter.buf.input.clone(); + let decode_as_args = self.decode_as_args.clone(); + + match ParallelFilterScan::new( + capture_path, + decode_as_args, + indices_arc, + filter_str, + thread_count, + ) { + Ok(scan) => { + self.filter_progress = None; + self.parallel_scan = Some(scan); + true + } + Err(_) => false, + } } /// Number of packets to scan per tick during filter progress. @@ -44,8 +118,54 @@ impl App { /// Process one chunk of the in-progress filter scan. /// - /// Returns `true` while the scan is still running. + /// Handles both the sequential ([`FilterProgress`]) and parallel + /// ([`ParallelFilterScan`]) paths. Returns `true` while a scan is + /// still running. pub fn filter_tick(&mut self) -> bool { + // Check parallel path first. + if self.parallel_scan.is_some() { + return self.parallel_filter_tick(); + } + self.sequential_filter_tick() + } + + /// Drive one tick of the parallel filter scan. + /// + /// If every worker exited before the scan completed (e.g. the capture file + /// could not be reopened), falls back to a sequential scan of the same + /// filter so the scan always terminates. + fn parallel_filter_tick(&mut self) -> bool { + let scan = match &mut self.parallel_scan { + Some(s) => s, + None => return false, + }; + + match scan.drain() { + ScanPoll::Complete(results) => { + self.parallel_scan = None; + self.finalize_filter(results); + false + } + ScanPoll::Running => true, + ScanPoll::Failed => { + // The applied filter parsed successfully in apply_filter(), so + // re-parsing cannot fail here; `Ok(None)` (empty input) cannot + // occur either because the parallel path requires a non-empty + // expression. + let expr = FilterExpr::parse(&self.filter.applied).ok().flatten(); + self.parallel_scan = None; + self.filter_progress = Some(FilterProgress { + expr, + cursor: 0, + results: Vec::new(), + }); + true + } + } + } + + /// Drive one chunk of the sequential filter scan. + fn sequential_filter_tick(&mut self) -> bool { let total = self.indices.len(); let progress = match &mut self.filter_progress { Some(p) => p, @@ -100,20 +220,47 @@ impl App { Some(fp) => fp.results, None => Vec::new(), }; - self.filtered_indices = results; - self.summary_cache.clear(); - self.packet_list.selected = 0; - self.packet_list.scroll_offset = 0; - self.load_selected(); - self.hex_dump.scroll_offset = 0; + self.finalize_filter(results); return false; } true } + + /// Apply completed filter results and update the UI state. + fn finalize_filter(&mut self, results: Vec) { + self.filtered_indices = results; + self.summary_cache.clear(); + self.packet_list.selected = 0; + self.packet_list.scroll_offset = 0; + self.load_selected(); + self.hex_dump.scroll_offset = 0; + } + + /// Returns the current filter scan fraction (0.0–1.0), or `None` if idle. + /// + /// Used by the UI to display a progress indicator for both sequential and + /// parallel scans. + pub fn filter_fraction(&self) -> Option { + if let Some(scan) = &self.parallel_scan { + return Some(scan.fraction()); + } + if let Some(progress) = &self.filter_progress { + let total = self.indices.len(); + return Some(progress.fraction(total)); + } + None + } } #[cfg(all(test, feature = "tui"))] mod tests { + use std::io::Write; + + use packet_dissector::registry::DissectorRegistry; + + use super::super::app::App; + use super::super::loader; + use super::super::state::CaptureMap; use super::super::test_util::make_test_app; #[test] @@ -156,4 +303,106 @@ mod tests { assert!(app.filter_progress.is_none()); assert!(!app.filter_tick()); } + + /// Drive a running filter scan (parallel or sequential) to completion. + /// + /// Uses a wall-clock deadline rather than a tick count: under heavy test + /// parallelism, worker threads may not be scheduled for a while, so a + /// fixed number of non-blocking ticks is racy. + fn drive_filter_to_completion(app: &mut App) { + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + loop { + if app.filter_progress.is_none() && app.parallel_scan.is_none() { + break; + } + app.filter_tick(); + if app.parallel_scan.is_some() { + assert!( + std::time::Instant::now() < deadline, + "filter scan did not complete" + ); + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + } + + /// Build an App backed by a real temp file path so `capture_path` is set. + fn make_test_app_with_path(n: usize) -> (App, tempfile::NamedTempFile) { + use std::sync::atomic::{AtomicU32, Ordering}; + static COUNTER: AtomicU32 = AtomicU32::new(0); + let c = COUNTER.fetch_add(1, Ordering::Relaxed); + + let pcap = loader::tests::build_pcap_for_test(n); + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + tmp.write_all(&pcap).unwrap(); + tmp.flush().unwrap(); + let _ = c; + + let file = std::fs::File::open(tmp.path()).unwrap(); + let capture = CaptureMap::new(&file).unwrap(); + let indices = loader::build_index(capture.as_bytes()).unwrap(); + + let app = App::new( + capture, + indices, + DissectorRegistry::default(), + tmp.path(), + vec![], + ); + (app, tmp) + } + + #[test] + fn parallel_filter_completes_correctly() { + // Build a larger pcap so there's enough work for parallel to engage + // (assuming physical CPU count > 1 on CI; if only 1 CPU falls back to + // sequential — that path is tested by filter_tick_runs_to_completion). + let (mut app, _tmp) = make_test_app_with_path(100); + + app.filter.buf.input = "udp".into(); + app.filter.buf.cursor = 3; + app.apply_filter(); + + // Drive whichever path was chosen to completion. + drive_filter_to_completion(&mut app); + // All 100 test packets are UDP. + assert_eq!(app.displayed_count(), 100); + } + + #[test] + fn parallel_scan_failure_falls_back_to_sequential() { + // Force the parallel path to fail by pointing capture_path at a file + // that workers cannot open. filter_tick must fall back to the + // sequential scan and still terminate with correct results + // (regression test for an infinite filter_tick loop). + let (mut app, _tmp) = make_test_app_with_path(50); + app.capture_path = Some(std::path::PathBuf::from("/nonexistent/dsct_missing.pcap")); + + app.filter.buf.input = "udp".into(); + app.filter.buf.cursor = 3; + app.apply_filter(); + + drive_filter_to_completion(&mut app); + // All 50 test packets are UDP; the in-memory mmap is still valid. + assert_eq!(app.displayed_count(), 50); + } + + #[test] + fn unsafe_filter_uses_sequential_path() { + let (mut app, _tmp) = make_test_app_with_path(5); + + app.filter.buf.input = "http".into(); + app.filter.buf.cursor = 4; + app.apply_filter(); + + // "http" is not parallel-safe; must use sequential path. + assert!( + app.parallel_scan.is_none(), + "http filter must use sequential path" + ); + assert!( + app.filter_progress.is_some(), + "http filter must set filter_progress" + ); + } } diff --git a/src/tui/keys.rs b/src/tui/keys.rs index d282c7c..e1bdb04 100644 --- a/src/tui/keys.rs +++ b/src/tui/keys.rs @@ -64,6 +64,7 @@ impl App { // Ignore mouse during overlays. if self.stream_view.is_some() || self.filter_progress.is_some() + || self.parallel_scan.is_some() || self.stream_build_progress.is_some() { return; diff --git a/src/tui/mod.rs b/src/tui/mod.rs index bad06d9..a863971 100644 --- a/src/tui/mod.rs +++ b/src/tui/mod.rs @@ -20,6 +20,7 @@ mod live; #[doc(hidden)] pub mod loader; mod owned_packet; +mod parallel_scan; mod state; mod stats_collect; mod stream; @@ -68,7 +69,7 @@ pub fn run(file: PathBuf, decode_as_args: Vec) -> Result<()> { // Start the TUI immediately with an empty index; packets will appear // incrementally as the background thread delivers results. let indices = Vec::new(); - let mut app = app::App::new(capture, indices, registry, &file); + let mut app = app::App::new(capture, indices, registry, &file, decode_as_args); app.bg_indexer = Some(bg_indexer); let mut terminal = event::init_terminal()?; @@ -136,7 +137,7 @@ pub fn run_live(decode_as_args: Vec) -> Result<()> { let capture = state::CaptureMap::new_live(file)?; let indices = Vec::new(); - let app = app::App::new_live(capture, indices, registry, copier); + let app = app::App::new_live(capture, indices, registry, copier, decode_as_args); event::run_event_loop(&mut terminal, app) })(); diff --git a/src/tui/parallel_scan.rs b/src/tui/parallel_scan.rs new file mode 100644 index 0000000..a874383 --- /dev/null +++ b/src/tui/parallel_scan.rs @@ -0,0 +1,474 @@ +//! Parallel filter scan using per-worker capture file handles. +//! +//! Splits the packet index into chunks and evaluates the filter expression on +//! each chunk concurrently. Each worker opens the capture file independently +//! and creates its own [`DissectorRegistry`], avoiding shared mutable state. + +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc; + +use packet_dissector::registry::DissectorRegistry; +use packet_dissector_core::packet::{DissectBuffer, Packet}; + +use crate::filter_expr::FilterExpr; + +use super::state::{CaptureMap, PacketIndex}; + +/// Number of packets processed by each worker per chunk. +const CHUNK_SIZE: usize = 8192; + +/// A result chunk from a worker thread. +/// +/// Contains the chunk index (for ordering) and the matching packet indices +/// within the original snapshot. +type ChunkResult = (usize, Vec); + +/// Result of polling a [`ParallelFilterScan`] via [`ParallelFilterScan::drain`]. +pub(super) enum ScanPoll { + /// Workers are still producing results. + Running, + /// Scan finished; contains the ordered matching packet indices. + Complete(Vec), + /// All workers exited before the scan completed (e.g. the capture file + /// could not be reopened). The caller must fall back to sequential + /// scanning; the parallel scan can never finish. + Failed, +} + +/// A parallel filter scan that distributes work across N worker threads. +/// +/// Each worker opens the capture file independently, builds a fresh +/// [`DissectorRegistry`], and evaluates the filter over its assigned chunks. +/// Results arrive out of order via a channel and are reassembled in order +/// when the scan is complete. +pub(super) struct ParallelFilterScan { + receiver: mpsc::Receiver, + cancel: Arc, + /// Total number of packets being scanned. + pub total: usize, + scanned: Arc, + chunks_total: usize, + chunks_done: usize, + chunk_results: Vec>>, +} + +impl ParallelFilterScan { + /// Start a parallel filter scan. + /// + /// Spawns `thread_count` worker threads. Each worker opens `file_path`, + /// builds a [`DissectorRegistry`] configured with `decode_as_args`, parses + /// the filter string, and scans its assigned chunks. + /// + /// Returns `Err` if the first worker fails to open the capture file. + pub fn new( + file_path: PathBuf, + decode_as_args: Vec, + indices: Arc<[PacketIndex]>, + filter_str: String, + thread_count: usize, + ) -> std::io::Result { + let total = indices.len(); + let chunks_total = total.div_ceil(CHUNK_SIZE); + + let (tx, rx) = mpsc::channel::(); + let cancel = Arc::new(AtomicBool::new(false)); + let scanned = Arc::new(AtomicUsize::new(0)); + + // Work-stealing cursor: the next chunk index to process. + let next_chunk = Arc::new(AtomicUsize::new(0)); + + for worker_id in 0..thread_count { + let tx = tx.clone(); + let cancel = Arc::clone(&cancel); + let scanned = Arc::clone(&scanned); + let next_chunk = Arc::clone(&next_chunk); + let indices = Arc::clone(&indices); + let file_path = file_path.clone(); + let decode_as_args = decode_as_args.clone(); + let filter_str = filter_str.clone(); + + std::thread::Builder::new() + .name(format!("filter-worker-{worker_id}")) + .spawn(move || { + worker_thread(WorkerContext { + file_path, + decode_as_args, + indices, + filter_str, + next_chunk, + cancel, + scanned, + tx, + chunks_total, + }); + }) + .map_err(|e| std::io::Error::other(e.to_string()))?; + } + + // Drop our copy of the sender; workers hold theirs. + drop(tx); + + Ok(Self { + receiver: rx, + cancel, + total, + scanned, + chunks_total, + chunks_done: 0, + chunk_results: vec![None; chunks_total], + }) + } + + /// Progress fraction in `0.0..=1.0`. + pub fn fraction(&self) -> f64 { + if self.total == 0 { + return 1.0; + } + let done = self.scanned.load(Ordering::Relaxed); + (done as f64 / self.total as f64).min(1.0) + } + + /// Drain available results non-blockingly. + /// + /// Returns [`ScanPoll::Complete`] with the ordered `filtered_indices` when + /// the scan is complete, [`ScanPoll::Running`] while workers are still + /// producing results, or [`ScanPoll::Failed`] when every worker exited + /// (channel disconnected) before all chunks were delivered — for example + /// because the capture file could not be reopened. + pub fn drain(&mut self) -> ScanPoll { + // Collect all currently available results. + let mut disconnected = false; + loop { + match self.receiver.try_recv() { + Ok((chunk_id, matches)) => { + if chunk_id < self.chunk_results.len() { + self.chunk_results[chunk_id] = Some(matches); + self.chunks_done += 1; + } + } + Err(mpsc::TryRecvError::Empty) => break, + Err(mpsc::TryRecvError::Disconnected) => { + // All senders dropped: every worker has exited. Any + // results sent before the disconnect have already been + // received above. + disconnected = true; + break; + } + } + } + + if self.chunks_done >= self.chunks_total { + // All chunks received — concatenate in order. + let mut result = Vec::new(); + for matches in self.chunk_results.iter().flatten() { + result.extend_from_slice(matches); + } + ScanPoll::Complete(result) + } else if disconnected { + // Workers are gone but chunks are missing — the scan can never + // complete. Signal the caller to fall back to sequential scanning. + ScanPoll::Failed + } else { + ScanPoll::Running + } + } + + /// Signal worker threads to stop processing. + pub fn cancel(&self) { + self.cancel.store(true, Ordering::Release); + } +} + +impl Drop for ParallelFilterScan { + fn drop(&mut self) { + self.cancel.store(true, Ordering::Release); + } +} + +/// Shared context passed to each worker thread. +struct WorkerContext { + file_path: PathBuf, + decode_as_args: Vec, + indices: Arc<[PacketIndex]>, + filter_str: String, + next_chunk: Arc, + cancel: Arc, + scanned: Arc, + tx: mpsc::Sender, + chunks_total: usize, +} + +/// Entry point for a single worker thread. +fn worker_thread(ctx: WorkerContext) { + // Open an independent file handle and mmap for this worker. + let file = match std::fs::File::open(&ctx.file_path) { + Ok(f) => f, + Err(_) => return, + }; + let capture = match CaptureMap::new(&file) { + Ok(c) => c, + Err(_) => return, + }; + + // Build an independent registry for this worker. + let mut registry = DissectorRegistry::default(); + if crate::decode_as::parse_and_apply(&mut registry, &ctx.decode_as_args).is_err() { + return; + } + + // Parse the filter expression. + let expr = match FilterExpr::parse(&ctx.filter_str) { + Ok(Some(e)) => e, + _ => return, + }; + + let total = ctx.indices.len(); + let mut dissect_buf = DissectBuffer::new(); + + loop { + if ctx.cancel.load(Ordering::Acquire) { + return; + } + + let chunk_id = ctx.next_chunk.fetch_add(1, Ordering::AcqRel); + if chunk_id >= ctx.chunks_total { + return; + } + + let start = chunk_id * CHUNK_SIZE; + let end = (start + CHUNK_SIZE).min(total); + let mut matches = Vec::new(); + + for i in start..end { + let number = (i as u64) + 1; + let index = &ctx.indices[i]; + if let Some(data) = capture.packet_data(index) { + let buf = dissect_buf.clear_into(); + if registry + .dissect_with_link_type(data, index.link_type as u32, buf) + .is_ok() + { + let packet = Packet::new(buf, data); + if expr.matches_with_number(&packet, number) { + matches.push(i); + } + } + } + } + + ctx.scanned.fetch_add(end - start, Ordering::Release); + + if ctx.tx.send((chunk_id, matches)).is_err() { + return; + } + } +} + +#[cfg(all(test, feature = "tui"))] +mod tests { + use super::*; + use std::io::Write; + + use packet_dissector::registry::DissectorRegistry; + + use super::super::loader; + use super::super::state::CaptureMap; + + /// Build a pcap with `udp_count` UDP packets then `tcp_count` TCP packets. + fn build_mixed_pcap_for_test(udp_count: usize, tcp_count: usize) -> Vec { + let mut pcap_buf = Vec::new(); + // Global header: magic, version 2.4, Ethernet link type + pcap_buf.extend_from_slice(&0xA1B2C3D4u32.to_le_bytes()); + pcap_buf.extend_from_slice(&2u16.to_le_bytes()); + pcap_buf.extend_from_slice(&4u16.to_le_bytes()); + pcap_buf.extend_from_slice(&0i32.to_le_bytes()); + pcap_buf.extend_from_slice(&0u32.to_le_bytes()); + pcap_buf.extend_from_slice(&65535u32.to_le_bytes()); + pcap_buf.extend_from_slice(&1u32.to_le_bytes()); // Ethernet + + // Minimal Ethernet+IPv4+UDP packet (42 bytes) + let udp_pkt: &[u8] = &[ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x00, + 0x45, 0x00, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x00, 0x40, 0x11, 0x00, 0x00, 0x0A, 0x00, + 0x00, 0x01, 0x0A, 0x00, 0x00, 0x02, 0x10, 0x00, 0x10, 0x01, 0x00, 0x08, 0x00, 0x00, + ]; + + // Minimal Ethernet+IPv4+TCP packet (54 bytes) + let tcp_pkt: &[u8] = &[ + // Ethernet + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x00, + // IPv4 + 0x45, 0x00, 0x00, 0x28, 0x00, 0x00, 0x40, 0x00, 0x40, 0x06, 0x00, 0x00, 0x0a, 0x00, + 0x00, 0x01, 0x0a, 0x00, 0x00, 0x02, + // TCP: src=80, dst=12345, seq/ack=0, flags=SYN + 0x00, 0x50, 0x30, 0x39, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x50, 0x02, + 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + + let pkt_count = udp_count + tcp_count; + for i in 0..pkt_count { + let ts_sec = (i / 1000) as u32; + let ts_usec = ((i % 1000) * 1000) as u32; + let pkt = if i < udp_count { udp_pkt } else { tcp_pkt }; + pcap_buf.extend_from_slice(&ts_sec.to_le_bytes()); + pcap_buf.extend_from_slice(&ts_usec.to_le_bytes()); + pcap_buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + pcap_buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + pcap_buf.extend_from_slice(pkt); + } + pcap_buf + } + + fn write_temp_pcap(data: &[u8]) -> (tempfile::NamedTempFile, CaptureMap, Vec) { + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + tmp.write_all(data).unwrap(); + tmp.flush().unwrap(); + let file = std::fs::File::open(tmp.path()).unwrap(); + let capture = CaptureMap::new(&file).unwrap(); + let indices = loader::build_index(capture.as_bytes()).unwrap(); + (tmp, capture, indices) + } + + #[test] + fn parallel_scan_udp_filter_matches_sequential() { + let pcap = build_mixed_pcap_for_test(10, 5); + let (tmp, capture, indices) = write_temp_pcap(&pcap); + let indices_arc: Arc<[PacketIndex]> = indices.into(); + + // Sequential reference result. + let mut seq_results = Vec::new(); + { + let registry = DissectorRegistry::default(); + let mut dissect_buf = DissectBuffer::new(); + let expr = FilterExpr::parse("udp").unwrap().unwrap(); + for (i, index) in indices_arc.iter().enumerate() { + if let Some(data) = capture.packet_data(index) { + let buf = dissect_buf.clear_into(); + if registry + .dissect_with_link_type(data, index.link_type as u32, buf) + .is_ok() + { + let packet = Packet::new(buf, data); + if expr.matches_with_number(&packet, (i as u64) + 1) { + seq_results.push(i); + } + } + } + } + } + + // Parallel result. + let mut scan = ParallelFilterScan::new( + tmp.path().to_path_buf(), + vec![], + indices_arc, + "udp".to_string(), + 2, + ) + .unwrap(); + + let par_results = loop { + match scan.drain() { + ScanPoll::Complete(r) => break r, + ScanPoll::Failed => panic!("parallel scan failed"), + ScanPoll::Running => std::thread::sleep(std::time::Duration::from_millis(10)), + } + }; + + assert_eq!( + par_results, seq_results, + "parallel and sequential must agree" + ); + assert_eq!(par_results.len(), 10, "expected 10 UDP packets"); + } + + #[test] + fn parallel_scan_all_match_ipv4_src() { + let pcap = loader::tests::build_pcap_for_test(20); + let (tmp, _capture, indices) = write_temp_pcap(&pcap); + let indices_arc: Arc<[PacketIndex]> = indices.into(); + + let mut scan = ParallelFilterScan::new( + tmp.path().to_path_buf(), + vec![], + indices_arc, + "ipv4.src = '10.0.0.1'".to_string(), + 2, + ) + .unwrap(); + + let results = loop { + match scan.drain() { + ScanPoll::Complete(r) => break r, + ScanPoll::Failed => panic!("parallel scan failed"), + ScanPoll::Running => std::thread::sleep(std::time::Duration::from_millis(10)), + } + }; + + assert_eq!(results.len(), 20, "all 20 packets should match ipv4.src"); + } + + #[test] + fn parallel_scan_fraction_advances() { + let pcap = loader::tests::build_pcap_for_test(100); + let (tmp, _capture, indices) = write_temp_pcap(&pcap); + let indices_arc: Arc<[PacketIndex]> = indices.into(); + + let mut scan = ParallelFilterScan::new( + tmp.path().to_path_buf(), + vec![], + indices_arc, + "udp".to_string(), + 1, + ) + .unwrap(); + + // Drive to completion. + loop { + match scan.drain() { + ScanPoll::Complete(_) => break, + ScanPoll::Failed => panic!("parallel scan failed"), + ScanPoll::Running => std::thread::sleep(std::time::Duration::from_millis(5)), + } + } + + let frac = scan.fraction(); + assert!((0.0..=1.0).contains(&frac), "fraction should be in [0,1]"); + } + + #[test] + fn parallel_scan_failed_when_file_missing() { + // Workers cannot open the capture file: every worker exits without + // delivering a chunk. drain() must report Failed instead of running + // forever (regression test for an infinite filter_tick loop). + let pcap = loader::tests::build_pcap_for_test(20); + let (_tmp, _capture, indices) = write_temp_pcap(&pcap); + let indices_arc: Arc<[PacketIndex]> = indices.into(); + + let mut scan = ParallelFilterScan::new( + std::path::PathBuf::from("/nonexistent/dsct_missing.pcap"), + vec![], + indices_arc, + "udp".to_string(), + 2, + ) + .unwrap(); + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); + loop { + match scan.drain() { + ScanPoll::Failed => break, + ScanPoll::Complete(_) => panic!("scan must not complete without workers"), + ScanPoll::Running => { + assert!( + std::time::Instant::now() < deadline, + "drain() never reported Failed" + ); + std::thread::sleep(std::time::Duration::from_millis(5)); + } + } + } + } +} diff --git a/src/tui/test_util.rs b/src/tui/test_util.rs index f4f2d49..265b880 100644 --- a/src/tui/test_util.rs +++ b/src/tui/test_util.rs @@ -42,6 +42,7 @@ pub(super) fn make_test_app(n: usize) -> App { indices, DissectorRegistry::default(), std::path::Path::new("test.pcap"), + vec![], ); let _ = std::fs::remove_file(&path); app diff --git a/src/tui/ui.rs b/src/tui/ui.rs index 23018ff..06fbe2b 100644 --- a/src/tui/ui.rs +++ b/src/tui/ui.rs @@ -110,7 +110,7 @@ pub fn render(f: &mut Frame, app: &mut App) { ); } - // Overlay: centered progress bar while filter is scanning. + // Overlay: centered progress bar while filter is scanning (sequential or parallel). if let Some(progress) = &app.filter_progress { let total = app.indices.len(); render_progress_overlay( @@ -120,6 +120,10 @@ pub fn render(f: &mut Frame, app: &mut App) { total, progress.fraction(total), ); + } else if let Some(frac) = app.filter_fraction() { + let total = app.indices.len(); + let scanned = (frac * total as f64) as usize; + render_progress_overlay(f, "Filtering", scanned, total, frac); } // Overlay: help screen. From dfab2202eb2b89098ad335983cd37ce7b4d14af0 Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 02:35:22 +0000 Subject: [PATCH 4/6] feat(read): parallelize filter evaluation for file input Add --threads flag and DSCT_THREADS env var to dsct read. When the filter is parallel-safe (stateless L2-L4 protocols) and input is a file, packets are distributed round-robin to N worker threads each with their own DissectorRegistry. The merger re-assembles results in strict round-robin order so output is byte-identical to the sequential path. Reassembly-dependent filters (HTTP, DNS, TLS, tcp.stream_id, etc.) and stdin input always fall back to sequential processing. - src/parallel_read.rs: new library module with reader/worker/merger pipeline - src/field_config.rs: derive Clone on FieldConfig and internal types - src/main.rs: add --threads to ReadOptions; gate parallel path on is_parallel_safe + esp_sa_args.is_empty + resolved_threads > 1 - tests/cli_parallel_read_test.rs: TDD integration tests (12 tests) - README.md: document --threads and DSCT_THREADS --- README.md | 14 + src/field_config.rs | 6 +- src/lib.rs | 2 + src/main.rs | 85 +++++- src/parallel_read.rs | 474 ++++++++++++++++++++++++++++++++ tests/cli_parallel_read_test.rs | 445 ++++++++++++++++++++++++++++++ 6 files changed, 1018 insertions(+), 8 deletions(-) create mode 100644 src/parallel_read.rs create mode 100644 tests/cli_parallel_read_test.rs diff --git a/README.md b/README.md index 1ca1144..231b79a 100644 --- a/README.md +++ b/README.md @@ -157,6 +157,19 @@ Include the original packet bytes (link-layer included) as a hex string under dsct read capture.pcap --raw-bytes --count 1 ``` +Speed up filter evaluation on large files with `--threads`: + +```bash +dsct read capture.pcap -f "udp" --no-limit --threads 4 +DSCT_THREADS=4 dsct read capture.pcap -f "tcp.dst_port > 1024" --no-limit +``` + +`--threads` distributes dissection and filter evaluation across N worker +threads when the filter is stateless (L2–L4 protocols: `tcp`, `udp`, `ipv4`, +etc.). Filters that require TCP reassembly such as `http`, `dns`, `tls`, and +`tcp.stream_id` automatically fall back to sequential processing regardless of +`--threads`. Stdin input always uses the sequential path. + Inspect available fields and schemas: ```bash @@ -258,6 +271,7 @@ Resource limits can be tuned via environment variables: | `DSCT_MCP_TIMEOUT` | 300 | Timeout per tool execution in seconds | | `DSCT_MCP_WRITE_BUFFER_SIZE` | 65536 | Stdout write buffer size in bytes | | `DSCT_MCP_MAX_FILE_SIZE` | 10737418240 | Maximum capture file size in bytes | +| `DSCT_THREADS` | physical CPU count | Worker threads for `dsct read --filter` (see `--threads`) | ## Output diff --git a/src/field_config.rs b/src/field_config.rs index 4e3e81e..f692e3a 100644 --- a/src/field_config.rs +++ b/src/field_config.rs @@ -27,13 +27,13 @@ const DEFAULT_CONFIG: &str = include_str!("default_fields.toml"); /// /// When a container field is included but has no nested patterns defined, /// all of its sub-fields are shown (e.g., SRv6 `segments_structure`). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FieldConfig { protocols: HashMap, } /// Per-protocol field filter with top-level and nested patterns. -#[derive(Debug)] +#[derive(Debug, Clone)] struct FieldFilter { /// Patterns for top-level field names (no dots). top_level: PatternSet, @@ -43,7 +43,7 @@ struct FieldFilter { } /// A set of patterns for matching field names. -#[derive(Debug)] +#[derive(Debug, Clone)] struct PatternSet { /// When `true`, all names match (used for `"parent.*"` patterns). match_all: bool, diff --git a/src/lib.rs b/src/lib.rs index 0e6ca46..aef53e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ pub mod output; #[doc(hidden)] pub mod parallel; #[doc(hidden)] +pub mod parallel_read; +#[doc(hidden)] pub mod schema; #[doc(hidden)] pub mod serialize; diff --git a/src/main.rs b/src/main.rs index fd37565..f9b125f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,8 @@ use dsct::filter_expr; use dsct::input; use dsct::limits; use dsct::mcp; +use dsct::parallel; +use dsct::parallel_read; use dsct::schema; use dsct::serialize; use dsct::stats; @@ -116,6 +118,15 @@ struct ReadOptions { /// lowercase hex string under the `raw_bytes` field of each record. #[arg(long)] raw_bytes: bool, + + /// Number of worker threads for parallel filter evaluation. + /// Default: physical CPU count. Honoured only for file input with a + /// parallel-safe `--filter`. The `DSCT_THREADS` environment variable + /// is also honoured (flag takes precedence). Filters that require TCP + /// reassembly (HTTP, DNS-over-TCP, TLS, `tcp.stream_id`, etc.) and + /// stdin input always fall back to sequential processing. + #[arg(long)] + threads: Option, } /// Options for the `dsct stats` command. @@ -309,6 +320,7 @@ fn cmd_read(opts: ReadOptions) -> Result<()> { decode_as: decode_as_args, esp_sa: esp_sa_args, raw_bytes, + threads, } = opts; // Resolve effective count: explicit --count, default limit, or unlimited. let (count, is_default_limit) = if no_limit { @@ -326,10 +338,6 @@ fn cmd_read(opts: ReadOptions) -> Result<()> { Some(FieldConfig::default_config()?) }; - let mut registry = DissectorRegistry::default(); - decode_as::parse_and_apply(&mut registry, &decode_as_args).invalid_argument()?; - esp_sa::parse_and_apply(®istry, &esp_sa_args).invalid_argument()?; - let sample_rate = match sample_rate { Some(0) => { return Err(DsctError::invalid_argument( @@ -346,7 +354,6 @@ fn cmd_read(opts: ReadOptions) -> Result<()> { .transpose() .context("invalid --packet-number expression") .invalid_argument()?; - let pn_max = pn_filter.as_ref().and_then(PacketNumberFilter::max); // Parse filter expression let filter_expr = match filter_str.as_deref() { @@ -355,6 +362,74 @@ fn cmd_read(opts: ReadOptions) -> Result<()> { }; let is_stdin = file.as_os_str() == "-"; + + // Validate --decode-as / --esp-sa up front so both the parallel and the + // sequential path reject malformed arguments with a structured error. + // The sequential path reuses this registry; parallel workers build their + // own from the already-validated argument strings. + let mut registry = DissectorRegistry::default(); + decode_as::parse_and_apply(&mut registry, &decode_as_args).invalid_argument()?; + esp_sa::parse_and_apply(®istry, &esp_sa_args).invalid_argument()?; + + // Resolve thread count — always validate when a filter is present and input + // is a file (so that DSCT_THREADS=abc / --threads 0 errors deterministically). + let resolved_threads = if !is_stdin && filter_str.is_some() { + parallel::resolve_thread_count(threads).map_err(DsctError::invalid_argument)? + } else { + 1 // not consulted; sequential path always used + }; + + // Determine whether the parallel path is eligible. + let use_parallel = !is_stdin + && resolved_threads > 1 + && filter_expr + .as_ref() + .is_some_and(|e| !e.is_packet_number_only() && e.is_parallel_safe()) + && esp_sa_args.is_empty(); + + if use_parallel { + // ------------------------------------------------------------------ + // Parallel path + // ------------------------------------------------------------------ + let filter_str_ref = filter_str.as_deref().unwrap_or(""); + let stdout = io::stdout(); + let mut writer = io::BufWriter::new(stdout.lock()); + let start_time = Instant::now(); + + let outcome = parallel_read::run( + ¶llel_read::ParallelReadOptions { + path: &file, + filter_str: filter_str_ref, + decode_as_args: &decode_as_args, + threads: resolved_threads, + sample_rate, + offset, + count, + pn_filter, + field_config: field_config.as_ref(), + raw_bytes, + progress_interval: progress.unwrap_or(0), + }, + &mut writer, + &mut |number, message| emit_warning(number, message), + &mut |packets_processed, packets_written| { + emit_progress(packets_processed, packets_written, &start_time); + }, + )?; + + if is_default_limit && outcome.truncated_by_limit { + emit_truncation_warning(limits::DEFAULT_PACKET_COUNT); + } + + writer.flush()?; + return Ok(()); + } + + // ------------------------------------------------------------------------ + // Sequential path (unchanged from before) + // ------------------------------------------------------------------------ + let pn_max = pn_filter.as_ref().and_then(PacketNumberFilter::max); + let stdout = io::stdout(); let mut writer: Box = if is_stdin { Box::new(io::LineWriter::new(stdout.lock())) diff --git a/src/parallel_read.rs b/src/parallel_read.rs new file mode 100644 index 0000000..59dca8d --- /dev/null +++ b/src/parallel_read.rs @@ -0,0 +1,474 @@ +//! Pipeline-parallel filter evaluation for the `dsct read` command. +//! +//! When the filter expression is +//! [`parallel-safe`](crate::filter_expr::FilterExpr::is_parallel_safe) +//! and input is a file (not stdin), packets are distributed across N worker +//! threads for dissection and filtering. The merger re-assembles results in +//! original packet order so the output is byte-identical to the sequential +//! path. +//! +//! # Architecture +//! +//! ```text +//! Reader thread ──batch──> Worker 0 channel ──results──> Merger +//! ──batch──> Worker 1 channel ──results──> (calling thread) +//! ──batch──> Worker N-1 channel ──results──> +//! ``` +//! +//! 1. **Reader** (dedicated thread): reads packets with +//! [`CaptureReader::for_each_packet`], applies the packet-number pre-filter +//! and early-exit, copies bytes into small arena batches, and sends batches +//! round-robin to per-worker bounded channels. A shared [`AtomicBool`] stop +//! flag lets the merger abort the reader when the count limit is reached. +//! +//! 2. **Workers** (N threads): each builds its own [`DissectorRegistry`] plus +//! optional `decode-as` overrides, parses its own copy of the filter string, +//! and reuses one [`DissectBuffer`]. For each packet it dissects, evaluates +//! the filter, and on match serialises via [`write_packet_json`] into bytes. +//! Results are sent in the same batch order they were received. +//! +//! 3. **Merger** (calling thread): receives result batches strictly in +//! round-robin worker order to preserve global packet order. Applies +//! `sample_rate`, `offset`, and `count` on the ordered match stream, writes +//! matched JSON to the supplied writer, and calls the progress and warning +//! callbacks. On reaching the count limit it sets the stop flag and drains +//! all threads cleanly before returning. +//! +//! # Batch format +//! +//! Each batch is a `Vec<(PacketMeta, Vec)>` — at most 256 packets or +//! 1 MiB of raw packet data, whichever comes first. Results for each batch +//! are a `Vec` of per-packet entries (matches or warnings). +//! +//! # Robustness +//! +//! * No `unwrap` / `expect` in production code paths. +//! * Worker threads exit cleanly when their input channel is disconnected +//! (reader dropped the sender). +//! * The merger handles worker channel disconnection by stopping cleanly. +//! * All threads are joined before returning so output is fully flushed. + +use std::io; +use std::ops::ControlFlow; +use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc; + +use packet_dissector::registry::DissectorRegistry; +use packet_dissector_core::packet::DissectBuffer; + +use crate::decode_as; +use crate::error::{DsctError, Result}; +use crate::field_config::FieldConfig; +use crate::filter::PacketNumberFilter; +use crate::filter_expr::FilterExpr; +use crate::input::CaptureReader; +use crate::serialize::{PacketMeta, write_packet_json}; + +/// Maximum packets per batch sent to a worker. +const BATCH_PACKETS: usize = 256; + +/// Maximum raw bytes per batch (1 MiB). +const BATCH_BYTES: usize = 1 << 20; + +/// Bounded channel capacity per worker (number of pending batches). +const CHAN_CAPACITY: usize = 4; + +/// Options for the parallel read engine. +/// +/// All fields are derived from the `dsct read` CLI arguments and passed +/// through without mutation. +pub struct ParallelReadOptions<'a> { + /// Path to the capture file (must not be stdin `"-"`). + pub path: &'a Path, + /// Raw filter string (already verified as parallel-safe by the caller). + pub filter_str: &'a str, + /// `--decode-as` arguments to apply to each per-worker registry. + pub decode_as_args: &'a [String], + /// Number of worker threads (must be ≥ 2; caller enforces this). + pub threads: usize, + /// Emit every Nth filter-matching result; 1 = no sampling. + pub sample_rate: u64, + /// Skip the first `offset` filter-matching results. + pub offset: u64, + /// Stop after emitting this many results (`None` = unlimited). + pub count: Option, + /// Optional packet-number pre-filter (applied before dissection). + pub pn_filter: Option, + /// Field visibility configuration for JSON output; `None` = verbose. + pub field_config: Option<&'a FieldConfig>, + /// When `true`, include `raw_bytes` hex in each output record. + pub raw_bytes: bool, + /// Emit a progress callback every this many packets processed (0 = disabled). + pub progress_interval: u64, +} + +/// Outcome reported after a successful run. +/// +/// Provides enough information for the caller to emit a truncation warning. +pub struct ReadOutcome { + /// Total packets read from the file (after packet-number filtering). + /// Note: because workers do not report non-matching packets back to the + /// merger, this count reflects only matched-or-warned packets. It is + /// provided primarily for truncation detection. + pub packets_processed: u64, + /// Number of JSON records written to the writer. + pub packets_written: u64, + /// `true` if the run stopped because the count limit was reached (not EOF). + pub truncated_by_limit: bool, +} + +// --------------------------------------------------------------------------- +// Internal message types +// --------------------------------------------------------------------------- + +/// One batch of raw packets sent from the reader to a worker. +type InputBatch = Vec<(PacketMeta, Vec)>; + +/// One result entry produced by a worker. +enum WorkerEntry { + /// A packet that matched the filter; contains serialised JSON bytes + /// (no trailing newline — the merger adds it). + Match(Vec), + /// Dissection failed for this packet number. + Warning { number: u64, message: String }, +} + +/// One batch of results sent from a worker back to the merger. +type OutputBatch = Vec; + +// --------------------------------------------------------------------------- +// Public entry point +// --------------------------------------------------------------------------- + +/// Run parallel filter evaluation and write matching JSONL records to `writer`. +/// +/// The caller is responsible for ensuring that: +/// - `opts.path` is a regular file (not `"-"`). +/// - The filter has already been parsed and verified as +/// [`parallel_safe`](FilterExpr::is_parallel_safe). +/// - `opts.threads >= 2`. +/// +/// Callbacks: +/// - `warn(packet_number, message)` — called in packet order for per-packet +/// dissection warnings. +/// - `progress(packets_processed, packets_written)` — called approximately +/// every `opts.progress_interval` output records (at batch granularity). +pub fn run( + opts: &ParallelReadOptions<'_>, + writer: &mut W, + warn: &mut dyn FnMut(u64, &str), + progress: &mut dyn FnMut(u64, u64), +) -> Result { + let n = opts.threads; + + // Build per-worker input/output channels. + let mut input_txs: Vec> = Vec::with_capacity(n); + let mut input_rxs: Vec> = Vec::with_capacity(n); + let mut output_txs: Vec> = Vec::with_capacity(n); + let mut output_rxs: Vec> = Vec::with_capacity(n); + + for _ in 0..n { + let (itx, irx) = mpsc::sync_channel::(CHAN_CAPACITY); + let (otx, orx) = mpsc::sync_channel::(CHAN_CAPACITY); + input_txs.push(itx); + input_rxs.push(irx); + output_txs.push(otx); + output_rxs.push(orx); + } + + // Shared stop flag: set by merger when count limit reached. + let stop = Arc::new(AtomicBool::new(false)); + + // ----------------------------------------------------------------------- + // Spawn N workers + // ----------------------------------------------------------------------- + let mut worker_handles = Vec::with_capacity(n); + + let filter_str = opts.filter_str.to_owned(); + let decode_as_args_owned: Vec = opts.decode_as_args.to_vec(); + let field_config_clone: Option = opts.field_config.cloned(); + let raw_bytes = opts.raw_bytes; + + // output_txs will be drained into workers; move them one by one. + let mut output_txs_iter = output_txs.into_iter(); + + for irx in input_rxs { + let otx = output_txs_iter + .next() + .ok_or_else(|| DsctError::msg("internal: output_txs exhausted"))?; + let fs = filter_str.clone(); + let da = decode_as_args_owned.clone(); + let fc = field_config_clone.clone(); + + let handle = std::thread::Builder::new() + .name("dsct-worker".into()) + .spawn(move || worker_fn(irx, otx, fs, da, fc, raw_bytes)) + .map_err(|e| DsctError::msg(format!("failed to spawn worker thread: {e}")))?; + worker_handles.push(handle); + } + + // ----------------------------------------------------------------------- + // Spawn reader thread + // ----------------------------------------------------------------------- + let path_owned = opts.path.to_path_buf(); + let pn_filter_clone = opts.pn_filter.clone(); + let stop_reader = Arc::clone(&stop); + + let reader_handle = std::thread::Builder::new() + .name("dsct-reader".into()) + .spawn(move || reader_fn(path_owned, pn_filter_clone, input_txs, stop_reader)) + .map_err(|e| DsctError::msg(format!("failed to spawn reader thread: {e}")))?; + + // ----------------------------------------------------------------------- + // Merger (runs on the calling thread) + // ----------------------------------------------------------------------- + let outcome = merger_fn(opts, &mut output_rxs, writer, warn, progress, &stop); + + // ----------------------------------------------------------------------- + // Join all threads — must happen even on error to avoid resource leaks. + // Dropping output_rxs causes workers to exit their send loops, which drains + // their input channels, unblocking the reader. + // ----------------------------------------------------------------------- + drop(output_rxs); + + let reader_result = reader_handle + .join() + .map_err(|_| DsctError::msg("reader thread panicked"))?; + + // A panicked worker silently truncates the merged stream (its output + // channel just disconnects), so surface it as an explicit error instead + // of reporting partial output as success. + let mut worker_panicked = false; + for handle in worker_handles { + if handle.join().is_err() { + worker_panicked = true; + } + } + + let outcome = outcome?; + reader_result?; + if worker_panicked { + return Err(DsctError::msg( + "a worker thread panicked; output may be incomplete", + )); + } + Ok(outcome) +} + +// --------------------------------------------------------------------------- +// Reader thread +// --------------------------------------------------------------------------- + +fn reader_fn( + path: std::path::PathBuf, + pn_filter: Option, + input_txs: Vec>, + stop: Arc, +) -> Result<()> { + let reader = + CaptureReader::open(&path).map_err(|e| e.context("failed to open capture file"))?; + let n = input_txs.len(); + let pn_max = pn_filter.as_ref().and_then(PacketNumberFilter::max); + let mut worker_idx = 0usize; + let mut current_batch: InputBatch = Vec::with_capacity(BATCH_PACKETS); + let mut current_batch_bytes: usize = 0; + + reader.for_each_packet(|meta, data| { + if stop.load(Ordering::Relaxed) { + return Ok(ControlFlow::Break(())); + } + + // Packet-number pre-filter (mirrors sequential logic exactly). + if let Some(ref pnf) = pn_filter + && !pnf.contains(meta.number) + { + if pn_max.is_some_and(|m| meta.number > m) { + return Ok(ControlFlow::Break(())); + } + return Ok(ControlFlow::Continue(())); + } + + current_batch.push((meta, data.to_vec())); + current_batch_bytes += data.len(); + + if current_batch.len() >= BATCH_PACKETS || current_batch_bytes >= BATCH_BYTES { + let batch = std::mem::replace(&mut current_batch, Vec::with_capacity(BATCH_PACKETS)); + current_batch_bytes = 0; + if input_txs[worker_idx].send(batch).is_err() { + // Worker exited — stop flag should also be set. + return Ok(ControlFlow::Break(())); + } + worker_idx = (worker_idx + 1) % n; + } + + Ok(ControlFlow::Continue(())) + })?; + + // Flush last partial batch. + if !current_batch.is_empty() && !stop.load(Ordering::Relaxed) { + // Ignore send error — receiver may have disconnected if stop was set. + let _ = input_txs[worker_idx].send(current_batch); + } + + // Dropping input_txs closes all worker input channels → workers exit. + Ok(()) +} + +// --------------------------------------------------------------------------- +// Worker thread +// --------------------------------------------------------------------------- + +fn worker_fn( + irx: mpsc::Receiver, + otx: mpsc::SyncSender, + filter_str: String, + decode_as_args: Vec, + field_config: Option, + raw_bytes: bool, +) { + let mut registry = DissectorRegistry::default(); + if decode_as::parse_and_apply(&mut registry, &decode_as_args).is_err() { + // decode-as args were validated before spawning; this should not occur. + return; + } + + let expr: Option = match FilterExpr::parse(&filter_str) { + Ok(e) => e, + Err(_) => return, + }; + + let mut dissect_buf = DissectBuffer::new(); + let mut json_buf: Vec = Vec::with_capacity(4096); + + for batch in &irx { + let mut results: OutputBatch = Vec::with_capacity(batch.len()); + + for (meta, data) in &batch { + let dbuf = dissect_buf.clear_into(); + if let Err(e) = registry.dissect_with_link_type(data, meta.link_type, dbuf) { + results.push(WorkerEntry::Warning { + number: meta.number, + message: format!("{e}"), + }); + continue; + } + let packet = packet_dissector_core::packet::Packet::new(dbuf, data.as_slice()); + + if let Some(ref e) = expr + && !e.matches_with_number(&packet, meta.number) + { + continue; + } + + json_buf.clear(); + if write_packet_json( + &mut json_buf, + meta, + dbuf, + data.as_slice(), + field_config.as_ref(), + raw_bytes, + ) + .is_ok() + { + results.push(WorkerEntry::Match(json_buf.clone())); + } + } + + if otx.send(results).is_err() { + // Merger dropped the receiver (count limit reached); exit cleanly. + break; + } + } +} + +// --------------------------------------------------------------------------- +// Merger (runs on calling thread) +// --------------------------------------------------------------------------- + +fn merger_fn( + opts: &ParallelReadOptions<'_>, + output_rxs: &mut [mpsc::Receiver], + writer: &mut W, + warn: &mut dyn FnMut(u64, &str), + progress: &mut dyn FnMut(u64, u64), + stop: &AtomicBool, +) -> Result { + let n = output_rxs.len(); + let sample_rate = opts.sample_rate; + let offset = opts.offset; + let count = opts.count; + let progress_interval = opts.progress_interval; + + let mut packets_processed = 0u64; + let mut packets_written = 0u64; + let mut filter_matches = 0u64; + let mut results_matched = 0u64; + let mut truncated_by_limit = false; + let mut worker_idx = 0usize; + + // Receive from workers in strict round-robin order (same order the reader + // sent batches to them), preserving global packet order. + 'outer: loop { + match output_rxs[worker_idx].recv() { + Err(_) => { + // Channel closed: either stop flag is set (expected EOF / limit) + // or all workers finished (EOF). Either way, we are done. + break 'outer; + } + Ok(batch) => { + // Count packets represented in this batch for progress reporting. + let batch_count = batch.len() as u64; + packets_processed = packets_processed.saturating_add(batch_count); + + for entry in batch { + match entry { + WorkerEntry::Warning { number, message } => { + warn(number, &message); + } + WorkerEntry::Match(json_bytes) => { + filter_matches += 1; + if sample_rate > 1 && !(filter_matches - 1).is_multiple_of(sample_rate) + { + continue; + } + results_matched += 1; + if results_matched <= offset { + continue; + } + writer.write_all(&json_bytes)?; + writer.write_all(b"\n")?; + packets_written += 1; + + if let Some(max) = count + && packets_written >= max + { + truncated_by_limit = true; + stop.store(true, Ordering::Relaxed); + break 'outer; + } + } + } + } + + // Progress reporting at batch granularity. + if progress_interval > 0 + && packets_written > 0 + && packets_written.is_multiple_of(progress_interval) + { + progress(packets_processed, packets_written); + } + + worker_idx = (worker_idx + 1) % n; + } + } + } + + Ok(ReadOutcome { + packets_processed, + packets_written, + truncated_by_limit, + }) +} diff --git a/tests/cli_parallel_read_test.rs b/tests/cli_parallel_read_test.rs new file mode 100644 index 0000000..54b78c0 --- /dev/null +++ b/tests/cli_parallel_read_test.rs @@ -0,0 +1,445 @@ +//! Integration tests for `dsct read --threads` (parallel filter evaluation). +//! +//! These tests verify that the parallel path produces byte-identical output to +//! the sequential path, that fallback to sequential happens when required, and +//! that all limit/offset/sample-rate interactions are preserved. + +use assert_cmd::Command; +use std::io::Write; +use tempfile::NamedTempFile; + +// --------------------------------------------------------------------------- +// Pcap generation helpers +// --------------------------------------------------------------------------- + +/// A minimal valid Ethernet + IPv4 + UDP packet (42 bytes). +/// `src_ip` and `dst_ip` are the last octets only (first three = 10.0.0). +fn udp_pkt(src_ip_last: u8, dst_ip_last: u8, src_port: u16, dst_port: u16) -> [u8; 42] { + let mut p = [0u8; 42]; + // Ethernet (14 bytes) + p[0..6].copy_from_slice(&[0xff, 0xff, 0xff, 0xff, 0xff, 0xff]); // dst mac + p[6..12].copy_from_slice(&[0x00, 0x11, 0x22, 0x33, 0x44, 0x55]); // src mac + p[12..14].copy_from_slice(&[0x08, 0x00]); // ethertype IPv4 + // IPv4 header (20 bytes, starts at p[14]) + p[14] = 0x45; // version=4, IHL=5 + p[15] = 0x00; // DSCP/ECN + p[16..18].copy_from_slice(&28u16.to_be_bytes()); // total length (20 IP + 8 UDP) + // p[18..20]: identification = 0 + // p[20..22]: flags+fragment offset = 0 + p[22] = 0x40; // TTL = 64 + p[23] = 0x11; // protocol = 17 (UDP) + // p[24..26]: checksum = 0 (not validated) + p[26] = 10; + p[27] = 0; + p[28] = 0; + p[29] = src_ip_last; // src IP = 10.0.0.src_ip_last + p[30] = 10; + p[31] = 0; + p[32] = 0; + p[33] = dst_ip_last; // dst IP = 10.0.0.dst_ip_last + // UDP header (8 bytes, starts at p[34]) + p[34..36].copy_from_slice(&src_port.to_be_bytes()); + p[36..38].copy_from_slice(&dst_port.to_be_bytes()); + p[38..40].copy_from_slice(&8u16.to_be_bytes()); // UDP length + // p[40..42]: checksum = 0 + p +} + +/// A minimal Ethernet + IPv4 + TCP segment (54 bytes, SYN or DATA based on flags). +fn tcp_pkt( + src_ip_last: u8, + dst_ip_last: u8, + src_port: u16, + dst_port: u16, + flags: u8, + seq: u32, +) -> Vec { + let mut p = vec![0u8; 54]; + // Ethernet (14 bytes) + p[0..6].copy_from_slice(&[0xff, 0xff, 0xff, 0xff, 0xff, 0xff]); + p[6..12].copy_from_slice(&[0x00, 0x11, 0x22, 0x33, 0x44, 0x55]); + p[12..14].copy_from_slice(&[0x08, 0x00]); + // IPv4 (20 bytes, starts at p[14]) + p[14] = 0x45; // version=4, IHL=5 + p[15] = 0x00; + p[16..18].copy_from_slice(&40u16.to_be_bytes()); // total length = 20 IP + 20 TCP + // p[18..22]: identification + flags+frag = 0 + p[22] = 0x40; // TTL = 64 + p[23] = 0x06; // protocol = 6 (TCP) + // p[24..26]: checksum = 0 + p[26] = 10; + p[27] = 0; + p[28] = 0; + p[29] = src_ip_last; + p[30] = 10; + p[31] = 0; + p[32] = 0; + p[33] = dst_ip_last; + // TCP (20 bytes, starts at p[34]) + p[34..36].copy_from_slice(&src_port.to_be_bytes()); + p[36..38].copy_from_slice(&dst_port.to_be_bytes()); + p[38..42].copy_from_slice(&seq.to_be_bytes()); // seq number + p[42..46].copy_from_slice(&0u32.to_be_bytes()); // ack number + p[46] = 0x50; // data offset = 5 (20 bytes header), reserved bits = 0 + p[47] = flags; + p[48..50].copy_from_slice(&65535u16.to_be_bytes()); // window size + p +} + +/// Build a synthetic pcap with `n_rounds` rounds of: +/// - Several UDP packets (varying src/dst IPs and ports) +/// - A TCP packet with `tcp.dst_port > 1024` +/// +/// Total packets ≈ n_rounds * 5. +pub fn build_mixed_pcap(n_rounds: usize) -> Vec { + let mut pcap = Vec::new(); + // Global header + pcap.extend_from_slice(&0xA1B2C3D4u32.to_le_bytes()); + pcap.extend_from_slice(&2u16.to_le_bytes()); + pcap.extend_from_slice(&4u16.to_le_bytes()); + pcap.extend_from_slice(&0i32.to_le_bytes()); + pcap.extend_from_slice(&0u32.to_le_bytes()); + pcap.extend_from_slice(&65535u32.to_le_bytes()); + pcap.extend_from_slice(&1u32.to_le_bytes()); // Ethernet + + let mut pkt_idx = 0usize; + + for i in 0..n_rounds { + let ts = (i as u32) * 5; + + // UDP packet 1: 10.0.0.1 -> 10.0.0.2 port 4096->4097 + let u1 = udp_pkt(1, 2, 4096, 4097); + push_pkt(&mut pcap, ts, 0, &u1); + pkt_idx += 1; + + // UDP packet 2: 10.0.0.3 -> 10.0.0.4 port 5000->5001 + let u2 = udp_pkt(3, 4, 5000, 5001); + push_pkt(&mut pcap, ts + 1, 0, &u2); + pkt_idx += 1; + + // UDP packet 3: 10.0.0.1 -> 10.0.0.5 port 9000->9001 + let u3 = udp_pkt(1, 5, 9000, 9001); + push_pkt(&mut pcap, ts + 2, 0, &u3); + pkt_idx += 1; + + // TCP SYN: 10.0.0.10 -> 10.0.0.20 port 12345->2000 + let t1 = tcp_pkt(10, 20, 12345, 2000, 0x02, (pkt_idx as u32) * 100); + push_pkt(&mut pcap, ts + 3, 0, &t1); + pkt_idx += 1; + + // TCP data: 10.0.0.10 -> 10.0.0.20 port 12345->2000 + let t2 = tcp_pkt(10, 20, 12345, 2000, 0x18, (pkt_idx as u32) * 100); + push_pkt(&mut pcap, ts + 4, 0, &t2); + pkt_idx += 1; + } + let _ = pkt_idx; // suppress warning + pcap +} + +fn push_pkt(buf: &mut Vec, ts_sec: u32, ts_usec: u32, pkt: &[u8]) { + buf.extend_from_slice(&ts_sec.to_le_bytes()); + buf.extend_from_slice(&ts_usec.to_le_bytes()); + buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + buf.extend_from_slice(pkt); +} + +fn write_mixed_pcap(n_rounds: usize) -> NamedTempFile { + let pcap = build_mixed_pcap(n_rounds); + let mut tmp = NamedTempFile::with_suffix(".pcap").unwrap(); + tmp.write_all(&pcap).unwrap(); + tmp +} + +// --------------------------------------------------------------------------- +// Helpers for running dsct read +// --------------------------------------------------------------------------- + +fn dsct_read_stdout(path: &str, extra_args: &[&str]) -> Vec { + let mut cmd = Command::cargo_bin("dsct").unwrap(); + cmd.arg("read").arg(path); + for arg in extra_args { + cmd.arg(arg); + } + let out = cmd.output().unwrap(); + assert!( + out.status.success(), + "dsct read failed (args={extra_args:?}): {}", + String::from_utf8_lossy(&out.stderr) + ); + out.stdout +} + +// --------------------------------------------------------------------------- +// Equivalence tests: parallel must produce byte-identical output to sequential +// --------------------------------------------------------------------------- + +/// Test that `--threads 4` and `--threads 1` produce the same output for a +/// given filter. +fn assert_parallel_equals_sequential(path: &str, filter: &str) { + let seq = dsct_read_stdout(path, &["-f", filter, "--no-limit", "--threads", "1"]); + let par = dsct_read_stdout(path, &["-f", filter, "--no-limit", "--threads", "4"]); + assert_eq!( + seq, par, + "parallel output differs from sequential for filter {filter:?}" + ); +} + +#[test] +fn parallel_udp_filter_equals_sequential() { + let tmp = write_mixed_pcap(200); // 1000 packets + let path = tmp.path().to_str().unwrap(); + assert_parallel_equals_sequential(path, "udp"); +} + +#[test] +fn parallel_tcp_dst_port_filter_equals_sequential() { + let tmp = write_mixed_pcap(200); + let path = tmp.path().to_str().unwrap(); + assert_parallel_equals_sequential(path, "tcp.dst_port > 1024"); +} + +#[test] +fn parallel_ipv4_src_filter_equals_sequential() { + let tmp = write_mixed_pcap(200); + let path = tmp.path().to_str().unwrap(); + // 10.0.0.1 is used in UDP packets in the generator + assert_parallel_equals_sequential(path, "ipv4.src = '10.0.0.1'"); +} + +// --------------------------------------------------------------------------- +// Fallback correctness: unsafe filter goes through sequential path +// --------------------------------------------------------------------------- + +/// `--threads 4` with an HTTP or DNS filter must succeed and equal +/// `--threads 1` (both go through the sequential path since these protocols +/// are not parallel-safe). +#[test] +fn fallback_http_filter_succeeds_and_equals_sequential() { + let tmp = write_mixed_pcap(100); + let path = tmp.path().to_str().unwrap(); + let seq = dsct_read_stdout(path, &["-f", "http", "--no-limit", "--threads", "1"]); + let par = dsct_read_stdout(path, &["-f", "http", "--no-limit", "--threads", "4"]); + assert_eq!(seq, par, "fallback http output should equal sequential"); +} + +#[test] +fn fallback_dns_filter_succeeds_and_equals_sequential() { + let tmp = write_mixed_pcap(100); + let path = tmp.path().to_str().unwrap(); + let seq = dsct_read_stdout(path, &["-f", "dns", "--no-limit", "--threads", "1"]); + let par = dsct_read_stdout(path, &["-f", "dns", "--no-limit", "--threads", "4"]); + assert_eq!(seq, par, "fallback dns output should equal sequential"); +} + +// --------------------------------------------------------------------------- +// Order/limit interplay +// --------------------------------------------------------------------------- + +#[test] +fn parallel_count_yields_first_n_matches() { + let tmp = write_mixed_pcap(200); + let path = tmp.path().to_str().unwrap(); + // Both should give exactly the first 10 UDP matches + let seq = dsct_read_stdout(path, &["-f", "udp", "--count", "10", "--threads", "1"]); + let par = dsct_read_stdout(path, &["-f", "udp", "--count", "10", "--threads", "4"]); + assert_eq!( + seq, par, + "count-limited parallel output differs from sequential" + ); + let lines: Vec<&[u8]> = seq + .split(|&b| b == b'\n') + .filter(|l| !l.is_empty()) + .collect(); + assert_eq!(lines.len(), 10, "expected exactly 10 lines"); +} + +#[test] +fn parallel_offset_skips_n_matches() { + let tmp = write_mixed_pcap(200); + let path = tmp.path().to_str().unwrap(); + let seq = dsct_read_stdout( + path, + &["-f", "udp", "--offset", "5", "--no-limit", "--threads", "1"], + ); + let par = dsct_read_stdout( + path, + &["-f", "udp", "--offset", "5", "--no-limit", "--threads", "4"], + ); + assert_eq!(seq, par, "offset parallel output differs from sequential"); +} + +#[test] +fn parallel_sample_rate_combined_offset_count() { + let tmp = write_mixed_pcap(400); + let path = tmp.path().to_str().unwrap(); + // sample every 3rd, offset 2, count 5 + let seq = dsct_read_stdout( + path, + &[ + "-f", + "udp", + "-s", + "3", + "--offset", + "2", + "--count", + "5", + "--threads", + "1", + ], + ); + let par = dsct_read_stdout( + path, + &[ + "-f", + "udp", + "-s", + "3", + "--offset", + "2", + "--count", + "5", + "--threads", + "4", + ], + ); + assert_eq!( + seq, par, + "combined sample/offset/count parallel output differs" + ); +} + +// --------------------------------------------------------------------------- +// Error cases +// --------------------------------------------------------------------------- + +#[test] +fn invalid_decode_as_on_parallel_path_exits_with_code_2() { + // --decode-as must be validated even on the parallel path; a silent + // empty-output success (exit 0) would violate the structured-error + // contract. + let tmp = write_mixed_pcap(10); + let path = tmp.path().to_str().unwrap(); + let out = Command::cargo_bin("dsct") + .unwrap() + .args([ + "read", + path, + "-f", + "udp", + "--threads", + "4", + "--decode-as", + "invalid_format", + ]) + .output() + .unwrap(); + assert_eq!( + out.status.code(), + Some(2), + "expected exit code 2 for invalid --decode-as on parallel path" + ); + let stderr = String::from_utf8_lossy(&out.stderr); + let v: serde_json::Value = serde_json::from_str(stderr.trim()).expect("stderr must be JSON"); + assert!( + v.get("error").is_some(), + "stderr must contain an 'error' key" + ); +} + +#[test] +fn threads_zero_exits_with_code_2() { + let tmp = write_mixed_pcap(10); + let path = tmp.path().to_str().unwrap(); + let out = Command::cargo_bin("dsct") + .unwrap() + .args(["read", path, "-f", "udp", "--threads", "0"]) + .output() + .unwrap(); + assert_eq!( + out.status.code(), + Some(2), + "expected exit code 2 for --threads 0" + ); + let stderr = String::from_utf8_lossy(&out.stderr); + let v: serde_json::Value = serde_json::from_str(stderr.trim()).expect("stderr must be JSON"); + assert!( + v.get("error").is_some(), + "stderr must contain an 'error' key" + ); +} + +#[test] +fn dsct_threads_env_unparsable_exits_with_code_2() { + let tmp = write_mixed_pcap(10); + let path = tmp.path().to_str().unwrap(); + let out = Command::cargo_bin("dsct") + .unwrap() + .args(["read", path, "-f", "udp"]) + .env("DSCT_THREADS", "abc") + .output() + .unwrap(); + assert_eq!( + out.status.code(), + Some(2), + "expected exit code 2 for DSCT_THREADS=abc: stderr={}", + String::from_utf8_lossy(&out.stderr) + ); + let stderr = String::from_utf8_lossy(&out.stderr); + let v: serde_json::Value = serde_json::from_str(stderr.trim()).expect("stderr must be JSON"); + assert!(v.get("error").is_some()); +} + +#[test] +fn dsct_threads_env_equals_flag() { + let tmp = write_mixed_pcap(200); + let path = tmp.path().to_str().unwrap(); + let via_flag = dsct_read_stdout(path, &["-f", "udp", "--no-limit", "--threads", "4"]); + // Use DSCT_THREADS env without --threads flag + let mut cmd = Command::cargo_bin("dsct").unwrap(); + cmd.arg("read") + .arg(path) + .arg("-f") + .arg("udp") + .arg("--no-limit"); + cmd.env("DSCT_THREADS", "4"); + let out = cmd.output().unwrap(); + assert!( + out.status.success(), + "DSCT_THREADS=4 should succeed: {}", + String::from_utf8_lossy(&out.stderr) + ); + assert_eq!( + via_flag, out.stdout, + "DSCT_THREADS=4 must equal --threads 4" + ); +} + +// --------------------------------------------------------------------------- +// Stdin still streams sequentially +// --------------------------------------------------------------------------- + +#[test] +fn stdin_with_threads_flag_succeeds_sequentially() { + let pcap = build_mixed_pcap(20); + // Run with file to get expected output + let tmp = write_mixed_pcap(20); + let path = tmp.path().to_str().unwrap(); + let file_out = dsct_read_stdout(path, &["-f", "udp", "--no-limit", "--threads", "4"]); + + // Run via stdin + let mut cmd = Command::cargo_bin("dsct").unwrap(); + cmd.args(["read", "-", "-f", "udp", "--no-limit", "--threads", "4"]); + cmd.write_stdin(pcap); + let out = cmd.output().unwrap(); + assert!( + out.status.success(), + "stdin + --threads should succeed: {}", + String::from_utf8_lossy(&out.stderr) + ); + assert_eq!( + file_out, out.stdout, + "stdin output must equal file output for --threads 4" + ); +} From 4654b29deba645b62cf16f0ace8bf602c5de5c90 Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 02:39:26 +0000 Subject: [PATCH 5/6] bench: add parallel filter scan benchmark Add benches/parallel_read.rs comparing parallel_read::run with threads=1 vs threads=4 on a 20k-packet mixed UDP/TCP pcap, writing to io::sink(). sample_size=10 keeps cargo bench fast. Verified via cargo bench --bench parallel_read -- --test. --- Cargo.toml | 4 ++ benches/parallel_read.rs | 152 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 benches/parallel_read.rs diff --git a/Cargo.toml b/Cargo.toml index 199b505..6980a08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,6 +174,10 @@ name = "tui_index" harness = false required-features = ["tui"] +[[bench]] +name = "parallel_read" +harness = false + # The profile that 'dist' will build with [profile.dist] inherits = "release" diff --git a/benches/parallel_read.rs b/benches/parallel_read.rs new file mode 100644 index 0000000..c532d25 --- /dev/null +++ b/benches/parallel_read.rs @@ -0,0 +1,152 @@ +//! Benchmark for parallel filter evaluation (`dsct read --threads`). +//! +//! Generates a synthetic pcap with mixed UDP and TCP packets and measures +//! the throughput of the parallel read engine at threads=1 vs threads=4, +//! writing matched records to [`io::sink()`]. + +use std::io; +use std::path::Path; + +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; + +use dsct::parallel_read::{ParallelReadOptions, run}; + +// --------------------------------------------------------------------------- +// Pcap generation +// --------------------------------------------------------------------------- + +/// Build a synthetic pcap with `n` packets alternating UDP and TCP. +/// +/// Even-indexed packets: Ethernet + IPv4 + UDP (42 bytes). +/// Odd-indexed packets: Ethernet + IPv4 + TCP (54 bytes). +fn build_bench_pcap(n: usize) -> Vec { + let mut buf = Vec::with_capacity(24 + n * 60); + + // Global header + buf.extend_from_slice(&0xA1B2C3D4u32.to_le_bytes()); + buf.extend_from_slice(&2u16.to_le_bytes()); + buf.extend_from_slice(&4u16.to_le_bytes()); + buf.extend_from_slice(&0i32.to_le_bytes()); + buf.extend_from_slice(&0u32.to_le_bytes()); + buf.extend_from_slice(&65535u32.to_le_bytes()); + buf.extend_from_slice(&1u32.to_le_bytes()); // Ethernet + + // UDP packet template (42 bytes): Eth + IPv4 (UDP, 10.0.0.1→10.0.0.2) + let udp: &[u8] = &[ + // Ethernet (14 bytes) + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // dst mac + 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, // src mac + 0x08, 0x00, // ethertype IPv4 + // IPv4 (20 bytes) + 0x45, 0x00, 0x00, 0x1c, // version/IHL, DSCP, total length (28) + 0x00, 0x00, 0x00, 0x00, // id, flags+frag + 0x40, 0x11, 0x00, 0x00, // TTL=64, proto=UDP, checksum=0 + 0x0a, 0x00, 0x00, 0x01, // src 10.0.0.1 + 0x0a, 0x00, 0x00, 0x02, // dst 10.0.0.2 + // UDP (8 bytes) + 0x10, 0x00, // src port 4096 + 0x10, 0x01, // dst port 4097 + 0x00, 0x08, // length 8 + 0x00, 0x00, // checksum + ]; + + // TCP SYN packet template (54 bytes): Eth + IPv4 (TCP, 10.0.0.3→10.0.0.4) + let tcp: &[u8] = &[ + // Ethernet (14 bytes) + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x08, 0x00, + // IPv4 (20 bytes) + 0x45, 0x00, 0x00, 0x28, // total length = 40 + 0x00, 0x00, 0x00, 0x00, 0x40, 0x06, 0x00, 0x00, // TTL=64, proto=TCP + 0x0a, 0x00, 0x00, 0x03, // src 10.0.0.3 + 0x0a, 0x00, 0x00, 0x04, // dst 10.0.0.4 + // TCP (20 bytes) + 0x30, 0x39, // src port 12345 + 0x07, 0xd0, // dst port 2000 + 0x00, 0x00, 0x00, 0x01, // seq + 0x00, 0x00, 0x00, 0x00, // ack + 0x50, 0x02, // data offset=5, SYN + 0xff, 0xff, // window + 0x00, 0x00, // checksum + 0x00, 0x00, // urgent + ]; + + for i in 0..n { + let pkt = if i % 2 == 0 { udp } else { tcp }; + let ts_sec = (i / 1_000_000) as u32; + let ts_usec = (i % 1_000_000) as u32; + buf.extend_from_slice(&ts_sec.to_le_bytes()); + buf.extend_from_slice(&ts_usec.to_le_bytes()); + buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + buf.extend_from_slice(&(pkt.len() as u32).to_le_bytes()); + buf.extend_from_slice(pkt); + } + + buf +} + +/// Write a bench pcap to `path` and return the file size in bytes. +fn write_bench_pcap(path: &Path, n: usize) -> u64 { + let data = build_bench_pcap(n); + std::fs::write(path, &data).expect("write bench pcap"); + data.len() as u64 +} + +// --------------------------------------------------------------------------- +// Benchmark +// --------------------------------------------------------------------------- + +fn bench_parallel_read(c: &mut Criterion) { + const N: usize = 20_000; // ~20 k packets; each is 42 or 54 bytes + + let path = std::env::temp_dir().join(format!( + "dsct_bench_parallel_{}_{}.pcap", + N, + std::process::id() + )); + let file_size = write_bench_pcap(&path, N); + + let mut group = c.benchmark_group("parallel_read"); + group.sample_size(10); + group.throughput(Throughput::Bytes(file_size)); + + // Filter that is parallel-safe: match all UDP packets. + let filter = "udp"; + + for &threads in &[1usize, 4usize] { + group.bench_with_input( + BenchmarkId::new("udp_filter", format!("threads={threads}")), + &threads, + |b, &t| { + b.iter(|| { + let mut sink = io::sink(); + run( + &ParallelReadOptions { + path: &path, + filter_str: filter, + decode_as_args: &[], + threads: t, + sample_rate: 1, + offset: 0, + count: None, + pn_filter: None, + field_config: None, // verbose mode + raw_bytes: false, + progress_interval: 0, + }, + &mut sink, + &mut |_, _| {}, + &mut |_, _| {}, + ) + .expect("parallel_read::run failed in benchmark"); + }); + }, + ); + } + + group.finish(); + + let _ = std::fs::remove_file(&path); +} + +criterion_group!(benches, bench_parallel_read); +criterion_main!(benches); From ef2022d6edd96472c7f18a1eb0c4b4e346337326 Mon Sep 17 00:00:00 2001 From: Yuya Kusakabe Date: Fri, 12 Jun 2026 03:28:09 +0000 Subject: [PATCH 6/6] fix(read): align progress and serialization error semantics with sequential path Workers now report true per-batch packet counts so --progress reflects all processed packets, and serialization failures abort the run instead of silently dropping matched packets. Also normalize the protocol name in the Where arm of is_parallel_safe for symmetry with the Protocol arm. --- src/filter_expr.rs | 9 +- src/parallel_read.rs | 159 +++++++++++++++++++++++++++----- tests/cli_parallel_read_test.rs | 43 +++++++++ 3 files changed, 186 insertions(+), 25 deletions(-) diff --git a/src/filter_expr.rs b/src/filter_expr.rs index f0e9daf..15644a5 100644 --- a/src/filter_expr.rs +++ b/src/filter_expr.rs @@ -133,12 +133,15 @@ impl FilterExpr { SAFE_PROTOCOLS.iter().any(|&s| s == norm) } FilterExpr::Where(clause) => { - if !SAFE_PROTOCOLS.iter().any(|&s| s == clause.protocol) { + // WhereClause::new already normalises the protocol name, but + // normalise again so this check does not silently depend on + // that constructor invariant (mirrors the Protocol arm). + let norm = crate::filter::normalize_protocol_name(&clause.protocol); + if !SAFE_PROTOCOLS.iter().any(|&s| s == norm) { return false; } // TCP stateful fields are unsafe even though TCP itself is safe. - if clause.protocol == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field) - { + if norm == "tcp" && UNSAFE_TCP_FIELDS.iter().any(|&f| f == clause.field) { return false; } true diff --git a/src/parallel_read.rs b/src/parallel_read.rs index 59dca8d..f78a964 100644 --- a/src/parallel_read.rs +++ b/src/parallel_read.rs @@ -108,10 +108,8 @@ pub struct ParallelReadOptions<'a> { /// /// Provides enough information for the caller to emit a truncation warning. pub struct ReadOutcome { - /// Total packets read from the file (after packet-number filtering). - /// Note: because workers do not report non-matching packets back to the - /// merger, this count reflects only matched-or-warned packets. It is - /// provided primarily for truncation detection. + /// Total packets read from the file (after packet-number filtering), + /// matching or not — same semantics as the sequential path. pub packets_processed: u64, /// Number of JSON records written to the writer. pub packets_written: u64, @@ -133,10 +131,20 @@ enum WorkerEntry { Match(Vec), /// Dissection failed for this packet number. Warning { number: u64, message: String }, + /// Serialisation failed for a matched packet. The merger treats this as + /// fatal, mirroring the sequential path where `write_packet_json` errors + /// propagate via `?`. + Fatal { number: u64, message: String }, } /// One batch of results sent from a worker back to the merger. -type OutputBatch = Vec; +struct OutputBatch { + /// Number of input packets this batch represents (matching or not), used + /// by the merger for `packets_processed` accounting. + packets: u64, + /// Per-packet entries (matches and warnings) in original packet order. + entries: Vec, +} // --------------------------------------------------------------------------- // Public entry point @@ -153,8 +161,10 @@ type OutputBatch = Vec; /// Callbacks: /// - `warn(packet_number, message)` — called in packet order for per-packet /// dissection warnings. -/// - `progress(packets_processed, packets_written)` — called approximately -/// every `opts.progress_interval` output records (at batch granularity). +/// - `progress(packets_processed, packets_written)` — called whenever +/// `packets_processed` crosses a multiple of `opts.progress_interval`. +/// Unlike the sequential path this fires at batch granularity, so at most +/// once per batch even when a batch crosses several interval boundaries. pub fn run( opts: &ParallelReadOptions<'_>, writer: &mut W, @@ -343,12 +353,13 @@ fn worker_fn( let mut json_buf: Vec = Vec::with_capacity(4096); for batch in &irx { - let mut results: OutputBatch = Vec::with_capacity(batch.len()); + let packets = batch.len() as u64; + let mut entries: Vec = Vec::with_capacity(batch.len()); for (meta, data) in &batch { let dbuf = dissect_buf.clear_into(); if let Err(e) = registry.dissect_with_link_type(data, meta.link_type, dbuf) { - results.push(WorkerEntry::Warning { + entries.push(WorkerEntry::Warning { number: meta.number, message: format!("{e}"), }); @@ -363,21 +374,30 @@ fn worker_fn( } json_buf.clear(); - if write_packet_json( + match write_packet_json( &mut json_buf, meta, dbuf, data.as_slice(), field_config.as_ref(), raw_bytes, - ) - .is_ok() - { - results.push(WorkerEntry::Match(json_buf.clone())); + ) { + Ok(()) => entries.push(WorkerEntry::Match(json_buf.clone())), + Err(e) => { + // Fatal: the merger aborts the whole run on this entry, + // mirroring the sequential path. Send what we have and + // exit the worker. + entries.push(WorkerEntry::Fatal { + number: meta.number, + message: format!("{e}"), + }); + let _ = otx.send(OutputBatch { packets, entries }); + return; + } } } - if otx.send(results).is_err() { + if otx.send(OutputBatch { packets, entries }).is_err() { // Merger dropped the receiver (count limit reached); exit cleanly. break; } @@ -408,6 +428,9 @@ fn merger_fn( let mut results_matched = 0u64; let mut truncated_by_limit = false; let mut worker_idx = 0usize; + // packets_processed value at the last progress report, for interval + // boundary-crossing detection. + let mut progress_marker = 0u64; // Receive from workers in strict round-robin order (same order the reader // sent batches to them), preserving global packet order. @@ -419,15 +442,19 @@ fn merger_fn( break 'outer; } Ok(batch) => { - // Count packets represented in this batch for progress reporting. - let batch_count = batch.len() as u64; - packets_processed = packets_processed.saturating_add(batch_count); + packets_processed = packets_processed.saturating_add(batch.packets); - for entry in batch { + for entry in batch.entries { match entry { WorkerEntry::Warning { number, message } => { warn(number, &message); } + WorkerEntry::Fatal { number, message } => { + stop.store(true, Ordering::Relaxed); + return Err(DsctError::msg(format!( + "failed to serialize packet {number}: {message}" + ))); + } WorkerEntry::Match(json_bytes) => { filter_matches += 1; if sample_rate > 1 && !(filter_matches - 1).is_multiple_of(sample_rate) @@ -453,11 +480,13 @@ fn merger_fn( } } - // Progress reporting at batch granularity. + // Progress reporting at batch granularity: fire when + // packets_processed crossed a multiple of the interval since + // the last report (sequential semantics, batched). if progress_interval > 0 - && packets_written > 0 - && packets_written.is_multiple_of(progress_interval) + && packets_processed / progress_interval > progress_marker / progress_interval { + progress_marker = packets_processed; progress(packets_processed, packets_written); } @@ -472,3 +501,89 @@ fn merger_fn( truncated_by_limit, }) } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_opts(progress_interval: u64) -> ParallelReadOptions<'static> { + ParallelReadOptions { + path: Path::new("unused"), + filter_str: "udp", + decode_as_args: &[], + threads: 2, + sample_rate: 1, + offset: 0, + count: None, + pn_filter: None, + field_config: None, + raw_bytes: false, + progress_interval, + } + } + + #[test] + fn merger_aborts_on_fatal_entry() { + let (tx, rx) = mpsc::sync_channel::(2); + tx.send(OutputBatch { + packets: 1, + entries: vec![WorkerEntry::Fatal { + number: 7, + message: "boom".into(), + }], + }) + .unwrap(); + drop(tx); + + let opts = test_opts(0); + let mut rxs = vec![rx]; + let stop = AtomicBool::new(false); + let mut out: Vec = Vec::new(); + let result = merger_fn( + &opts, + &mut rxs, + &mut out, + &mut |_, _| {}, + &mut |_, _| {}, + &stop, + ); + + assert!(result.is_err(), "Fatal entry must abort the merge"); + assert!(stop.load(Ordering::Relaxed), "stop flag must be set"); + } + + #[test] + fn merger_progress_counts_all_packets() { + // Two batches of 300 packets with no matching entries: progress must + // fire once the 500-packet interval boundary is crossed, with + // packets_processed counting all packets (not just matches). + let (tx, rx) = mpsc::sync_channel::(2); + for _ in 0..2 { + tx.send(OutputBatch { + packets: 300, + entries: Vec::new(), + }) + .unwrap(); + } + drop(tx); + + let opts = test_opts(500); + let mut rxs = vec![rx]; + let stop = AtomicBool::new(false); + let mut out: Vec = Vec::new(); + let mut reports: Vec<(u64, u64)> = Vec::new(); + let outcome = merger_fn( + &opts, + &mut rxs, + &mut out, + &mut |_, _| {}, + &mut |processed, written| reports.push((processed, written)), + &stop, + ) + .unwrap(); + + assert_eq!(outcome.packets_processed, 600); + assert_eq!(outcome.packets_written, 0); + assert_eq!(reports, vec![(600, 0)]); + } +} diff --git a/tests/cli_parallel_read_test.rs b/tests/cli_parallel_read_test.rs index 54b78c0..f8bf64a 100644 --- a/tests/cli_parallel_read_test.rs +++ b/tests/cli_parallel_read_test.rs @@ -314,6 +314,49 @@ fn parallel_sample_rate_combined_offset_count() { // Error cases // --------------------------------------------------------------------------- +#[test] +fn parallel_progress_reports_total_processed_packets() { + // --progress must report packets_processed counting ALL packets read + // (like the sequential path), not just filter-matching ones. + // 400 rounds = 2000 packets, 1200 of which are UDP matches. + let tmp = write_mixed_pcap(400); + let path = tmp.path().to_str().unwrap(); + let out = Command::cargo_bin("dsct") + .unwrap() + .args([ + "read", + path, + "-f", + "udp", + "--no-limit", + "--threads", + "4", + "--progress", + "500", + ]) + .output() + .unwrap(); + assert!(out.status.success()); + + let stderr = String::from_utf8_lossy(&out.stderr); + let processed: Vec = stderr + .lines() + .filter_map(|l| serde_json::from_str::(l).ok()) + .filter_map(|v| v.pointer("/progress/packets_processed")?.as_u64()) + .collect(); + assert!( + !processed.is_empty(), + "expected at least one progress report on stderr, got: {stderr}" + ); + let max = processed.iter().copied().max().unwrap(); + // Only 1200 packets match; reaching >= 1500 proves the count covers all + // processed packets rather than matches only. + assert!( + max >= 1500, + "packets_processed must count all packets (got max {max})" + ); +} + #[test] fn invalid_decode_as_on_parallel_path_exits_with_code_2() { // --decode-as must be validated even on the parallel path; a silent