diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f9ecc6806..3766272c19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -301,7 +301,7 @@ jobs: build_ciphernode_image: needs: [detect_changes] if: needs.detect_changes.outputs.docker_ciphernode == 'true' - timeout-minutes: 30 + timeout-minutes: 60 runs-on: 'ubuntu-latest' steps: - uses: actions/checkout@v6 diff --git a/Cargo.lock b/Cargo.lock index aa076f755c..9abc007adb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,7 +131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -248,7 +248,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -259,7 +259,7 @@ checksum = "b6ac1e58cded18cb28ddc17143c4dea5345b3ad575e14f32f66e4054a56eb271" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -804,7 +804,7 @@ checksum = "ce8849c74c9ca0f5a03da1c865e3eb6f768df816e67dd3721a398a8a7e398011" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1002,7 +1002,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1019,7 +1019,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "syn-solidity", "tiny-keccak", ] @@ -1038,7 +1038,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.114", + "syn 2.0.116", "syn-solidity", ] @@ -1166,7 +1166,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1403,7 +1403,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62945a2f7e6de02a31fe400aa489f0e0f5b2502e69f95f853adb82a96c7a6b60" dependencies = [ "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1441,7 +1441,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1538,7 +1538,7 @@ checksum = "213888f660fddcca0d257e88e54ac05bca01885f258ccdf695bafd77031bb69d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1610,7 +1610,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "synstructure", ] @@ -1622,7 +1622,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1724,7 +1724,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1773,7 +1773,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1790,7 +1790,7 @@ checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -1851,7 +1851,7 @@ checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -2351,7 +2351,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -2746,7 +2746,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -2771,7 +2771,7 @@ dependencies = [ "quote", "serde", "strsim", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -2782,7 +2782,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -2822,7 +2822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.114", + "syn 1.0.109", ] [[package]] @@ -2890,7 +2890,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.1", - "syn 2.0.114", + "syn 2.0.116", "unicode-xid", ] @@ -2996,7 +2996,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -3125,6 +3125,7 @@ dependencies = [ "actix", "alloy", "anyhow", + "chrono", "clap", "compile-time", "dialoguer", @@ -3133,13 +3134,16 @@ dependencies = [ "e3-config", "e3-console", "e3-crypto", + "e3-daemon-server", "e3-entrypoint", "e3-events", "e3-evm", + "e3-fhe-params", "e3-init", - "e3-socket-server", "e3-support-scripts", + "e3-trbfv", "e3-utils", + "e3-zk-helpers", "e3-zk-prover", "hex", "opentelemetry", @@ -3224,6 +3228,20 @@ dependencies = [ "zeroize", ] +[[package]] +name = "e3-daemon-server" +version = "0.1.15" +dependencies = [ + "anyhow", + "e3-config", + "e3-console", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "e3-data" version = "0.1.15" @@ -3259,6 +3277,7 @@ dependencies = [ "e3-ciphernode-builder", "e3-config", "e3-crypto", + "e3-daemon-server", "e3-data", "e3-events", "e3-evm", @@ -3267,7 +3286,6 @@ dependencies = [ "e3-logger", "e3-net", "e3-request", - "e3-socket-server", "e3-sortition", "e3-test-helpers", "e3-zk-prover", @@ -3651,19 +3669,6 @@ dependencies = [ "e3-indexer", ] -[[package]] -name = "e3-socket-server" -version = "0.1.15" -dependencies = [ - "anyhow", - "e3-config", - "e3-console", - "serde", - "serde_json", - "tokio", - "tracing", -] - [[package]] name = "e3-sortition" version = "0.1.15" @@ -3834,6 +3839,8 @@ dependencies = [ "alloy", "anyhow", "derivative", + "e3-utils-derive", + "hex", "rand 0.8.5", "rand_chacha 0.3.1", "regex", @@ -3842,6 +3849,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "e3-utils-derive" +version = "0.1.15" +dependencies = [ + "hex", + "proc-macro2", + "quote", + "syn 2.0.116", +] + [[package]] name = "e3-wasm" version = "0.1.15" @@ -3981,7 +3998,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -4055,7 +4072,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -4075,7 +4092,7 @@ checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -4461,7 +4478,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -5233,7 +5250,7 @@ checksum = "a0eb5a3343abf848c0984fe4604b2b105da9539376e24fc0a3b0007411ae4fd9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -5940,7 +5957,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -6138,7 +6155,7 @@ checksum = "1b27834086c65ec3f9387b096d66e99f221cf081c2b738042aa252bcd41204e3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -6149,7 +6166,7 @@ checksum = "757aee279b8bdbb9f9e676796fd459e4207a1f986e87886700abf589f5abf771" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -6743,7 +6760,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -6831,7 +6848,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7017,7 +7034,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7117,7 +7134,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7231,7 +7248,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7260,7 +7277,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7393,7 +7410,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7455,7 +7472,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7475,7 +7492,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "version_check", "yansi", ] @@ -7500,7 +7517,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7559,7 +7576,7 @@ dependencies = [ "prost 0.12.6", "prost-types 0.12.6", "regex", - "syn 2.0.114", + "syn 2.0.116", "tempfile", ] @@ -7569,8 +7586,8 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ - "heck 0.4.1", - "itertools 0.10.5", + "heck 0.5.0", + "itertools 0.14.0", "log", "multimap", "once_cell", @@ -7579,7 +7596,7 @@ dependencies = [ "prost 0.13.5", "prost-types 0.13.5", "regex", - "syn 2.0.114", + "syn 2.0.116", "tempfile", ] @@ -7593,7 +7610,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -7606,7 +7623,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -8011,7 +8028,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -8258,7 +8275,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.114", + "syn 2.0.116", "walkdir", ] @@ -8662,7 +8679,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -8727,7 +8744,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -8775,7 +8792,7 @@ checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9095,7 +9112,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9117,9 +9134,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.114" +version = "2.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" +checksum = "3df424c70518695237746f84cede799c9c58fcb37450d7b23716568cc8bc69cb" dependencies = [ "proc-macro2", "quote", @@ -9135,7 +9152,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9155,7 +9172,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9257,7 +9274,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9268,7 +9285,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9392,7 +9409,7 @@ checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9663,7 +9680,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -9752,7 +9769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" dependencies = [ "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -10083,7 +10100,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "wasm-bindgen-shared", ] @@ -10239,7 +10256,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -10250,7 +10267,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -10678,7 +10695,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "synstructure", ] @@ -10699,7 +10716,7 @@ checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -10719,7 +10736,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", "synstructure", ] @@ -10740,7 +10757,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] @@ -10773,7 +10790,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.116", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f32c173941..b1c4ab9fe4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ members = [ "crates/request", "crates/safe", "crates/sdk", - "crates/socket-server", + "crates/daemon-server", "crates/sortition", "crates/support-scripts", "crates/sync", @@ -38,6 +38,7 @@ members = [ "crates/tests", "crates/trbfv", "crates/utils", + "crates/utils-derive", "crates/wasm", "crates/zk-helpers", "crates/zk-prover", @@ -99,7 +100,7 @@ e3-logger = { version = "0.1.15", path = "./crates/logger" } e3-net = { version = "0.1.15", path = "./crates/net" } e3-compute-provider = { version = "0.1.15", path = "./crates/compute-provider" } e3-sortition = { version = "0.1.15", path = "./crates/sortition" } -e3-socket-server = { version = "0.1.15", path = "./crates/socket-server" } +e3-daemon-server = { version = "0.1.15", path = "./crates/daemon-server" } e3-program-server = { version = "0.1.15", path = "./crates/program-server" } e3-polynomial = { version = "0.1.15", path = "./crates/polynomial" } e3-support-scripts = { version = "0.1.15", path = "./crates/support-scripts" } @@ -108,6 +109,7 @@ e3-test-helpers = { version = "0.1.15", path = "./crates/test-helpers" } e3-tests = { version = "0.1.15", path = "./crates/tests" } e3-trbfv = { version = "0.1.15", path = "./crates/trbfv" } e3-utils = { version = "0.1.15", path = "./crates/utils" } +e3-utils-derive = { version = "0.1.15", path = "./crates/utils-derive" } e3-safe = { version = "0.1.15", path = "./crates/safe" } e3-zk-prover = { version = "0.1.15", path = "./crates/zk-prover" } e3-zk-helpers = { version = "0.1.15", path = "./crates/zk-helpers" } @@ -169,6 +171,8 @@ path-clean = "=1.0.1" petname = "=2.0.2" phf = { version = "=0.11.3", features = ["macros"] } proptest = "=1.9.0" +proc-macro2 = "=1.0.106" +quote = "=1.0.44" rand_chacha = "=0.3.1" rand = "=0.8.5" rayon = "=1.10.0" @@ -183,6 +187,7 @@ sha2 = "=0.10.9" shellexpand = "=3.1.1" sled = "=0.34.7" strum = { version = "=0.27.2", features = ["derive"] } +syn = { version = "=2.0.116", features = ["full"] } tempfile = "=3.20.0" thiserror = { version = "=1.0.69" } tokio = { version = "=1.46.1", features = ["full"] } diff --git a/crates/Dockerfile b/crates/Dockerfile index 5c9f19b8b1..ca289fe2aa 100644 --- a/crates/Dockerfile +++ b/crates/Dockerfile @@ -75,13 +75,14 @@ COPY crates/request/Cargo.toml ./request/Cargo.toml COPY crates/safe/Cargo.toml ./safe/Cargo.toml COPY crates/sdk/Cargo.toml ./sdk/Cargo.toml COPY crates/sortition/Cargo.toml ./sortition/Cargo.toml -COPY crates/socket-server/Cargo.toml ./socket-server/Cargo.toml +COPY crates/daemon-server/Cargo.toml ./daemon-server/Cargo.toml COPY crates/support-scripts/Cargo.toml ./support-scripts/Cargo.toml COPY crates/sync/Cargo.toml ./sync/Cargo.toml COPY crates/test-helpers/Cargo.toml ./test-helpers/Cargo.toml COPY crates/tests/Cargo.toml ./tests/Cargo.toml COPY crates/trbfv/Cargo.toml ./trbfv/Cargo.toml COPY crates/utils/Cargo.toml ./utils/Cargo.toml +COPY crates/utils-derive/Cargo.toml ./utils-derive/Cargo.toml COPY crates/wasm/Cargo.toml ./wasm/Cargo.toml COPY crates/zk-helpers/Cargo.toml ./zk-helpers/Cargo.toml @@ -92,6 +93,9 @@ RUN for d in ./*/ ; do \ if [ "$d" = "./cli/" ] || [ "$d" = "./enclaveup/" ]; then \ mkdir -p "$d/src" && \ echo "fn main() {}" > "$d/src/main.rs"; \ + elif [ "$d" = "./utils-derive/" ]; then \ + mkdir -p "$d/src" && \ + echo "" > "$d/src/lib.rs"; \ else \ mkdir -p "$d/src" && \ echo "pub fn main() {}" > "$d/src/lib.rs"; \ diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 00c2b88008..8c0e0bbbbf 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -407,17 +407,20 @@ impl CiphernodeBuilder { .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) .with_global_shared_store(self.global_shared_store) + .with_global_shared_eventstore(self.global_shared_eventstore) } else { if let Some(ref store) = self.in_mem_store { EventSystem::in_mem_from_store(store) .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) .with_global_shared_store(self.global_shared_store) + .with_global_shared_eventstore(self.global_shared_eventstore) } else { EventSystem::in_mem() .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) .with_global_shared_store(self.global_shared_store) + .with_global_shared_eventstore(self.global_shared_eventstore) } }; let store = event_system.store()?; diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 8a4492cab1..96c229d69b 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -26,9 +26,13 @@ e3-crypto = { workspace = true } e3-entrypoint = { workspace = true } e3-events = { workspace = true } e3-evm = { workspace = true } +e3-trbfv = { workspace = true } +e3-fhe-params = { workspace = true } +e3-zk-helpers = { workspace = true } +chrono = { workspace = true } e3-init = { workspace = true } e3-support-scripts = { workspace = true } -e3-socket-server = { workspace = true } +e3-daemon-server = { workspace = true } e3-utils = { workspace = true } e3-zk-prover = { workspace = true } hex = { workspace = true } diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index d42eed453c..0c22d0f3e3 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -5,6 +5,8 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::ciphernode::{self, ChainArgs, CiphernodeCommands}; +use crate::config::{self, ConfigCommands}; +use crate::events::{self, EventsCommands}; use crate::helpers::telemetry::{setup_simple_tracing, setup_tracing}; use crate::net::{self, NetCommands}; use crate::nodes::{self, NodeCommands}; @@ -181,7 +183,9 @@ impl Cli { Commands::Ciphernode { command } => ciphernode::execute(out, command, &config).await?, Commands::Noir { command } => noir::execute(out, command, &config).await?, Commands::Net { command } => net::execute(&out, command, &config).await?, + Commands::Events { command } => events::execute(out, command, &config).await?, Commands::Rev => rev::execute(out).await?, + Commands::Config { command } => config::execute(out, command, &config).await?, } close_all_connections(); @@ -297,14 +301,31 @@ pub enum Commands { #[command(subcommand)] command: NetCommands, }, + + /// Query events + Events { + #[command(subcommand)] + command: EventsCommands, + }, + + /// Get config values + Config { + #[command(subcommand)] + command: ConfigCommands, + }, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RemoteCli { + #[serde(default)] name: Option, + #[serde(default)] otel: Option, + #[serde(default)] quiet: bool, + #[serde(default)] config: Option, + #[serde(default)] verbose: u8, command: RemoteCommand, } @@ -312,11 +333,24 @@ pub struct RemoteCli { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum RemoteCommand { NetGetPeerId, - CiphernodeStatus { chain: ChainArgs }, + CiphernodeStatus { + chain: ChainArgs, + }, NoirStatus, WalletGet, + EventsQuery { + agg: Option, + since: Option, + limit: Option, + }, Rev, - PrintEnv { vite: bool, chain: String }, + PrintEnv { + vite: bool, + chain: String, + }, + ConfigGet { + param: Option, + }, } impl TryFrom for RemoteCommand { @@ -335,9 +369,15 @@ impl TryFrom for RemoteCommand { command: CiphernodeCommands::Status { chain }, } => Ok(RemoteCommand::CiphernodeStatus { chain }), Commands::PrintEnv { chain, vite } => Ok(RemoteCommand::PrintEnv { vite, chain }), + Commands::Events { + command: EventsCommands::Query { agg, since, limit }, + } => Ok(RemoteCommand::EventsQuery { agg, since, limit }), Commands::Wallet { command: WalletCommands::Get, } => Ok(RemoteCommand::WalletGet), + Commands::Config { + command: ConfigCommands::Get { param }, + } => Ok(RemoteCommand::ConfigGet { param }), _ => bail!("Command not allowed while node is running."), } } @@ -389,6 +429,12 @@ impl TryFrom for Commands { RemoteCommand::NetGetPeerId => Commands::Net { command: NetCommands::GetPeerId, }, + RemoteCommand::EventsQuery { agg, since, limit } => Commands::Events { + command: EventsCommands::Query { agg, since, limit }, + }, + RemoteCommand::ConfigGet { param } => Commands::Config { + command: ConfigCommands::Get { param }, + }, }; // We might have to hold this stuff on RemoteCommand Ok(command) diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs new file mode 100644 index 0000000000..23579bd1de --- /dev/null +++ b/crates/cli/src/config.rs @@ -0,0 +1,113 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use clap::Subcommand; +use e3_config::AppConfig; +use e3_console::{log, Console}; + +#[derive(Subcommand, Clone, Debug)] +pub enum ConfigCommands { + /// Get a config parameter + Get { + /// The config parameter to get. If not provided, prints all config values + param: Option, + }, +} + +pub async fn execute(out: Console, command: ConfigCommands, config: &AppConfig) -> Result<()> { + let ConfigCommands::Get { param } = command; + match param.as_deref() { + Some("name") => { + log!(out, "{}", config.name()); + } + Some("peers") => { + for peer in config.peers() { + log!(out, "{}", peer); + } + } + Some("quic_port") => { + log!(out, "{}", config.quic_port()); + } + Some("ctrl_port") => { + log!(out, "{}", config.ctrl_port()); + } + Some("address") => { + if let Some(addr) = config.address() { + log!(out, "{}", addr); + } + } + Some("autonetkey") => { + log!(out, "{}", config.autonetkey()); + } + Some("autopassword") => { + log!(out, "{}", config.autopassword()); + } + Some("autowallet") => { + log!(out, "{}", config.autowallet()); + } + Some("otel") => { + if let Some(otel) = config.otel() { + log!(out, "{}", otel); + } + } + Some("config_file") => { + log!(out, "{}", config.config_file().display()); + } + Some("config_yaml") => { + log!(out, "{}", config.config_yaml().display()); + } + Some("db_file") => { + log!(out, "{}", config.db_file().display()); + } + Some("key_file") => { + log!(out, "{}", config.key_file().display()); + } + Some("log_file") => { + log!(out, "{}", config.log_file().display()); + } + Some("work_dir") => { + log!(out, "{}", config.work_dir().display()); + } + Some("chains") => { + for chain in config.chains() { + log!(out, "{}", chain.name); + } + } + Some("nodes") => { + for (name, node_def) in config.nodes() { + log!(out, "{}: {:?}", name, node_def); + } + } + Some("program") => { + log!(out, "{:?}", config.program()); + } + Some(param) => { + anyhow::bail!("Unknown config parameter: {}", param); + } + None => { + log!(out, "name: {}", config.name()); + log!(out, "peers: {:?}", config.peers()); + log!(out, "quic_port: {}", config.quic_port()); + log!(out, "ctrl_port: {}", config.ctrl_port()); + log!(out, "address: {:?}", config.address()); + log!(out, "autonetkey: {}", config.autonetkey()); + log!(out, "autopassword: {}", config.autopassword()); + log!(out, "autowallet: {}", config.autowallet()); + log!(out, "otel: {:?}", config.otel()); + log!(out, "config_file: {}", config.config_file().display()); + log!(out, "config_yaml: {}", config.config_yaml().display()); + log!(out, "db_file: {}", config.db_file().display()); + log!(out, "key_file: {}", config.key_file().display()); + log!(out, "log_file: {}", config.log_file().display()); + log!(out, "work_dir: {}", config.work_dir().display()); + log!(out, "chains: {:?}", config.chains()); + log!(out, "nodes: {:?}", config.nodes()); + log!(out, "program: {:?}", config.program()); + } + } + Ok(()) +} diff --git a/crates/cli/src/events.rs b/crates/cli/src/events.rs new file mode 100644 index 0000000000..867cc30732 --- /dev/null +++ b/crates/cli/src/events.rs @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use clap::Subcommand; +use e3_ciphernode_builder::global_eventstore_cache::EventStoreReader; +use e3_config::AppConfig; +use e3_console::{log, Console}; +use e3_entrypoint::helpers::datastore::get_eventstore_reader; +use e3_events::{compute_seq_cursor, CorrelationId, EnclaveEvent, SeqAgg, SeqCursor}; +use e3_events::{AggregateId, EventStoreQueryBy, EventStoreQueryResponse}; +use e3_utils::actix::channel as actix_toolbox; +use std::collections::HashMap; + +#[derive(Subcommand, Clone, Debug)] +pub enum EventsCommands { + /// Query events + Query { + /// Aggregate ID - will default to 0 + #[arg(long)] + agg: Option, + + /// Sequence to read from will read from 0 if absent + #[arg(long)] + since: Option, + + /// Max limit to read at a time. If this is greater than the internal limit the internal + /// limit will be respected. + #[arg(long)] + limit: Option, + }, +} + +pub async fn execute(out: Console, command: EventsCommands, config: &AppConfig) -> Result<()> { + match command { + EventsCommands::Query { agg, since, limit } => { + query_events(out, config, agg, since, limit).await? + } + } + Ok(()) +} + +async fn query_events( + out: Console, + config: &AppConfig, + aggregate: Option, + since: Option, + limit: Option, +) -> Result<()> { + let eventstore = get_eventstore_reader(config)?; + let (events, next) = fetch_events(eventstore, aggregate, since, limit).await?; + print_events(out.clone(), events)?; + log!(out, "{}", serde_json::to_string(&next)?); + Ok(()) +} + +async fn fetch_events( + eventstore: EventStoreReader, + aggregate: Option, + since: Option, + limit: Option, +) -> Result<(Vec, SeqCursor)> { + let aggregate = aggregate.unwrap_or(0); + let since = since.unwrap_or(0); + let limit = limit.unwrap_or(10); + let (addr, rx) = actix_toolbox::oneshot::(); + + let msg = EventStoreQueryBy::::new( + CorrelationId::new(), + HashMap::from([(AggregateId::new(aggregate), since)]), + addr, + ) + .with_limit(limit); + + eventstore.seq().do_send(msg); + let events = rx.await?.into_events(); + let next = compute_seq_cursor(&events, limit as usize); + + Ok((events, next)) +} + +fn print_events(out: Console, events: Vec) -> Result<()> { + for event in events { + log!(out, "{}", serde_json::to_string(&event)?); + } + Ok(()) +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 56e50674e8..1b0d8c6801 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -8,12 +8,14 @@ use anyhow::Result; use clap::Parser; use cli::{Cli, RemoteCli}; use e3_console::Console; -use e3_socket_server::{connect_socket, run_on_socket}; +use e3_daemon_server::{connect_daemon, run_on_daemon}; use e3_utils::{colorize, Color}; use tracing::info; mod ciphernode; mod cli; +mod config; +mod events; pub mod helpers; mod init; mod net; @@ -69,13 +71,13 @@ pub async fn main() -> Result<()> { let cli = Cli::parse(); let config_result = cli.load_config(); - let maybe_stream = connect_socket(config_result.as_ref().ok()).await; + let maybe_server = connect_daemon(config_result.as_ref().ok()).await; let maybe_remote_command = TryInto::::try_into(cli.clone()).ok(); // If the socket exists and the command can be parsed as remote - if let Err(err) = if let (Some(stream), Some(command)) = (maybe_stream, maybe_remote_command) { + if let Err(err) = if let (Some(server), Some(command)) = (maybe_server, maybe_remote_command) { // Run the command over the socket - run_on_socket(out, stream, command).await + run_on_daemon(out, server, command).await } else { // Run the command locally cli.execute(out, config_result).await diff --git a/crates/cli/src/start.rs b/crates/cli/src/start.rs index ae7d6c13f6..df323737b5 100644 --- a/crates/cli/src/start.rs +++ b/crates/cli/src/start.rs @@ -14,10 +14,9 @@ use anyhow::Result; use e3_ciphernode_builder::CiphernodeHandle; use e3_config::AppConfig; use e3_console::Console; +use e3_daemon_server::start_daemon_server; use e3_events::{prelude::*, Shutdown}; -use e3_socket_server::start_socket_server; use e3_utils::{colorize, Color}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::signal::unix::{signal, SignalKind}; use tracing::{error, info, instrument}; @@ -56,24 +55,19 @@ pub async fn execute(mut config: AppConfig, peers: Vec) -> Result<()> { /// Launch a socket server to read RemoteCli commands pub fn launch_socket_server(ctrl_port: u16) { // Setup socket server for daemon - tokio::task::spawn_local(start_socket_server(ctrl_port, |stream| async move { - let (reader, mut writer) = stream.into_split(); - let mut lines = BufReader::new(reader).lines(); - - if let Some(line) = lines.next_line().await? { - let (out, mut rx) = Console::channel(); - info!("CMD: {}", &colorize(&line, Color::Blue)); - let remote_cli: RemoteCli = serde_json::from_str(&line)?; - let cli: Cli = remote_cli.try_into()?; - let config_result = cli.load_config(); - cli.execute(out, config_result).await?; - while let Some(msg) = rx.recv().await { - writer.write_all(format!("{msg}\n").as_bytes()).await?; - } + tokio::task::spawn_local(start_daemon_server(ctrl_port, |body| async move { + let (out, mut rx) = Console::channel(); + info!("CMD: {}", &colorize(&body, Color::Blue)); + let remote_cli: RemoteCli = serde_json::from_str(&body)?; + let cli: Cli = remote_cli.try_into()?; + let config_result = cli.load_config(); + cli.execute(out, config_result).await?; + + let mut output = String::new(); + while let Some(msg) = rx.recv().await { + output.push_str(&format!("{msg}\n")); } - - writer.shutdown().await?; - Ok(()) + Ok(output) })); } diff --git a/crates/config/src/app_config.rs b/crates/config/src/app_config.rs index d5d88328e2..149505ebe1 100644 --- a/crates/config/src/app_config.rs +++ b/crates/config/src/app_config.rs @@ -178,6 +178,16 @@ impl BBPath { BBPath::Default(p) => p.clone(), } } + + /// Check the environment variable and if found use that otherwise use the given path + pub fn check(default_path: PathBuf) -> Result { + let bb_path = if let Some(bb_path) = env::var("E3_CUSTOM_BB").ok() { + BBPath::Custom(bb_path.try_into()?) + } else { + BBPath::Default(default_path) + }; + Ok(bb_path) + } } impl AppConfig { diff --git a/crates/crypto/src/sensitive.rs b/crates/crypto/src/sensitive.rs index 78516d1069..9a7700af08 100644 --- a/crates/crypto/src/sensitive.rs +++ b/crates/crypto/src/sensitive.rs @@ -35,6 +35,13 @@ impl SensitiveBytes { .collect::>() } + /// Helper method mainly for testing + pub fn from_encrypted(encrypted: &[u8]) -> Self { + SensitiveBytes { + encrypted: ArcBytes::from_bytes(encrypted), + } + } + /// Access the decrypted data, wrapped in a ZeroizeOnDrop container // TODO: rename try_access pub fn access(&self, cipher: &Cipher) -> Result>> { diff --git a/crates/socket-server/Cargo.toml b/crates/daemon-server/Cargo.toml similarity index 86% rename from crates/socket-server/Cargo.toml rename to crates/daemon-server/Cargo.toml index c21bbc1420..021cc3b8d7 100644 --- a/crates/socket-server/Cargo.toml +++ b/crates/daemon-server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "e3-socket-server" +name = "e3-daemon-server" version.workspace = true edition.workspace = true license.workspace = true @@ -12,5 +12,6 @@ e3-console.workspace = true e3-config.workspace = true serde.workspace = true serde_json.workspace = true +reqwest.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/daemon-server/src/lib.rs b/crates/daemon-server/src/lib.rs new file mode 100644 index 0000000000..9905d70eae --- /dev/null +++ b/crates/daemon-server/src/lib.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use e3_config::AppConfig; +use e3_console::{log, Console}; +use serde::Serialize; +use std::future::Future; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tracing::error; + +const TCP_ADDRESS: &str = "127.0.0.1"; // using localhost specifically so that it is not mounted + // externally. We might change this if we need to control + // externally and add authentication or TLS + +pub struct ServerInfo { + pub port: u16, +} + +pub async fn connect_daemon(maybe_config: Option<&AppConfig>) -> Option { + let config = maybe_config?; + let port = config.ctrl_port(); + let url = format!("http://{}:{}", TCP_ADDRESS, port); + reqwest::Client::new() + .head(&url) + .timeout(std::time::Duration::from_secs(1)) + .send() + .await + .ok()?; + Some(ServerInfo { port }) +} + +pub async fn run_on_daemon( + out: Console, + server: ServerInfo, + cli: T, +) -> anyhow::Result<()> { + let url = format!("http://{}:{}", TCP_ADDRESS, server.port); + let client = reqwest::Client::new(); + let resp = client + .post(&url) + .json(&cli) + .send() + .await? + .error_for_status()?; + + let text = resp.text().await?; + for line in text.lines() { + log!(out, "{}", line); + } + Ok(()) +} + +pub async fn start_daemon_server(tcp_port: u16, handler: F) +where + F: Fn(String) -> Fut + 'static, + Fut: Future> + 'static, +{ + let addr = format!("{}:{}", TCP_ADDRESS, tcp_port); + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + error!("Failed to bind socket: {e}"); + return; + } + }; + let handler = Arc::new(handler); + loop { + match listener.accept().await { + Ok((stream, _)) => { + let handler = Arc::clone(&handler); + tokio::task::spawn_local(async move { + if let Err(e) = handle_http(stream, &handler.as_ref()).await { + error!("Connection error: {e}"); + } + }); + } + Err(e) => { + error!("Accept error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } +} + +async fn handle_http(stream: TcpStream, handler: &F) -> Result<()> +where + F: Fn(String) -> Fut, + Fut: Future>, +{ + // We do manual http parsing as actix-web requires running on a separate thread and is too heavy + let (reader, mut writer) = stream.into_split(); + let mut buf_reader = BufReader::new(reader); + + // Read headers until blank line + let mut content_length: usize = 0; + loop { + let mut line = String::new(); + buf_reader.read_line(&mut line).await?; + if line.trim().is_empty() { + break; + } + if let Some(val) = line.to_ascii_lowercase().strip_prefix("content-length:") { + content_length = val.trim().parse().unwrap_or(0); + } + } + + // Read body + let mut body = vec![0u8; content_length]; + tokio::io::AsyncReadExt::read_exact(&mut buf_reader, &mut body).await?; + let body = String::from_utf8(body)?; + + // Run the existing logic + let (status, response_body) = match handler(body).await { + Ok(output) => ("200 OK", output), + Err(e) => ("500 Internal Server Error", e.to_string()), + }; + + let response = format!( + "HTTP/1.1 {status}\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + response_body.len(), + response_body + ); + writer.write_all(response.as_bytes()).await?; + writer.shutdown().await?; + Ok(()) +} diff --git a/crates/entrypoint/Cargo.toml b/crates/entrypoint/Cargo.toml index a7e090a404..f819ca753c 100644 --- a/crates/entrypoint/Cargo.toml +++ b/crates/entrypoint/Cargo.toml @@ -23,7 +23,7 @@ dirs = { workspace = true } e3-events = { workspace = true } e3-evm = { workspace = true } e3-fhe = { workspace = true } -e3-socket-server = { workspace = true } +e3-daemon-server = { workspace = true } hex = { workspace = true } e3-keyshare = { workspace = true } e3-logger = { workspace = true } diff --git a/crates/events/src/cursor.rs b/crates/events/src/cursor.rs new file mode 100644 index 0000000000..153ffe02b1 --- /dev/null +++ b/crates/events/src/cursor.rs @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use crate::traits::EventContextSeq; + +#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub enum SeqCursor { + Done, + Next(u64), +} + +pub fn compute_seq_cursor(events: &[T], limit: usize) -> SeqCursor { + if events.len() == limit { + let last_seq = events.last().map(|e| e.seq()).unwrap_or(0); + SeqCursor::Next(last_seq) + } else { + SeqCursor::Done + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traits::EventContextSeq; + + struct MockEvent(u64); + impl EventContextSeq for MockEvent { + fn seq(&self) -> u64 { + self.0 + } + } + + #[test] + fn test_seq_cursor_done_when_under_limit() { + let events = vec![MockEvent(1), MockEvent(2)]; + let cursor = compute_seq_cursor(&events, 10); + assert!(matches!(cursor, SeqCursor::Done)); + } + + #[test] + fn test_seq_cursor_next_when_at_limit() { + let events = vec![MockEvent(1), MockEvent(2)]; + let cursor = compute_seq_cursor(&events, 2); + match cursor { + SeqCursor::Next(seq) => assert_eq!(seq, 2), + SeqCursor::Done => panic!("Expected Next, got Done"), + } + } + + #[test] + fn test_seq_cursor_uses_last_event_seq() { + let events = vec![MockEvent(100), MockEvent(200), MockEvent(300)]; + let cursor = compute_seq_cursor(&events, 3); + match cursor { + SeqCursor::Next(seq) => assert_eq!(seq, 300), + SeqCursor::Done => panic!("Expected Next, got Done"), + } + } + + #[test] + fn test_seq_cursor_empty_returns_done() { + let events: Vec = vec![]; + let cursor = compute_seq_cursor(&events, 10); + assert!(matches!(cursor, SeqCursor::Done)); + } + + #[test] + fn test_seq_cursor_single_event_at_limit() { + let events = vec![MockEvent(42)]; + let cursor = compute_seq_cursor(&events, 1); + match cursor { + SeqCursor::Next(seq) => assert_eq!(seq, 42), + SeqCursor::Done => panic!("Expected Next, got Done"), + } + } +} diff --git a/crates/events/src/enclave_event/accusation_vote.rs b/crates/events/src/enclave_event/accusation_vote.rs index e2ff7fba6b..c07a49731a 100644 --- a/crates/events/src/enclave_event/accusation_vote.rs +++ b/crates/events/src/enclave_event/accusation_vote.rs @@ -7,6 +7,7 @@ use crate::E3id; use actix::Message; use alloy::primitives::Address; +use e3_utils::ArcBytes; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; @@ -27,7 +28,7 @@ pub struct AccusationVote { /// keccak256 hash of the data as this node received it — for equivocation detection. pub data_hash: [u8; 32], /// ECDSA signature of the voter over the vote fields. - pub signature: Vec, + pub signature: ArcBytes, } impl Display for AccusationVote { diff --git a/crates/events/src/enclave_event/committee_published.rs b/crates/events/src/enclave_event/committee_published.rs index b39afb7f2e..22785e1c15 100644 --- a/crates/events/src/enclave_event/committee_published.rs +++ b/crates/events/src/enclave_event/committee_published.rs @@ -6,6 +6,7 @@ use crate::E3id; use actix::Message; +use e3_utils::ArcBytes; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; @@ -14,8 +15,8 @@ use std::fmt::{self, Display}; pub struct CommitteePublished { pub e3_id: E3id, pub nodes: Vec, - pub public_key: Vec, - pub proof: Vec, + pub public_key: ArcBytes, + pub proof: ArcBytes, } impl Display for CommitteePublished { diff --git a/crates/events/src/enclave_event/plaintext_output_published.rs b/crates/events/src/enclave_event/plaintext_output_published.rs index 94a6b13541..b415fd6a2c 100644 --- a/crates/events/src/enclave_event/plaintext_output_published.rs +++ b/crates/events/src/enclave_event/plaintext_output_published.rs @@ -6,6 +6,7 @@ use crate::E3id; use actix::Message; +use e3_utils::ArcBytes; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; @@ -13,8 +14,8 @@ use std::fmt::{self, Display}; #[rtype(result = "()")] pub struct PlaintextOutputPublished { pub e3_id: E3id, - pub plaintext_output: Vec, - pub proof: Vec, + pub plaintext_output: ArcBytes, + pub proof: ArcBytes, } impl Display for PlaintextOutputPublished { diff --git a/crates/events/src/enclave_event/proof_failure_accusation.rs b/crates/events/src/enclave_event/proof_failure_accusation.rs index 6871e4e15b..185b5830e9 100644 --- a/crates/events/src/enclave_event/proof_failure_accusation.rs +++ b/crates/events/src/enclave_event/proof_failure_accusation.rs @@ -7,6 +7,7 @@ use crate::{E3id, ProofType, SignedProofPayload}; use actix::Message; use alloy::primitives::Address; +use e3_utils::ArcBytes; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; @@ -36,7 +37,7 @@ pub struct ProofFailureAccusation { /// `None` for proofs that all nodes already received. pub signed_payload: Option, /// ECDSA signature of the accuser over the accusation fields. - pub signature: Vec, + pub signature: ArcBytes, } impl Display for ProofFailureAccusation { diff --git a/crates/events/src/event_id.rs b/crates/events/src/event_id.rs index cec880d5fc..353e153106 100644 --- a/crates/events/src/event_id.rs +++ b/crates/events/src/event_id.rs @@ -5,14 +5,14 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use derivative::Derivative; -use serde::{Deserialize, Serialize}; +use e3_utils::{AsBytesSerde, BytesSerde}; use sha2::{Digest, Sha256}; use std::{ fmt, hash::{DefaultHasher, Hash, Hasher}, }; -#[derive(Derivative, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Derivative, BytesSerde, Clone, Copy, PartialEq, Eq, Hash)] #[derivative(Debug)] pub struct EventId(#[derivative(Debug(format_with = "e3_utils::formatters::hexf"))] pub [u8; 32]); @@ -34,6 +34,17 @@ impl fmt::Display for EventId { } } +impl AsBytesSerde for EventId { + fn as_bytes(&self) -> &[u8] { + &self.0 + } + fn try_from_bytes(bytes: Vec) -> Result { + Ok(EventId( + bytes.try_into().map_err(|_| "EventId requires 32 bytes")?, + )) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/events/src/events.rs b/crates/events/src/events.rs index 24d6b33142..bad4d1c3eb 100644 --- a/crates/events/src/events.rs +++ b/crates/events/src/events.rs @@ -75,12 +75,14 @@ pub trait QueryKind { } /// Query by aggregated sequence +#[derive(Debug)] pub struct SeqAgg; impl QueryKind for SeqAgg { type Shape = HashMap; } /// Query by aggregated timestamp +#[derive(Debug)] pub struct TsAgg; impl QueryKind for TsAgg { type Shape = HashMap; diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index b4c999d3a9..1ad3bc3cde 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -160,7 +160,7 @@ impl Handler> for EventSto #[cfg(test)] mod tests { - use crate::{EventConstructorWithTimestamp, EventSource, TestEvent}; + use crate::{EventConstructorWithTimestamp, EventContextSeq, EventSource, TestEvent}; use super::*; use anyhow::Result; @@ -502,4 +502,133 @@ mod tests { assert_eq!(events.len(), 2); } + + // =========================================================================== + // Pagination / Cursor boundary tests + // =========================================================================== + + #[test] + fn ts_query_is_inclusive_at_boundary() { + let store = populated_store(&[ + make_local_event(100), + make_local_event(200), + make_local_event(300), + ]); + + let page1 = store.query_by_ts(100, None, Some(2)).unwrap(); + assert_eq!(page1.len(), 2); + assert_eq!(page1[0].get_ctx().ts(), 100); + assert_eq!(page1[1].get_ctx().ts(), 200); + + let page2 = store.query_by_ts(200, None, Some(2)).unwrap(); + assert_eq!(page2.len(), 2); + assert_eq!(page2[0].get_ctx().ts(), 200); + assert_eq!(page2[1].get_ctx().ts(), 300); + } + + #[test] + fn ts_query_cursor_off_by_one_causes_duplicates() { + let store = populated_store(&[ + make_local_event(100), + make_local_event(200), + make_local_event(300), + ]); + + let page1 = store.query_by_ts(100, None, Some(2)).unwrap(); + assert_eq!(page1.len(), 2); + + let cursor_ts = page1.last().unwrap().get_ctx().ts(); + assert_eq!(cursor_ts, 200); + + let page2 = store.query_by_ts(cursor_ts, None, None).unwrap(); + assert_eq!(page2.len(), 2); + + let total: Vec<_> = page1.iter().chain(page2.iter()).collect(); + let ts_values: Vec<_> = total.iter().map(|e| e.get_ctx().ts()).collect(); + let has_duplicates = ts_values.len() + != ts_values + .iter() + .collect::>() + .len(); + + assert!( + has_duplicates, + "BUG: ts=200 appears in both pages (inclusive query with cursor=last_ts)" + ); + } + + #[test] + fn ts_query_pagination_without_duplicates() { + let store = populated_store(&[ + make_local_event(100), + make_local_event(200), + make_local_event(300), + make_local_event(400), + ]); + + let page1 = store.query_by_ts(100, None, Some(2)).unwrap(); + assert_eq!(page1.len(), 2); + + let cursor_ts = page1.last().unwrap().get_ctx().ts() + 1; + let page2 = store.query_by_ts(cursor_ts, None, Some(2)).unwrap(); + + let all_ts: Vec<_> = page1 + .iter() + .chain(page2.iter()) + .map(|e| e.get_ctx().ts()) + .collect(); + + assert_eq!(all_ts.len(), 4); + assert_eq!(all_ts, vec![100, 200, 300, 400]); + } + + #[test] + fn seq_query_is_inclusive_at_boundary() { + let store = populated_store(&[ + make_local_event(100), + make_local_event(200), + make_local_event(300), + ]); + + let page1 = store.query_by_seq(0, None, Some(2)); + assert_eq!(page1.len(), 2); + assert_eq!(page1[0].seq(), 0); + assert_eq!(page1[1].seq(), 1); + + let page2 = store.query_by_seq(1, None, Some(2)); + assert_eq!(page2.len(), 2); + assert_eq!(page2[0].seq(), 1); + assert_eq!(page2[1].seq(), 2); + } + + #[test] + fn seq_query_cursor_off_by_one_causes_duplicates() { + let store = populated_store(&[ + make_local_event(100), + make_local_event(200), + make_local_event(300), + ]); + + let page1 = store.query_by_seq(0, None, Some(2)); + assert_eq!(page1.len(), 2); + + let cursor_seq = page1.last().unwrap().seq(); + assert_eq!(cursor_seq, 1); + + let page2 = store.query_by_seq(cursor_seq, None, None); + assert_eq!(page2.len(), 2); + + let total: Vec<_> = page1.iter().chain(page2.iter()).collect(); + let seq_values: Vec<_> = total.iter().map(|e| e.seq()).collect(); + let has_duplicates = seq_values.len() + != seq_values + .iter() + .collect::>() + .len(); + + assert!( + has_duplicates, + "BUG: seq=1 appears in both pages (inclusive query with cursor=last_seq)" + ); + } } diff --git a/crates/events/src/lib.rs b/crates/events/src/lib.rs index bd0fe091dc..a435677474 100644 --- a/crates/events/src/lib.rs +++ b/crates/events/src/lib.rs @@ -7,6 +7,7 @@ mod bus_handle; mod committee; mod correlation_id; +mod cursor; mod data_events; mod e3id; mod enclave_event; @@ -32,6 +33,7 @@ mod traits; pub use bus_handle::*; pub use committee::*; pub use correlation_id::*; +pub use cursor::*; pub use data_events::*; pub use e3id::*; pub use enclave_event::*; diff --git a/crates/events/src/seed.rs b/crates/events/src/seed.rs index a43e88d087..e49934de73 100644 --- a/crates/events/src/seed.rs +++ b/crates/events/src/seed.rs @@ -7,10 +7,10 @@ use alloy::hex; use alloy_primitives::Uint; use derivative::Derivative; -use serde::{Deserialize, Serialize}; +use e3_utils::{AsBytesSerde, BytesSerde}; use std::fmt::{self, Display}; -#[derive(Derivative, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Derivative, BytesSerde, Clone, Copy, PartialEq, Eq, Hash)] #[derivative(Debug)] pub struct Seed(#[derivative(Debug(format_with = "e3_utils::formatters::hexf"))] pub [u8; 32]); impl From for u64 { @@ -31,6 +31,18 @@ impl From> for Seed { } } +impl AsBytesSerde for Seed { + fn as_bytes(&self) -> &[u8] { + &self.0 + } + + fn try_from_bytes(bytes: Vec) -> Result { + Ok(Seed( + bytes.try_into().map_err(|_| "Seed requires 32 bytes")?, + )) + } +} + impl Display for Seed { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Seed(0x{})", hex::encode(self.0)) diff --git a/crates/evm/src/slashing_manager_sol_writer.rs b/crates/evm/src/slashing_manager_sol_writer.rs index 7a67b5277a..adf1ea3e83 100644 --- a/crates/evm/src/slashing_manager_sol_writer.rs +++ b/crates/evm/src/slashing_manager_sol_writer.rs @@ -206,7 +206,7 @@ fn encode_attestation_evidence(data: &AccusationQuorumReached) -> Vec { let data_hashes: Vec<[u8; 32]> = votes.iter().map(|v| v.data_hash).collect(); let signatures: Vec = votes .iter() - .map(|v| Bytes::from(v.signature.clone())) + .map(|v| Bytes::from(v.signature.extract_bytes())) .collect(); (proof_type, voters, agrees, data_hashes, signatures).abi_encode() diff --git a/crates/net/src/net_event_batch.rs b/crates/net/src/net_event_batch.rs index b1b934c02c..cd0ab5d280 100644 --- a/crates/net/src/net_event_batch.rs +++ b/crates/net/src/net_event_batch.rs @@ -194,4 +194,173 @@ mod tests { vec![b"event1".to_vec(), b"event2".to_vec(), b"event3".to_vec(),] ); } + + #[tokio::test] + async fn test_no_duplicate_events_across_batches() { + let (net_cmds_tx, net_cmds_rx) = mpsc::channel::(16); + let (net_events_tx, net_events_rx) = broadcast::channel::(16); + let net_events = Arc::new(net_events_rx); + + let requester = DirectRequester::builder(net_cmds_tx, net_events).build(); + + let batch1 = EventBatch { + events: vec![b"event1".to_vec(), b"event2".to_vec()], + next: BatchCursor::Next(2), + aggregate_id: AggregateId::new(1), + }; + let batch2 = EventBatch { + events: vec![b"event3".to_vec()], + next: BatchCursor::Done, + aggregate_id: AggregateId::new(1), + }; + + let handle = DirectRequesterTester::new(net_cmds_rx, net_events_tx) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 0, 100)) + .respond_with(batch1) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 2, 100)) + .respond_with(batch2) + .spawn(); + + let events: Vec> = + fetch_all_batched_events(requester, PeerTarget::Random, AggregateId::new(1), 0, 100) + .await + .unwrap(); + + handle.await.unwrap(); + + let unique: Vec<_> = events + .iter() + .collect::>() + .into_iter() + .collect(); + assert_eq!( + events.len(), + unique.len(), + "Found duplicate events: total={}, unique={}", + events.len(), + unique.len() + ); + assert_eq!( + events, + vec![b"event1".to_vec(), b"event2".to_vec(), b"event3".to_vec()] + ); + } + + #[tokio::test] + async fn test_cursor_correctly_excludes_last_event_on_batch_boundary() { + let (net_cmds_tx, net_cmds_rx) = mpsc::channel::(16); + let (net_events_tx, net_events_rx) = broadcast::channel::(16); + let net_events = Arc::new(net_events_rx); + + let requester = DirectRequester::builder(net_cmds_tx, net_events).build(); + + let batch1 = EventBatch { + events: vec![b"event1".to_vec()], + next: BatchCursor::Next(100), + aggregate_id: AggregateId::new(1), + }; + let batch2 = EventBatch { + events: vec![b"event2".to_vec()], + next: BatchCursor::Done, + aggregate_id: AggregateId::new(1), + }; + + let handle = DirectRequesterTester::new(net_cmds_rx, net_events_tx) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 0, 1)) + .respond_with(batch1) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 100, 1)) + .respond_with(batch2) + .spawn(); + + let events: Vec> = + fetch_all_batched_events(requester, PeerTarget::Random, AggregateId::new(1), 0, 1) + .await + .unwrap(); + + handle.await.unwrap(); + + assert_eq!(events.len(), 2); + assert_eq!(events[0], b"event1"); + assert_eq!(events[1], b"event2"); + } + + #[tokio::test] + async fn test_single_batch_with_limit_exactly_matched_returns_done() { + let (net_cmds_tx, net_cmds_rx) = mpsc::channel::(16); + let (net_events_tx, net_events_rx) = broadcast::channel::(16); + let net_events = Arc::new(net_events_rx); + + let requester = DirectRequester::builder(net_cmds_tx, net_events).build(); + + let batch = EventBatch { + events: vec![b"event1".to_vec()], + next: BatchCursor::Done, + aggregate_id: AggregateId::new(1), + }; + + let handle = DirectRequesterTester::new(net_cmds_rx, net_events_tx) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 0, 1)) + .respond_with(batch) + .spawn(); + + let events: Vec> = + fetch_all_batched_events(requester, PeerTarget::Random, AggregateId::new(1), 0, 1) + .await + .unwrap(); + + handle.await.unwrap(); + + assert_eq!(events, vec![b"event1".to_vec()]); + } + + #[tokio::test] + async fn test_three_batches_with_cursor_continuity() { + let (net_cmds_tx, net_cmds_rx) = mpsc::channel::(16); + let (net_events_tx, net_events_rx) = broadcast::channel::(16); + let net_events = Arc::new(net_events_rx); + + let requester = DirectRequester::builder(net_cmds_tx, net_events).build(); + + let batch1 = EventBatch { + events: vec![b"a".to_vec(), b"b".to_vec()], + next: BatchCursor::Next(200), + aggregate_id: AggregateId::new(1), + }; + let batch2 = EventBatch { + events: vec![b"c".to_vec(), b"d".to_vec()], + next: BatchCursor::Next(400), + aggregate_id: AggregateId::new(1), + }; + let batch3 = EventBatch { + events: vec![b"e".to_vec()], + next: BatchCursor::Done, + aggregate_id: AggregateId::new(1), + }; + + let handle = DirectRequesterTester::new(net_cmds_rx, net_events_tx) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 0, 2)) + .respond_with(batch1) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 200, 2)) + .respond_with(batch2) + .expect_request(FetchEventsSince::new(AggregateId::new(1), 400, 2)) + .respond_with(batch3) + .spawn(); + + let events: Vec> = + fetch_all_batched_events(requester, PeerTarget::Random, AggregateId::new(1), 0, 2) + .await + .unwrap(); + + handle.await.unwrap(); + + let expected = vec![ + b"a".to_vec(), + b"b".to_vec(), + b"c".to_vec(), + b"d".to_vec(), + b"e".to_vec(), + ]; + assert_eq!(events.len(), expected.len()); + assert_eq!(events, expected); + } } diff --git a/crates/socket-server/src/lib.rs b/crates/socket-server/src/lib.rs deleted file mode 100644 index c3ff33ae64..0000000000 --- a/crates/socket-server/src/lib.rs +++ /dev/null @@ -1,78 +0,0 @@ -// SPDX-License-Identifier: LGPL-2.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -use anyhow::Result; -use e3_config::AppConfig; -use e3_console::{log, Console}; -use serde::Serialize; -use std::future::Future; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::net::TcpStream; -use tracing::error; - -const TCP_ADDRESS: &str = "127.0.0.1"; // using localhost specifically so that it is not mounted - // externally. We might change this if we need to control - // externally and add authentication or TLS - -pub async fn connect_socket(maybe_config: Option<&AppConfig>) -> Option { - let config = maybe_config?; - let addr = format!("{}:{}", TCP_ADDRESS, config.ctrl_port()); - TcpStream::connect(addr).await.ok() -} - -pub async fn run_on_socket( - out: Console, - stream: TcpStream, - cli: T, -) -> anyhow::Result<()> { - let (reader, mut writer) = stream.into_split(); - let payload = serde_json::to_string(&cli)?; - writer.write_all(payload.as_bytes()).await?; - writer.write_all(b"\n").await?; - writer.shutdown().await?; - - let mut lines = BufReader::new(reader).lines(); - while let Some(line) = lines.next_line().await? { - log!(out, "{}", line); - } - - Ok(()) -} - -pub async fn start_socket_server(tcp_port: u16, handler: F) -where - F: Fn(TcpStream) -> Fut + Send + Sync + 'static, - Fut: Future> + 'static, -{ - let addr = format!("{}:{}", TCP_ADDRESS, tcp_port); - let listener = match tokio::net::TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - error!("Failed to bind socket: {e}"); - return; - } - }; - - let handler = Arc::new(handler); - loop { - match listener.accept().await { - Ok((stream, _)) => { - let handler = Arc::clone(&handler); - - tokio::task::spawn_local(async move { - if let Err(e) = handler(stream).await { - error!("Connection error: {e}"); - } - }); - } - Err(e) => { - error!("Accept error: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - } -} diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index 0bb5fd5c78..21a64e1abc 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -58,7 +58,7 @@ use tokio::{ /// Create a ZkBackend for integration tests. /// If a local bb binary is found, uses it with fixture files (fast path). /// Otherwise, calls `ensure_installed()` to download bb + circuits (CI path). -async fn setup_test_zk_backend() -> (ZkBackend, tempfile::TempDir) { +async fn setup_test_zk_backend() -> Result<(ZkBackend, tempfile::TempDir)> { let temp = get_tempdir().unwrap(); let temp_path = temp.path(); let noir_dir = temp_path.join("noir"); @@ -350,7 +350,7 @@ async fn setup_test_zk_backend() -> (ZkBackend, tempfile::TempDir) { .await .expect("write noir/version.json for integration ZK fixtures"); - (backend, temp) + Ok((backend, temp)) } else { println!("bb binary not found locally, downloading via ensure_installed()..."); let backend = ZkBackend::new(BBPath::Default(bb_binary), circuits_dir, work_dir); @@ -358,7 +358,7 @@ async fn setup_test_zk_backend() -> (ZkBackend, tempfile::TempDir) { .ensure_installed() .await .expect("Failed to download and install ZK backend"); - (backend, temp) + Ok((backend, temp)) } } @@ -786,7 +786,7 @@ async fn test_trbfv_actor() -> Result<()> { let multithread_report = MultithreadReport::new(max_threadroom, concurrent_jobs).start(); // Setup ZK backend for proof generation/verification - let (zk_backend, _zk_temp) = setup_test_zk_backend().await; + let (zk_backend, _zk_temp) = setup_test_zk_backend().await?; let nodes = CiphernodeSystemBuilder::new() // All nodes run the same binary under the aggregator-committee model. @@ -1484,7 +1484,7 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { Ok(result) } - let (zk_backend, _zk_temp) = setup_test_zk_backend().await; + let (zk_backend, _zk_temp) = setup_test_zk_backend().await?; let e3_id = E3id::new("1234", 1); let (rng, cn1_address, cn1_data, cn2_address, cn2_data, cipher, history, params, crpoly) = { @@ -1739,7 +1739,7 @@ async fn test_duplicate_e3_id_with_different_chain_id() -> Result<()> { // Setup let (bus, rng, seed, params, crpoly, _, _) = get_common_setup(None)?; let cipher = Arc::new(Cipher::from_password("Don't tell anyone my secret").await?); - let (zk_backend, _zk_temp) = setup_test_zk_backend().await; + let (zk_backend, _zk_temp) = setup_test_zk_backend().await?; // Setup actual ciphernodes and dispatch add events let ciphernodes = create_local_ciphernodes(&bus, &rng, 3, &cipher, zk_backend.clone()).await?; diff --git a/crates/utils-derive/Cargo.toml b/crates/utils-derive/Cargo.toml new file mode 100644 index 0000000000..835ef2e226 --- /dev/null +++ b/crates/utils-derive/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "e3-utils-derive" +version.workspace = true +edition.workspace = true +license.workspace = true +description.workspace = true +repository.workspace = true + +[lib] +proc-macro = true + +[dependencies] +hex.workspace = true +syn.workspace = true +proc-macro2.workspace = true +quote.workspace = true diff --git a/crates/utils-derive/src/lib.rs b/crates/utils-derive/src/lib.rs new file mode 100644 index 0000000000..6c3b6e9423 --- /dev/null +++ b/crates/utils-derive/src/lib.rs @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, Data, DeriveInput, Error, Fields}; + +/// Derives `Serialize` and `Deserialize` for types that implement `BytesSerde`. +/// +/// - Human-readable formats (JSON, TOML): hex string with `0x` prefix +/// - Binary formats (bincode, postcard): raw bytes +/// +/// The type must also implement `e3_utils::serde_bytes::AsBytesSerde`. +/// +/// # Example +/// +/// ```ignore +/// use e3_utils::BytesSerde; +/// use e3_utils::AsBytesSerde; +/// +/// #[derive(BytesSerde)] +/// pub struct EventId(pub [u8; 32]); +/// +/// impl AsBytesSerde for EventId { +/// fn as_bytes(&self) -> &[u8] { &self.0 } +/// fn try_from_bytes(bytes: Vec) -> Result { +/// Ok(EventId(bytes.try_into().map_err(|_| "requires 32 bytes".to_string())?)) +/// } +/// } +/// ``` +#[proc_macro_derive(BytesSerde)] +pub fn derive_bytes_serde(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let name = &input.ident; + let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl(); + + // Sanity check: must be a struct + match &input.data { + Data::Struct(data) => match &data.fields { + Fields::Unnamed(fields) if fields.unnamed.len() == 1 => {} + _ => { + return Error::new_spanned( + &input, + "BytesSerde can only be derived for newtype structs (e.g., `struct Foo(pub [u8; 32])`)", + ) + .to_compile_error() + .into(); + } + }, + _ => { + return Error::new_spanned(&input, "BytesSerde can only be derived for structs") + .to_compile_error() + .into(); + } + } + + let expanded = quote! { + impl #impl_generics ::serde::Serialize for #name #ty_generics #where_clause { + fn serialize(&self, serializer: S) -> ::core::result::Result + where + S: ::serde::Serializer, + { + use ::e3_utils::serde_bytes::AsBytesSerde; + if serializer.is_human_readable() { + serializer.serialize_str(&::std::format!("0x{}", ::hex::encode(self.as_bytes()))) + } else { + serializer.serialize_bytes(self.as_bytes()) + } + } + } + + impl<'de> ::serde::Deserialize<'de> for #name #ty_generics #where_clause { + fn deserialize(deserializer: D) -> ::core::result::Result + where + D: ::serde::Deserializer<'de>, + { + use ::e3_utils::serde_bytes::AsBytesSerde; + let bytes = if deserializer.is_human_readable() { + let s = <::std::string::String as ::serde::Deserialize>::deserialize( + deserializer, + )?; + let stripped = s.strip_prefix("0x").unwrap_or(&s); + ::hex::decode(stripped).map_err(::serde::de::Error::custom)? + } else { + <::std::vec::Vec as ::serde::Deserialize>::deserialize(deserializer)? + }; + Self::try_from_bytes(bytes).map_err(::serde::de::Error::custom) + } + } + }; + + expanded.into() +} diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index b031a29e86..cebe484273 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -12,7 +12,9 @@ alloy.workspace = true anyhow.workspace = true derivative.workspace = true rand.workspace = true +hex.workspace = true regex.workspace = true +e3-utils-derive.workspace = true rand_chacha.workspace = true serde.workspace = true tokio.workspace = true diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 857daf3a6d..d76f7b6b5e 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -4,6 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +extern crate self as e3_utils; // need this for e3_utils_derive to reference this crate pub mod actix; pub mod alloy; pub mod constants; @@ -12,13 +13,16 @@ pub mod formatters; pub mod helpers; pub mod path; pub mod retry; +pub mod serde_bytes; pub mod utility_types; pub use actix::NotifySync; pub use alloy::*; pub use constants::*; +pub use e3_utils_derive::BytesSerde; pub use error::*; pub use formatters::*; pub use helpers::*; pub use path::*; pub use retry::*; +pub use serde_bytes::AsBytesSerde; pub use utility_types::*; diff --git a/crates/utils/src/serde_bytes.rs b/crates/utils/src/serde_bytes.rs new file mode 100644 index 0000000000..6119b1d028 --- /dev/null +++ b/crates/utils/src/serde_bytes.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +/// A trait for types that can be serialized to and deserialized from raw bytes. +/// +/// This trait is used by the [`BytesSerde`] derive macro to implement `serde::Serialize` +/// and `serde::Deserialize`. Human-readable formats (e.g. JSON) encode the bytes as +/// a `0x`-prefixed hex string, while binary formats (e.g. bincode) use raw bytes directly. +/// +/// # Implementors +/// +/// Implementations should ensure that `try_from_bytes(self.as_bytes().to_vec())` +/// round-trips successfully. +/// +/// # Example +/// +/// ```rust +/// use e3_utils::AsBytesSerde; +/// +/// pub struct EventId(pub [u8; 32]); +/// +/// impl AsBytesSerde for EventId { +/// fn as_bytes(&self) -> &[u8] { +/// &self.0 +/// } +/// +/// fn try_from_bytes(bytes: Vec) -> Result { +/// let arr: [u8; 32] = bytes +/// .try_into() +/// .map_err(|_| "EventId requires exactly 32 bytes".to_string())?; +/// Ok(EventId(arr)) +/// } +/// } +/// ``` +pub trait AsBytesSerde: Sized { + /// Returns the byte representation of this type. + fn as_bytes(&self) -> &[u8]; + + /// Attempts to construct an instance from a byte vector. + /// + /// # Errors + /// + /// Returns a descriptive error string if the bytes are invalid for this type + /// (e.g. wrong length or malformed content). + fn try_from_bytes(bytes: Vec) -> Result; +} diff --git a/crates/utils/src/utility_types.rs b/crates/utils/src/utility_types.rs index 59b2fedd8e..a7ff97a4bf 100644 --- a/crates/utils/src/utility_types.rs +++ b/crates/utils/src/utility_types.rs @@ -11,14 +11,14 @@ use std::{ sync::{Arc, Mutex}, }; +use e3_utils_derive::BytesSerde; use rand_chacha::ChaCha20Rng; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use crate::formatters::hexf; +use crate::{formatters::hexf, AsBytesSerde}; pub type SharedRng = Arc>; -#[derive(Clone, Default, PartialEq, Eq, Hash)] +#[derive(BytesSerde, Clone, Default, PartialEq, Eq, Hash)] pub struct ArcBytes(Arc>); impl ArcBytes { @@ -49,22 +49,12 @@ impl fmt::Debug for ArcBytes { } } -impl Serialize for ArcBytes { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - self.0.serialize(serializer) +impl AsBytesSerde for ArcBytes { + fn as_bytes(&self) -> &[u8] { + &self.0.as_ref() } -} - -impl<'de> Deserialize<'de> for ArcBytes { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let vec: Vec = Vec::deserialize(deserializer)?; - Ok(ArcBytes(Arc::new(vec))) + fn try_from_bytes(bytes: Vec) -> Result { + Ok(ArcBytes(Arc::new(bytes))) } } diff --git a/crates/zk-prover/src/actors/accusation_manager.rs b/crates/zk-prover/src/actors/accusation_manager.rs index cb20b9abfd..162a21e703 100644 --- a/crates/zk-prover/src/actors/accusation_manager.rs +++ b/crates/zk-prover/src/actors/accusation_manager.rs @@ -42,7 +42,7 @@ use e3_events::{ ProofType, ProofVerificationFailed, ProofVerificationPassed, Sequenced, SignedProofPayload, SlashExecuted, TypedEvent, VerifyShareProofsRequest, ZkRequest, ZkResponse, }; -use e3_utils::NotifySync; +use e3_utils::{ArcBytes, NotifySync}; use tracing::{error, info, warn}; /// How long to wait for votes before declaring the accusation inconclusive. @@ -254,7 +254,9 @@ impl AccusationManager { fn verify_accusation_signature(&self, accusation: &ProofFailureAccusation) -> bool { let digest = Self::accusation_digest(accusation); - let sig = match alloy::primitives::Signature::try_from(accusation.signature.as_slice()) { + let sig = match alloy::primitives::Signature::try_from( + accusation.signature.extract_bytes().as_ref(), + ) { Ok(s) => s, Err(_) => return false, }; @@ -305,10 +307,11 @@ impl AccusationManager { fn verify_vote_signature(&self, vote: &AccusationVote) -> bool { let digest = Self::vote_digest(vote); - let sig = match alloy::primitives::Signature::try_from(vote.signature.as_slice()) { - Ok(s) => s, - Err(_) => return false, - }; + let sig = + match alloy::primitives::Signature::try_from(vote.signature.extract_bytes().as_ref()) { + Ok(s) => s, + Err(_) => return false, + }; match sig.recover_address_from_msg(&digest) { Ok(addr) => addr == vote.voter, Err(_) => false, @@ -440,9 +443,9 @@ impl AccusationManager { proof_type, data_hash, signed_payload: forwarded_payload, - signature: Vec::new(), + signature: ArcBytes::default(), }; - accusation.signature = self.sign_accusation_digest(&accusation); + accusation.signature = ArcBytes::from_bytes(&self.sign_accusation_digest(&accusation)); let accusation_id = Self::accusation_id(&accusation); @@ -464,9 +467,9 @@ impl AccusationManager { voter: self.my_address, agrees: true, data_hash, - signature: Vec::new(), + signature: ArcBytes::default(), }; - own_vote.signature = self.sign_vote_digest(&own_vote); + own_vote.signature = ArcBytes::from_bytes(&self.sign_vote_digest(&own_vote)); if let Err(err) = self.bus.publish(own_vote.clone(), ec.clone()) { error!("Failed to broadcast own AccusationVote: {err}"); @@ -670,9 +673,9 @@ impl AccusationManager { voter: self.my_address, agrees, data_hash: our_data_hash, - signature: Vec::new(), + signature: ArcBytes::default(), }; - vote.signature = self.sign_vote_digest(&vote); + vote.signature = ArcBytes::from_bytes(&self.sign_vote_digest(&vote)); info!( "Voting {} on accusation against {} for {:?}", @@ -1108,9 +1111,9 @@ impl AccusationManager { voter: self.my_address, agrees, data_hash: reverif.data_hash, - signature: Vec::new(), + signature: ArcBytes::default(), }; - vote.signature = self.sign_vote_digest(&vote); + vote.signature = ArcBytes::from_bytes(&self.sign_vote_digest(&vote)); info!( "C3a/C3b re-verification complete — voting {} on accusation against {:?}", diff --git a/crates/zk-prover/src/backend/tests.rs b/crates/zk-prover/src/backend/tests.rs index b9620795cf..b9a3138746 100644 --- a/crates/zk-prover/src/backend/tests.rs +++ b/crates/zk-prover/src/backend/tests.rs @@ -4,6 +4,8 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use std::env; + use super::*; use crate::{config::VersionInfo, error::ZkError, test_utils::get_tempdir}; use sha2::{Digest, Sha256}; @@ -11,7 +13,7 @@ use tokio::fs; fn test_backend(temp_path: &std::path::Path, config: ZkConfig) -> ZkBackend { let noir_dir = temp_path.join("noir"); - let bb_binary = BBPath::Default(noir_dir.join("bin").join("bb")); + let bb_binary = BBPath::check(noir_dir.join("bin").join("bb")).unwrap(); let circuits_dir = noir_dir.join("circuits"); let work_dir = noir_dir.join("work").join("test_node"); ZkBackend::with_config(bb_binary, circuits_dir, work_dir, config) @@ -59,6 +61,10 @@ async fn test_version_info_roundtrip() { #[tokio::test] async fn test_check_status_full_setup_needed() { + if let Some(_) = env::var("E3_CUSTOM_BB").ok() { + return; + } + let temp = get_tempdir().unwrap(); let backend = test_backend(temp.path(), ZkConfig::default()); @@ -72,6 +78,10 @@ async fn test_check_status_full_setup_needed() { #[tokio::test] async fn test_check_status_ready_when_installed() { + if let Some(_) = env::var("E3_CUSTOM_BB").ok() { + return; + } + let temp = get_tempdir().unwrap(); let config = ZkConfig::default(); let backend = test_backend(temp.path(), config.clone()); @@ -101,6 +111,10 @@ async fn test_check_status_ready_when_installed() { #[tokio::test] async fn test_check_status_bb_needs_update() { + if let Some(_) = env::var("E3_CUSTOM_BB").ok() { + return; + } + let temp = get_tempdir().unwrap(); let config = ZkConfig::default(); let backend = test_backend(temp.path(), config.clone()); diff --git a/crates/zk-prover/tests/common/helpers.rs b/crates/zk-prover/tests/common/helpers.rs index dbe24c178f..3fb0223031 100644 --- a/crates/zk-prover/tests/common/helpers.rs +++ b/crates/zk-prover/tests/common/helpers.rs @@ -130,7 +130,7 @@ pub async fn setup_test_prover(bb: &PathBuf) -> (ZkBackend, TempDir) { let temp_path = temp.path(); let noir_dir = temp_path.join("noir"); - let bb_binary = BBPath::Default(noir_dir.join("bin").join("bb")); + let bb_binary = BBPath::check(noir_dir.join("bin").join("bb")).unwrap(); let circuits_dir = noir_dir.join("circuits"); let work_dir = noir_dir.join("work").join("test_node"); let backend = ZkBackend::new(bb_binary.clone(), circuits_dir.clone(), work_dir.clone()); diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 12df3eb34f..c8cafbff19 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2521,6 +2521,8 @@ dependencies = [ "alloy", "anyhow", "derivative", + "e3-utils-derive", + "hex", "rand 0.8.5", "rand_chacha 0.3.1", "regex", @@ -2529,6 +2531,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "e3-utils-derive" +version = "0.1.15" +dependencies = [ + "hex", + "proc-macro2 1.0.106", + "quote 1.0.44", + "syn 2.0.116", +] + [[package]] name = "e3-zk-helpers" version = "0.1.15" diff --git a/examples/CRISP/server/Dockerfile b/examples/CRISP/server/Dockerfile index 888feee3f5..c8fd2bc4b3 100644 --- a/examples/CRISP/server/Dockerfile +++ b/examples/CRISP/server/Dockerfile @@ -85,13 +85,14 @@ COPY crates/program-server/Cargo.toml crates/program-server/Cargo.toml COPY crates/request/Cargo.toml crates/request/Cargo.toml COPY crates/sdk/Cargo.toml crates/sdk/Cargo.toml COPY crates/sortition/Cargo.toml crates/sortition/Cargo.toml -COPY crates/socket-server/Cargo.toml crates/socket-server/Cargo.toml +COPY crates/daemon-server/Cargo.toml crates/daemon-server/Cargo.toml COPY crates/support-scripts/Cargo.toml crates/support-scripts/Cargo.toml COPY crates/sync/Cargo.toml crates/sync/Cargo.toml COPY crates/test-helpers/Cargo.toml crates/test-helpers/Cargo.toml COPY crates/tests/Cargo.toml crates/tests/Cargo.toml COPY crates/trbfv/Cargo.toml crates/trbfv/Cargo.toml COPY crates/utils/Cargo.toml crates/utils/Cargo.toml +COPY crates/utils-derive/Cargo.toml crates/utils-derive/Cargo.toml COPY crates/wasm/Cargo.toml crates/wasm/Cargo.toml COPY crates/zk-helpers/Cargo.toml crates/zk-helpers/Cargo.toml diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1a727acc4e..15d8294313 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -731,7 +731,7 @@ importers: version: 5.3.0 '@risc0/ethereum': specifier: file:lib/risc0-ethereum - version: file:templates/default/lib/risc0-ethereum + version: risc0-ethereum@file:templates/default/lib/risc0-ethereum '@types/chai': specifier: ^4.2.0 version: 4.3.20 @@ -3103,9 +3103,6 @@ packages: '@reown/appkit@1.7.8': resolution: {integrity: sha512-51kTleozhA618T1UvMghkhKfaPcc9JlKwLJ5uV+riHyvSoWPKPRIa5A6M1Wano5puNyW0s3fwywhyqTHSilkaA==} - '@risc0/ethereum@file:templates/default/lib/risc0-ethereum': - resolution: {directory: templates/default/lib/risc0-ethereum, type: directory} - '@rolldown/pluginutils@1.0.0-beta.27': resolution: {integrity: sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA==} @@ -8789,6 +8786,9 @@ packages: resolution: {integrity: sha512-5Di9UC0+8h1L6ZD2d7awM7E/T4uA1fJRlx6zk/NvdCCVEoAnFqvHmCuNeIKoCeIixBX/q8uM+6ycDvF8woqosA==} engines: {node: '>= 0.8'} + risc0-ethereum@file:templates/default/lib/risc0-ethereum: + resolution: {directory: templates/default/lib/risc0-ethereum, type: directory} + robust-predicates@3.0.2: resolution: {integrity: sha512-IXgzBWvWQwE6PrDI05OvmXUIruQTcoMDzRsOd5CDvHCVLcLHMTSYvOK5Cm46kWqlV3yAbuSpBZdJ5oP5OUoStg==} @@ -13244,8 +13244,6 @@ snapshots: - utf-8-validate - zod - '@risc0/ethereum@file:templates/default/lib/risc0-ethereum': {} - '@rolldown/pluginutils@1.0.0-beta.27': {} '@rollup/plugin-inject@5.0.5(rollup@4.52.5)': @@ -20117,7 +20115,7 @@ snapshots: '@noble/hashes': 1.8.0 '@scure/bip32': 1.7.0 '@scure/bip39': 1.6.0 - abitype: 1.1.1(typescript@5.8.3)(zod@3.25.76) + abitype: 1.1.1(typescript@5.8.3)(zod@3.22.4) eventemitter3: 5.0.1 optionalDependencies: typescript: 5.8.3 @@ -20971,6 +20969,8 @@ snapshots: hash-base: 3.1.2 inherits: 2.0.4 + risc0-ethereum@file:templates/default/lib/risc0-ethereum: {} + robust-predicates@3.0.2: {} rollup@4.52.5: @@ -22171,7 +22171,7 @@ snapshots: '@noble/hashes': 1.8.0 '@scure/bip32': 1.7.0 '@scure/bip39': 1.6.0 - abitype: 1.1.0(typescript@5.8.3)(zod@3.25.76) + abitype: 1.1.0(typescript@5.8.3)(zod@3.22.4) isows: 1.0.7(ws@8.18.3(bufferutil@4.0.9)(utf-8-validate@5.0.10)) ox: 0.9.6(typescript@5.8.3) ws: 8.18.3(bufferutil@4.0.9)(utf-8-validate@5.0.10) diff --git a/tests/integration/base.sh b/tests/integration/base.sh index 5134ab0748..4ab21fa397 100755 --- a/tests/integration/base.sh +++ b/tests/integration/base.sh @@ -76,6 +76,11 @@ pnpm committee:new \ wait_for_committee_pubkey 0 "$SCRIPT_DIR/output/pubkey.bin" +heading "Query events via daemon REST API" +daemon_query_events cn1 "$SCRIPT_DIR/output/events.txt" + +check_last_line "$SCRIPT_DIR/output/events.txt" '{"Next":10}' + heading "Mock encrypted plaintext" $SCRIPT_DIR/lib/fake_encrypt.sh --input "$SCRIPT_DIR/output/pubkey.bin" --output "$SCRIPT_DIR/output/output.bin" --plaintext $PLAINTEXT --params "$ENCODED_PARAMS" diff --git a/tests/integration/fns.sh b/tests/integration/fns.sh index f4594e5741..76afc0029a 100644 --- a/tests/integration/fns.sh +++ b/tests/integration/fns.sh @@ -276,6 +276,31 @@ gracefull_shutdown() { kill_em_all } +daemon_query_events() { + local name="$1" + local output_file="${2:-$SCRIPT_DIR/output/events.txt}" + + local ctrl_port=$($ENCLAVE_BIN config get ctrl_port \ + --name $name \ + --config "$SCRIPT_DIR/enclave.config.yaml") + + local json_payload='{"command":{"EventsQuery":{"since":0,"limit":10}}}' + curl -sf -X POST "http://127.0.0.1:${ctrl_port}" \ + -H "Content-Type: application/json" \ + -d "$json_payload" > "$output_file" + +} + +check_last_line() { + local file="$1" + local expected="$2" + if [[ "$(tail -1 "$file")" == "$expected" ]]; then + return 0 + else + return 1 + fi +} + # Run this at the start of every test to ensure we start with a clean slate kill_em_all