diff --git a/Cargo.lock b/Cargo.lock index c7b3ed8..f2babdb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,12 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" - [[package]] name = "anstream" version = "0.6.15" @@ -81,6 +75,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "anyhow" +version = "1.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" + [[package]] name = "arbitrary" version = "1.3.2" @@ -102,21 +102,15 @@ dependencies = [ "password-hash", ] -[[package]] -name = "async-ffi" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4de21c0feef7e5a556e51af767c953f0501f7f300ba785cc99c47bdc8081a50" - [[package]] name = "async-trait" version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -133,7 +127,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "base64 0.21.7", + "base64", "bytes", "futures-util", "http", @@ -198,24 +192,12 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base16ct" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" - [[package]] name = "base64" version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" -[[package]] -name = "base64" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" - [[package]] name = "base64ct" version = "1.6.0" @@ -246,6 +228,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "build-target" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "832133bbabbbaa9fbdba793456a2827627a7d2b8fb96032fa1e7666d7895832b" + [[package]] name = "bumpalo" version = "3.16.0" @@ -269,12 +257,14 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.8" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ + "find-msvc-tools", "jobserver", "libc", + "shlex", ] [[package]] @@ -283,12 +273,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" - [[package]] name = "clap" version = "4.5.13" @@ -318,9 +302,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "501d359d5f3dcaf6ecdeee48833ae73ec6e42723a1e52419c79abf9507eec0a0" dependencies = [ "heck", - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -329,6 +313,15 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + [[package]] name = "color-eyre" version = "0.6.3" @@ -376,27 +369,16 @@ dependencies = [ ] [[package]] -name = "const-oid" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" - -[[package]] -name = "core-foundation" -version = "0.10.0" +name = "copy_to_output" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b55271e5c8c478ad3f38ad24ef34923091e0548492a266d19b3c0b4d82574c63" +checksum = "e43a9eb1b6ba30353653c07512fd8d4eeee3c8800d84194316cbc7e148d2ebae" dependencies = [ - "core-foundation-sys", - "libc", + "anyhow", + "build-target", + "fs_extra", ] -[[package]] -name = "core-foundation-sys" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" - [[package]] name = "cpufeatures" version = "0.2.12" @@ -406,18 +388,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crypto-bigint" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" -dependencies = [ - "generic-array", - "rand_core", - "subtle", - "zeroize", -] - [[package]] name = "crypto-common" version = "0.1.6" @@ -440,10 +410,7 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" dependencies = [ - "const-oid", "der_derive", - "flagset", - "pem-rfc7468", "zeroize", ] @@ -453,9 +420,9 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -473,9 +440,9 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -498,7 +465,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", - "const-oid", "crypto-common", "subtle", ] @@ -509,81 +475,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d" -[[package]] -name = "dlopen" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e80ad39f814a9abe68583cd50a2d45c8a67561c3361ab8da240587dda80937" -dependencies = [ - "dlopen_derive", - "lazy_static", - "libc", - "winapi", -] - -[[package]] -name = "dlopen_derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f236d9e1b1fbd81cea0f9cbdc8dcc7e8ebcd80e6659cd7cb2ad5f6c05946c581" -dependencies = [ - "libc", - "quote 0.6.13", - "syn 0.15.44", -] - [[package]] name = "dotenvy" version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" -[[package]] -name = "ease-off" -version = "0.1.0" -source = "git+https://github.com/abonander/ease-off?rev=2593663be62491803d8db27c92cd8fa5146b7504#2593663be62491803d8db27c92cd8fa5146b7504" -dependencies = [ - "rand", - "thiserror 1.0.69", - "tokio", -] - -[[package]] -name = "ecdsa" -version = "0.16.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" -dependencies = [ - "der", - "digest", - "elliptic-curve", - "rfc6979", - "serdect", - "signature", - "spki", -] - -[[package]] -name = "elliptic-curve" -version = "0.13.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" -dependencies = [ - "base16ct", - "crypto-bigint", - "digest", - "ff", - "generic-array", - "group", - "pem-rfc7468", - "pkcs8", - "rand_core", - "sec1", - "serdect", - "subtle", - "zeroize", -] - [[package]] name = "encode_unicode" version = "0.3.6" @@ -626,12 +523,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "fastcdc" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a71061d097bfa9a5a4d2efdec57990d9a88745020b365191d37e48541a1628f2" - [[package]] name = "fastrand" version = "2.1.0" @@ -639,20 +530,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] -name = "ff" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449" -dependencies = [ - "rand_core", - "subtle", -] - -[[package]] -name = "flagset" -version = "0.4.6" +name = "find-msvc-tools" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "fnv" @@ -692,7 +573,7 @@ dependencies = [ "rustls-pemfile", "serde", "slotmap", - "tashi-consensus-engine", + "tashi-vertex", "thiserror 1.0.69", "time", "tokio", @@ -715,6 +596,12 @@ dependencies = [ "rumqttd", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.30" @@ -769,9 +656,9 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -812,7 +699,6 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", - "zeroize", ] [[package]] @@ -834,26 +720,11 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" -[[package]] -name = "group" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" -dependencies = [ - "ff", - "rand_core", - "subtle", -] - [[package]] name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "allocator-api2", - "serde", -] [[package]] name = "heck" @@ -873,21 +744,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "hex-literal" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" - -[[package]] -name = "hmac" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" -dependencies = [ - "digest", -] - [[package]] name = "http" version = "1.1.0" @@ -951,25 +807,6 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", - "want", -] - -[[package]] -name = "hyper-rustls" -version = "0.27.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" -dependencies = [ - "futures-util", - "http", - "hyper", - "hyper-util", - "rustls", - "rustls-native-certs", - "rustls-pki-types", - "tokio", - "tokio-rustls", - "tower-service", ] [[package]] @@ -979,17 +816,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" dependencies = [ "bytes", - "futures-channel", "futures-util", "http", "http-body", "hyper", "pin-project-lite", - "socket2", "tokio", - "tower", - "tower-service", - "tracing", ] [[package]] @@ -1018,12 +850,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "ipnet" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" - [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1179,9 +1005,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ "proc-macro-crate", - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1199,12 +1025,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "overload" version = "0.1.1" @@ -1217,19 +1037,6 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" -[[package]] -name = "p256" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" -dependencies = [ - "ecdsa", - "elliptic-curve", - "primeorder", - "serdect", - "sha2", -] - [[package]] name = "parking_lot" version = "0.12.3" @@ -1264,15 +1071,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "pem-rfc7468" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" -dependencies = [ - "base64ct", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1294,9 +1092,9 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1311,16 +1109,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkcs8" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" -dependencies = [ - "der", - "spki", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -1336,16 +1124,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "primeorder" -version = "0.13.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" -dependencies = [ - "elliptic-curve", - "serdect", -] - [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -1355,15 +1133,6 @@ dependencies = [ "toml_edit 0.21.1", ] -[[package]] -name = "proc-macro2" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" -dependencies = [ - "unicode-xid", -] - [[package]] name = "proc-macro2" version = "1.0.92" @@ -1373,24 +1142,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quinn" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" -dependencies = [ - "bytes", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash", - "rustls", - "socket2", - "thiserror 2.0.3", - "tokio", - "tracing", -] - [[package]] name = "quinn-proto" version = "0.11.9" @@ -1412,36 +1163,13 @@ dependencies = [ "web-time", ] -[[package]] -name = "quinn-udp" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" -dependencies = [ - "cfg_aliases", - "libc", - "once_cell", - "socket2", - "tracing", - "windows-sys 0.59.0", -] - -[[package]] -name = "quote" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce23b6b870e8f94f81fb0a363d65d86675884b34a09043c81e5562f11c1f8e1" -dependencies = [ - "proc-macro2 0.4.30", -] - [[package]] name = "quote" version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ - "proc-macro2 1.0.92", + "proc-macro2", ] [[package]] @@ -1474,15 +1202,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rand_pcg" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59cad018caf63deb318e5a4586d99a24424a364f40f1e5778c29aca23f4fc73e" -dependencies = [ - "rand_core", -] - [[package]] name = "redox_syscall" version = "0.5.3" @@ -1536,58 +1255,6 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" -[[package]] -name = "reqwest" -version = "0.12.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" -dependencies = [ - "base64 0.22.1", - "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-util", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite", - "quinn", - "rustls", - "rustls-native-certs", - "rustls-pemfile", - "rustls-pki-types", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper 1.0.1", - "tokio", - "tokio-rustls", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "windows-registry", -] - -[[package]] -name = "rfc6979" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" -dependencies = [ - "hmac", - "subtle", -] - [[package]] name = "ring" version = "0.17.8" @@ -1642,7 +1309,6 @@ version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ - "log", "once_cell", "ring", "rustls-pki-types", @@ -1651,18 +1317,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" -dependencies = [ - "openssl-probe", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -1704,74 +1358,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - -[[package]] -name = "schannel" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sec1" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" -dependencies = [ - "base16ct", - "der", - "generic-array", - "pkcs8", - "serdect", - "subtle", - "zeroize", -] - -[[package]] -name = "security-framework" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1415a607e92bec364ea2cf9264646dcce0f91e6d65281bd6f2819cca3bf39c8" -dependencies = [ - "bitflags", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" -dependencies = [ - "core-foundation-sys", - "libc", -] - -[[package]] -name = "semver" -version = "1.0.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" - [[package]] name = "serde" version = "1.0.204" @@ -1787,9 +1379,9 @@ version = "1.0.204" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1835,16 +1427,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serdect" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a84f14a19e9a014bb9f4512488d9829a68e04ecabffb0f9904cd1ace94598177" -dependencies = [ - "base16ct", - "serde", -] - [[package]] name = "sha1" version = "0.10.6" @@ -1856,17 +1438,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sha2" -version = "0.10.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sharded-slab" version = "0.1.7" @@ -1882,6 +1453,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -1891,16 +1468,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signature" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" -dependencies = [ - "digest", - "rand_core", -] - [[package]] name = "slab" version = "0.4.9" @@ -1941,22 +1508,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "spki" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" -dependencies = [ - "base64ct", - "der", -] - -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "strsim" version = "0.11.1" @@ -1969,25 +1520,14 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "syn" -version = "0.15.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5" -dependencies = [ - "proc-macro2 0.4.30", - "quote 0.6.13", - "unicode-xid", -] - [[package]] name = "syn" version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", + "proc-macro2", + "quote", "unicode-ident", ] @@ -2002,93 +1542,16 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" -dependencies = [ - "futures-core", -] - -[[package]] -name = "tashi-collections" -version = "0.3.0" -source = "git+ssh://git@github.com/tashigg/tashi-consensus-engine?rev=aaf0206f2a71c710bafe6c5c89f172e5fae2b277#aaf0206f2a71c710bafe6c5c89f172e5fae2b277" -dependencies = [ - "fnv", - "hashbrown", -] [[package]] -name = "tashi-consensus-engine" -version = "0.3.0" -source = "git+ssh://git@github.com/tashigg/tashi-consensus-engine?rev=aaf0206f2a71c710bafe6c5c89f172e5fae2b277#aaf0206f2a71c710bafe6c5c89f172e5fae2b277" -dependencies = [ - "bytes", - "der", - "dlopen", - "ease-off", - "eyre", - "fastcdc", - "futures-util", - "hex", - "hex-literal", - "libc", - "p256", - "quinn-proto", - "rand", - "rand_pcg", - "reqwest", - "rustls", - "serde", - "slotmap", - "socket2", - "tashi-collections", - "tashi-crypto", - "tce_plugin_common", - "thiserror 1.0.69", - "tokio", - "tokio-util", - "tracing", - "triomphe", - "uuid", - "walkdir", - "windows-sys 0.52.0", - "x509-cert", -] - -[[package]] -name = "tashi-crypto" -version = "0.3.0" -source = "git+ssh://git@github.com/tashigg/tashi-consensus-engine?rev=aaf0206f2a71c710bafe6c5c89f172e5fae2b277#aaf0206f2a71c710bafe6c5c89f172e5fae2b277" -dependencies = [ - "der", - "eyre", - "hex", - "p256", - "pem-rfc7468", - "rand", - "rand_pcg", - "rustls", - "rustls-pemfile", - "sec1", - "serde", - "sha2", - "thiserror 1.0.69", - "tracing", - "triomphe", - "uuid", - "x509-cert", -] - -[[package]] -name = "tce_plugin_common" +name = "tashi-vertex" version = "0.1.0" -source = "git+ssh://git@github.com/tashigg/tce-plugin-common.git?rev=08d10efd4a897bc8278517bb0e5fd8f29cd0b27b#08d10efd4a897bc8278517bb0e5fd8f29cd0b27b" dependencies = [ - "async-ffi", + "anyhow", + "cmake", + "copy_to_output", "der", - "dlopen", - "p256", - "semver", - "tokio", - "triomphe", + "serde", ] [[package]] @@ -2128,9 +1591,9 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2139,9 +1602,9 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2200,27 +1663,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" -[[package]] -name = "tls_codec" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e78c9c330f8c85b2bae7c8368f2739157db9991235123aa1b15ef9502bfb6a" -dependencies = [ - "tls_codec_derive", - "zeroize", -] - -[[package]] -name = "tls_codec_derive" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9ef545650e79f30233c0003bcc2504d7efac6dad25fca40744de773fe2049c" -dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", -] - [[package]] name = "tokio" version = "1.41.1" @@ -2236,7 +1678,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.52.0", ] @@ -2246,9 +1687,9 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2379,9 +1820,9 @@ version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2446,22 +1887,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "triomphe" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6631e42e10b40c0690bf92f404ebcfe6e1fdb480391d15f17cc8e96eeed5369" -dependencies = [ - "serde", - "stable_deref_trait", -] - -[[package]] -name = "try-lock" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" - [[package]] name = "tungstenite" version = "0.21.0" @@ -2514,12 +1939,6 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" -[[package]] -name = "unicode-xid" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" - [[package]] name = "untrusted" version = "0.9.0" @@ -2549,16 +1968,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "uuid" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" -dependencies = [ - "getrandom", - "serde", -] - [[package]] name = "valuable" version = "0.1.0" @@ -2571,25 +1980,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - -[[package]] -name = "want" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" -dependencies = [ - "try-lock", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2615,31 +2005,19 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", "wasm-bindgen-shared", ] -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.42" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "wasm-bindgen-macro" version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" dependencies = [ - "quote 1.0.36", + "quote", "wasm-bindgen-macro-support", ] @@ -2649,9 +2027,9 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2662,16 +2040,6 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" -[[package]] -name = "web-sys" -version = "0.3.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "web-time" version = "1.1.0" @@ -2698,51 +2066,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-registry" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" -dependencies = [ - "windows-result", - "windows-strings", - "windows-targets", -] - -[[package]] -name = "windows-result" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" -dependencies = [ - "windows-targets", -] - -[[package]] -name = "windows-strings" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" -dependencies = [ - "windows-result", - "windows-targets", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -2843,20 +2172,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "x509-cert" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94" -dependencies = [ - "const-oid", - "der", - "sha1", - "signature", - "spki", - "tls_codec", -] - [[package]] name = "zerocopy" version = "0.7.35" @@ -2873,9 +2188,9 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2883,17 +2198,3 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2 1.0.92", - "quote 1.0.36", - "syn 2.0.89", -] diff --git a/Cargo.toml b/Cargo.toml index 6ea4de5..a56a55f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,9 +16,9 @@ edition = "2021" publish = false default-run = "foxmq" -[dependencies.tashi-consensus-engine] -git = "ssh://git@github.com/tashigg/tashi-consensus-engine" -rev = "aaf0206f2a71c710bafe6c5c89f172e5fae2b277" +[dependencies.tashi-vertex] +git = "ssh://git@github.com/tashigg/tashi-vertex-rs.git" +branch = "main" [dependencies] fnv = "1.0.7" @@ -36,8 +36,8 @@ serde = { version = "1.0.196", features = ["derive", "std"] } thiserror = "1.0.56" toml = { version = "0.8.10", features = ["parse"] } tracing = { version = "0.1.40", features = ["attributes"] } -tokio-util = "0.7.10" -der = { version = "0.7.9", features = ["derive"] } +tokio-util = { version = "0.7.10", features = ["time"] } +der = { version = "0.7.9", features = ["derive", "alloc", "std"] } futures = { version = "0.3.30", features = ["std", "alloc"] } slotmap = "1.0.7" arbitrary = { version = "1", optional = true, features = ["derive"] } diff --git a/README.md b/README.md index 8452d5e..be01e0e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ # foxmq Message queue MeshApp utilizing Tashi Consensus Engine +### Quick Start +See [example.md](example.md) for a step-by-step guide on running and testing the message queue in WSL. + ### Checking Out When checking out, be sure to initialize and update submodules: diff --git a/example.md b/example.md new file mode 100644 index 0000000..98f0a8a --- /dev/null +++ b/example.md @@ -0,0 +1,39 @@ +# FoxMQ Usage Example + +This example demonstrates how to run the FoxMQ broker and test it using standard MQTT clients in a WSL environment. + +## Prerequisites + +Ensure you are in a **WSL** terminal, as the Tashi Vertex native libraries require Linux for linking and execution. + +## Step-by-Step Demo + +### 1. Terminal 1: Start the Broker +Start the FoxMQ broker with anonymous login enabled for testing: + +```bash +cd /mnt/e/projects/tashi/message-queue +cargo run -- run foxmq.d --allow-anonymous-login +``` + +### 2. Terminal 2: Subscribe to a Topic +Open a second WSL terminal and subscribe to a test topic. This client will wait for messages ordered by the Tashi Consensus Engine. + +```bash +mosquitto_sub -h 127.0.0.1 -p 1883 -v -t "tashi/mesh/demo" +``` + +### 3. Terminal 3: Publish a Message +Open a third WSL terminal and send a message. This message is wrapped in a Tashi transaction, ordered by the consensus layer, and then delivered to all subscribers. + +```bash +mosquitto_pub -h 127.0.0.1 -p 1883 -t "tashi/mesh/demo" -m "Message ordered by Tashi Consensus!" +``` + +## What's Happening? + +1. **MQTT Publish**: The `mosquitto_pub` command sends the message to FoxMQ. +2. **Consensus Layer**: FoxMQ submits the message to the `tashi-vertex`, which assigns it a global order and a consensus timestamp. +3. **MQTT Delivery**: FoxMQ receives the ordered event back and delivers it to `mosquitto_sub`. + +This ensures that in a multi-node cluster, every node sees the exact same sequence of messages at the exact same time. diff --git a/src/cli/address_book.rs b/src/cli/address_book.rs index 63ff7f7..cab6136 100644 --- a/src/cli/address_book.rs +++ b/src/cli/address_book.rs @@ -6,7 +6,7 @@ use std::{fs, io}; use color_eyre::eyre; use color_eyre::eyre::{eyre, WrapErr}; use serde::Serialize; -use tashi_consensus_engine::SecretKey; +use tashi_vertex::KeySecret; use crate::cli::LogFormat; use crate::collections::HashSet; @@ -150,15 +150,15 @@ fn generate_address_book( for (i, address) in addresses.enumerate() { eyre::ensure!(address_set.insert(address), "Duplicate address: {address}"); - let key = SecretKey::generate(); - let pubkey = key.public_key(); + let key = KeySecret::generate(); + let pubkey = key.public(); - let pem = key.to_pem(); + let key_str = key.to_string(); - let pem_filename = format!("key_{i}.pem"); + let pem_filename = format!("key_{i}.key"); let pem_path = output_dir.join(&pem_filename); - write_new_file(&pem_path, pem, force)?; + write_new_file(&pem_path, key_str, force)?; // Mark each entry with a comment referencing its key file. writeln!(address_book, "# {pem_filename}").expect("writing to a String cannot fail"); diff --git a/src/cli/run.rs b/src/cli/run.rs index 9191a23..c7d3254 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -1,22 +1,19 @@ use crate::cli::LogFormat; -use crate::collections::HashMap; use crate::config; use crate::config::addresses::Addresses; use crate::config::permissions::PermissionsConfig; use crate::config::users::{AuthConfig, UsersConfig}; use crate::mqtt::broker::{self, MqttBroker}; use crate::mqtt::{KeepAlive, TceState}; -use crate::transaction::AddNodeTransaction; +// use crate::transaction::AddNodeTransaction; use color_eyre::eyre; use color_eyre::eyre::Context; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::str::FromStr; use std::sync::Arc; -use tashi_consensus_engine::quic::QuicSocket; -use tashi_consensus_engine::{ - Certificate, Platform, RootCertificates, SecretKey, UnknownConnectionAction, -}; -use tokio::sync::mpsc; +use tashi_vertex::{KeyPublic, KeySecret, Options}; +use tokio_rustls::rustls; #[derive(clap::Args, Clone, Debug)] pub struct RunArgs { @@ -155,27 +152,46 @@ pub struct ClusterConfig { pub cluster_accept_peer_with_cert: bool, } +trait KeySecretToRustls { + fn to_rustls(&self) -> crate::Result>; +} + +impl KeySecretToRustls for KeySecret { + fn to_rustls(&self) -> crate::Result> { + let der = self.to_der_vec().wrap_err("failed to encode key to DER")?; + Ok(rustls::pki_types::PrivateKeyDer::Pkcs8(der.into())) + } +} + struct TceConfig { - config: tashi_consensus_engine::Config, - roots: Option>, - add_nodes: mpsc::UnboundedReceiver, - joining_running_session: bool, + options: Options, + pub secret_key: KeySecret, + pub _joining_running_session: bool, + pub initial_peers: Vec<(KeyPublic, SocketAddr)>, } impl SecretKeyOpt { /// NOTE: uses blocking I/O internally if the secret key was specified as a file. - pub fn read_key(&self) -> crate::Result { - if let Some(der) = &self.secret_key { - let der_bytes = hex::decode(der).wrap_err("error decoding hex-encoded secret key")?; - return SecretKey::from_der(&der_bytes) - .wrap_err("error decoding P-256 secret key from DER"); + pub fn read_key(&self) -> crate::Result { + if let Some(key_input) = &self.secret_key { + // First, try to parse directly as a Base58 secret key + if let Ok(key) = KeySecret::from_str(key_input) { + return Ok(key); + } + + // If that fails, treat it as a file path + let content = std::fs::read_to_string(key_input) + .wrap_err_with(|| format!("failed to read secret key file: {key_input}"))?; + + return KeySecret::from_str(content.trim()) + .wrap_err("failed to parse secret key from file"); } if let Some(path) = &self.secret_key_file { return read_secret_key(path); } - Ok(SecretKey::generate()) + Ok(KeySecret::generate()) } } @@ -243,13 +259,9 @@ pub fn main(args: RunArgs) -> crate::Result<()> { ) })? } else { - vec![Certificate::generate_self_signed( - &key, - tls_socket_addr, - &args.tls_config.server_name, - None, - )? - .into_rustls()] + // Certificate::generate_self_signed was from TCE. + // We need to implement using rcgen or similar if needed, or require cert file. + eyre::bail!("Self-signed certificate generation is temporarily disabled during migration. Please provide a certificate file."); }; eyre::Ok(broker::TlsConfig { @@ -277,23 +289,41 @@ async fn main_async( ) -> crate::Result<()> { let tce = match tce_config { Some(tce_config) => { - let (platform, messages) = Platform::start( - tce_config.config, - QuicSocket::bind_udp(args.cluster_addr).await?, - tce_config.joining_running_session, - )?; + let context = + Arc::new(tashi_vertex::Context::new().wrap_err("failed to create context")?); + let socket = tashi_vertex::Socket::bind(&context, &args.cluster_addr.to_string()) + .await + .wrap_err("failed to bind cluster socket")?; + let mut peers = tashi_vertex::Peers::new().wrap_err("failed to create peers")?; + + for (key, addr) in tce_config.initial_peers { + peers + .insert( + &addr.to_string(), + &key, + tashi_vertex::PeerCapabilities::default(), + ) + .wrap_err("failed to add initial peer")?; + } + + let engine = tashi_vertex::Engine::start( + &context, + socket, + tce_config.options, + &tce_config.secret_key, + peers, + ) + .wrap_err("failed to start engine")?; Some(TceState { - platform: Arc::new(platform), - messages, - roots: tce_config.roots, - add_nodes: tce_config.add_nodes, + engine: Arc::new(engine), + context, }) } None => None, }; - let tce_platform = tce.as_ref().map(|tce| tce.platform.clone()); + let tce_platform = tce.as_ref().map(|tce| tce.engine.clone()); let mut broker = MqttBroker::bind( args.mqtt_addr, @@ -315,8 +345,8 @@ async fn main_async( res = tokio::signal::ctrl_c() => { res.wrap_err("error from ctrl_c() handler")?; - if let Some(platform) = tce_platform { - platform.shutdown().await; + if let Some(_platform) = tce_platform { + // platform.shutdown().await; } break; } @@ -332,34 +362,41 @@ async fn main_async( } fn create_tce_config( - secret_key: SecretKey, + secret_key: KeySecret, addresses: &Addresses, config: &ClusterConfig, ) -> crate::Result { - let nodes: HashMap<_, _> = addresses - .addresses - .iter() - .map(|address| (address.key.clone(), address.addr)) - .collect(); + // let nodes: HashMap<_, _> = addresses + // .addresses + // .iter() + // .map(|address| (address.key.clone(), address.addr)) + // .collect(); // The address book is only required to contain the existing nodes. - let joining_running_session = !nodes.contains_key(&secret_key.public_key()); - - let mut tce_config = tashi_consensus_engine::Config::new(secret_key); - - tce_config - .initial_nodes(nodes.into_iter().collect()) - .enable_hole_punching(false) - // TODO: we can dispatch messages before they come to consensus - // but we need to make sure we don't duplicate PUBLISHes. - // .report_events_before_consensus(true) - // Since a FoxMQ cluster is permissioned, don't kick failed nodes which may later recover. - .fallen_behind_kick_seconds(None); + // let joining_running_session = !nodes.contains_key(&secret_key.public()); + // For now assume we are always joining or starting based on some other logic, or just let vertex handle it? + // tashi-vertex handle this via initial peers. + let _joining_running_session = false; // TODO: restore logic + + let options = Options::new(); + + // Add initial peers + let mut initial_peers = Vec::new(); + for address in &addresses.addresses { + let addr: SocketAddr = address + .addr + .to_string() + .parse() + .expect("invalid socket address"); + initial_peers.push((address.key.clone(), addr)); + } - if let Some(cert_path) = &config.cluster_cert { - tce_config.tls_cert_chain(Certificate::load_chain_from(cert_path)?); + if let Some(_cert_path) = &config.cluster_cert { + // TODO: handle certs in tashi-vertex + // options.tls_cert_chain(Certificate::load_chain_from(cert_path)?); } + /* let roots = if let Some(root_cert_path) = &config.cluster_root_cert { let roots = Arc::new(RootCertificates::read_from(root_cert_path)?); tce_config.tls_roots(roots.clone()); @@ -367,7 +404,9 @@ fn create_tce_config( } else { None }; + */ + /* let (add_nodes_tx, add_nodes_rx) = mpsc::unbounded_channel(); if config.cluster_accept_peer_with_cert { @@ -385,20 +424,21 @@ fn create_tce_config( Ok(UnknownConnectionAction::VoteToAddPeer) }); } + */ Ok(TceConfig { - config: tce_config, - roots, - add_nodes: add_nodes_rx, - joining_running_session, + options, + secret_key: secret_key.clone(), + _joining_running_session, + initial_peers, }) } /// NOTE: uses blocking I/O internally. -fn read_secret_key(path: &Path) -> crate::Result { +fn read_secret_key(path: &Path) -> crate::Result { // There's no benefit to using `tokio::fs` because it just does the blocking work on a background thread. let pem = std::fs::read(path).wrap_err_with(|| format!("error reading {}", path.display()))?; - SecretKey::from_pem(&pem) + KeySecret::from_der(&pem) .wrap_err_with(|| format!("error reading P-256 secret key from {}", path.display())) } diff --git a/src/config/addresses.rs b/src/config/addresses.rs index f8ac31c..9a01ac1 100644 --- a/src/config/addresses.rs +++ b/src/config/addresses.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::path::Path; -use tashi_consensus_engine::PublicKey; +use tashi_vertex::KeyPublic; #[derive(serde::Deserialize, serde::Serialize)] pub struct Addresses { @@ -10,7 +10,7 @@ pub struct Addresses { #[derive(serde::Deserialize, serde::Serialize)] pub struct Address { - pub key: PublicKey, + pub key: KeyPublic, pub addr: SocketAddr, } diff --git a/src/lib.rs b/src/lib.rs index cac7921..2c7bf3b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,21 @@ use color_eyre::eyre::WrapErr; pub use color_eyre::eyre::{Error, Result}; use tracing_subscriber::EnvFilter; -pub use tashi_consensus_engine::{flatten_task_result, map_join_error}; +pub fn map_join_error(e: tokio::task::JoinError) -> Error { + if e.is_cancelled() { + eyre!("task cancelled") + } else { + eyre!(e).wrap_err("task panicked") + } +} + +pub fn flatten_task_result(res: Result, tokio::task::JoinError>) -> Result { + match res { + Ok(Ok(val)) => Ok(val), + Ok(Err(err)) => Err(err), + Err(e) => Err(map_join_error(e)), + } +} use crate::cli::LogFormat; diff --git a/src/mqtt/broker/connection.rs b/src/mqtt/broker/connection.rs index 239218b..712cd0a 100644 --- a/src/mqtt/broker/connection.rs +++ b/src/mqtt/broker/connection.rs @@ -718,26 +718,28 @@ impl Connection { tracing::trace!("submitting transaction: {transaction:?}"); - if let Some(tce_platform) = &self.shared.tce_platform { - tce_platform - .reserve_tx() - .await - // If this fails, we'll drop the connection without sending a PUBACK/PUBREC, - // so the client will know we died before taking ownership and can try another broker. - .or_else(|_| disconnect!(ServerShuttingDown, "broker shutting down"))? - .send( - transaction - .to_der() - .map_err(Into::into) - .and_then(|it| it.try_into()) - .or_else(|e| { - tracing::error!(?transaction, ?e, "failed to encode transaction"); - disconnect!( - ProtocolError, - "PUBLISH exceeded consensus protocol size limit" - ); - })?, - ); + if let Some(engine) = &self.shared.engine { + // Encode the transaction to DER + let der = transaction.to_der().map_err(|e| { + tracing::error!(?transaction, ?e, "failed to encode transaction"); + ConnectionError::Disconnect { + reason: DisconnectReasonCode::UnspecifiedError, + message: "failed to encode transaction".into(), + } + })?; + + // Create a tashi-vertex transaction + let mut vertex_txn = tashi_vertex::Transaction::allocate(der.len()); + vertex_txn.copy_from_slice(&der); + + // Send it + engine.send_transaction(vertex_txn).map_err(|e| { + tracing::error!(?e, "failed to submit transaction to tashi-vertex"); + ConnectionError::Disconnect { + reason: DisconnectReasonCode::ServerShuttingDown, + message: "broker shutting down".into(), + } + })?; } router.transaction(transaction).await; diff --git a/src/mqtt/broker/mod.rs b/src/mqtt/broker/mod.rs index 8307154..b8ff861 100644 --- a/src/mqtt/broker/mod.rs +++ b/src/mqtt/broker/mod.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::Arc; @@ -16,10 +15,9 @@ use tokio_rustls::rustls; use tokio_util::sync::CancellationToken; use crate::collections::{hash_map, HashMap}; -use tashi_consensus_engine::{Platform, TxnPermit, TxnTryReserveError}; +use tashi_vertex::{Engine, Transaction as TvTransaction}; use connection::Connection; -use rumqttd_protocol::QoS; use crate::cli::run::WsConfig; use crate::config::permissions::PermissionsConfig; @@ -85,9 +83,6 @@ pub struct MqttBroker { clients: Clients, - // this is a vecdeque just to make it fair, it doesn't really *have* to be one. - pending_wills: VecDeque, - /// Store information about a connection awaiting another connection to exit to take it over. /// /// When `key` closes out, `value` takes over the session immediately, @@ -107,12 +102,6 @@ pub struct MqttBroker { router: MqttRouter, } -struct PendingWill { - client_id: ClientId, - client_idx: ClientIndex, - will: Will, -} - struct SessionTakeover { client_id: ClientId, assigned: bool, @@ -153,7 +142,7 @@ struct Shared { password_hasher: PasswordHashingPool, users: UsersConfig, broker_tx: mpsc::Sender, - tce_platform: Option>, + engine: Option>, router: RouterHandle, max_keep_alive: KeepAlive, @@ -209,7 +198,7 @@ impl MqttBroker { let (broker_tx, broker_rx) = mpsc::channel(100); - let tce_platform = tce.as_ref().map(|tce| tce.platform.clone()); + let engine = tce.as_ref().map(|tce| tce.engine.clone()); let router = MqttRouter::start(tce, token.clone(), permissions_config); @@ -220,7 +209,6 @@ impl MqttBroker { websocket, token, clients: Default::default(), - pending_wills: Default::default(), pending_session_takeovers: Default::default(), connections: SlotMap::with_capacity_and_key(256), tasks: JoinSet::new(), @@ -230,7 +218,7 @@ impl MqttBroker { ), users, broker_tx: broker_tx.clone(), - tce_platform, + engine, router: router.handle(), max_keep_alive, }), @@ -246,16 +234,13 @@ impl MqttBroker { let mut shutdown = false; // hack: borrowck limitations means we'd have an overlapping borrow. - let tce_platform = self.shared.tce_platform.clone(); + let _engine = self.shared.engine.clone(); while !shutdown { let tls_accept_fut = OptionFuture::from(self.tls.as_mut().map(|it| it.accept())); let ws_accept_fut = OptionFuture::from(self.websocket.as_mut().map(|it| it.accept())); - let tce_platform_fut = - OptionFuture::from(tce_platform.as_ref().map(|k| k.reserve_tx())); - tokio::select! { _ = self.token.cancelled() => { shutdown = true; @@ -271,8 +256,7 @@ impl MqttBroker { handle_connection_lost( &mut self.inactive_sessions, &mut self.router, - self.shared.tce_platform.as_deref(), - &mut self.pending_wills, + self.shared.engine.as_deref(), data ); @@ -301,12 +285,6 @@ impl MqttBroker { Some(event) = self.inactive_sessions.next_event() => { self.handle_inactive_session_event(event)?; } - Some(Ok(permit)) = tce_platform_fut, if !self.pending_wills.is_empty() => { - // technically a bug if this fails, but it doesn't really matter. - if let Some(will) = self.pending_wills.pop_front() { - dispatch_will(&mut self.router, Some(permit), will.client_id, will.client_idx, will.will) - } - } } } @@ -330,8 +308,7 @@ impl MqttBroker { if let Some(will) = session.last_will { execute_will( &mut self.router, - self.shared.tce_platform.as_deref(), - &mut self.pending_wills, + self.shared.engine.as_deref(), client_id, client_idx, will, @@ -344,8 +321,7 @@ impl MqttBroker { super::session::Event::WillElapsed(_, will) => execute_will( &mut self.router, - self.shared.tce_platform.as_deref(), - &mut self.pending_wills, + self.shared.engine.as_deref(), client_id, client_idx, will, @@ -408,8 +384,7 @@ impl MqttBroker { if let Some(last_will) = store.session.last_will.take() { execute_will( &mut self.router, - self.shared.tce_platform.as_deref(), - &mut self.pending_wills, + self.shared.engine.as_deref(), client_id, client_index, last_will, @@ -512,7 +487,6 @@ impl MqttBroker { mut tasks, mut inactive_sessions, mut router, - mut pending_wills, shared, .. } = self; @@ -527,8 +501,7 @@ impl MqttBroker { handle_connection_lost( &mut inactive_sessions, &mut router, - shared.tce_platform.as_deref(), - &mut pending_wills, + shared.engine.as_deref(), data, ); tracing::info!("{} connections remaining", tasks.len()); @@ -656,8 +629,7 @@ impl Clients { fn handle_connection_lost( inactive_sessions: &mut InactiveSessions, router: &mut MqttRouter, - tce_platform: Option<&Platform>, - pending_wills: &mut VecDeque, + engine: Option<&Engine>, data: ConnectionData, ) { let ConnectionData { @@ -685,42 +657,14 @@ fn handle_connection_lost( return; }; - execute_will( - router, - tce_platform, - pending_wills, - client_id, - client_index, - will, - ) + execute_will(router, engine, client_id, client_index, will) } } } -fn try_reserve_will( - tce_platform: &Platform, - client_id: ClientId, - qos: QoS, -) -> Result>, ()> { - match tce_platform.try_reserve_tx() { - Ok(reservation) => Ok(Some(reservation)), - Err(TxnTryReserveError::Closed) => { - // We need to keep platform around until the broker shuts down... Or we need to store the wills. - // Currently we do neither, and plan on doing the first one. - tracing::error!(%client_id, "Failed to send will due to platform shut down"); - Ok(None) - } - // we can ignore QoS 0 messages if our buffers are full (and this counts). - Err(TxnTryReserveError::Full) if matches!(qos, QoS::AtMostOnce) => { - tracing::debug!("Ignoring QoS 0 will due to full load"); - Ok(None) - } - Err(TxnTryReserveError::Full) => Err(()), - } -} fn dispatch_will( router: &mut MqttRouter, - permit: Option>, + engine: Option<&Engine>, client_id: ClientId, client_idx: ClientIndex, mut will: Will, @@ -730,55 +674,40 @@ fn dispatch_will( tracing::trace!("submitting transaction: {:?}", will.transaction); let transaction = Transaction { - data: TransactionData::Publish(will.transaction), + data: TransactionData::Publish(will.transaction.clone()), }; - let txn = match transaction - .to_der() - .map_err(Into::into) - .and_then(TryInto::try_into) - { - Ok(txn) => txn, - Err(err) => { - tracing::warn!(%client_id, "expiring client will invalid {err}"); - // don't kill the broker. - return; + if let Some(engine) = engine { + match create_tv_transaction(&transaction) { + Ok(tv_txn) => { + if let Err(e) = engine.send_transaction(tv_txn) { + tracing::error!(%client_id, "Failed to submit will transaction: {e}"); + } + } + Err(e) => { + tracing::error!(%client_id, "Failed to encode will transaction: {e}"); + } } - }; - - if let Some(permit) = permit { - permit.send(txn); } - let TransactionData::Publish(transaction) = transaction.data else { - unreachable!() - }; + let TransactionData::Publish(transaction) = transaction.data; router.publish_will(client_idx, transaction); } +fn create_tv_transaction(txn: &Transaction) -> crate::Result { + let der = txn.to_der().map_err(Into::::into)?; + let mut tv_txn = TvTransaction::allocate(der.len()); + tv_txn.copy_from_slice(&der); + Ok(tv_txn) +} + fn execute_will( router: &mut MqttRouter, - tce_platform: Option<&Platform>, - pending_wills: &mut VecDeque, + engine: Option<&Engine>, client_id: ClientId, client_idx: ClientIndex, will: Will, ) { - match tce_platform { - Some(tce_platform) => { - match try_reserve_will(tce_platform, client_id, will.transaction.meta.qos()) { - Ok(None) => {} - Ok(Some(permit)) => { - dispatch_will(router, Some(permit), client_id, client_idx, will) - } - Err(()) => pending_wills.push_back(PendingWill { - client_id, - client_idx, - will, - }), - }; - } - None => dispatch_will(router, None, client_id, client_idx, will), - } + dispatch_will(router, engine, client_id, client_idx, will) } diff --git a/src/mqtt/broker/websocket.rs b/src/mqtt/broker/websocket.rs index fb7e161..fa096b7 100644 --- a/src/mqtt/broker/websocket.rs +++ b/src/mqtt/broker/websocket.rs @@ -2,6 +2,7 @@ use std::mem; use std::net::{IpAddr, SocketAddr}; use std::str; +use crate::flatten_task_result; use axum::async_trait; use axum::extract::rejection::ExtensionRejection; use axum::extract::ws::rejection::WebSocketUpgradeRejection; @@ -15,7 +16,6 @@ use bytes::BytesMut; use color_eyre::eyre; use color_eyre::eyre::WrapErr; use futures::{SinkExt, TryStreamExt}; -use tashi_consensus_engine::flatten_task_result; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio::task::JoinHandle; diff --git a/src/mqtt/retain.rs b/src/mqtt/retain.rs index fc00d3c..acf8bdf 100644 --- a/src/mqtt/retain.rs +++ b/src/mqtt/retain.rs @@ -9,7 +9,7 @@ use std::collections::{btree_map, BTreeMap}; use std::mem; use std::ops::Bound; use std::sync::Arc; -use tashi_consensus_engine::Timestamp; +pub type Timestamp = u64; slotmap::new_key_type! { struct MessageIndex; } diff --git a/src/mqtt/router.rs b/src/mqtt/router.rs index bfeb17a..5cfbba4 100644 --- a/src/mqtt/router.rs +++ b/src/mqtt/router.rs @@ -3,24 +3,20 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use std::ops::{Index, IndexMut}; use std::sync::{Arc, OnceLock}; -use std::time::{Duration, Instant, SystemTime}; +use std::time::{Instant, SystemTime}; use bytes::Bytes; -use color_eyre::eyre::WrapErr; use color_eyre::eyre::{self}; -use der::{Decode, Encode}; +use der::Decode; use slotmap::SecondaryMap; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; use tokio::task; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, Span}; +use tracing::Span; use crate::collections::{HashMap, HashSet}; -use tashi_consensus_engine::{ - CreatorId, Message, MessageStream, Platform, PlatformEvent, RootCertificates, -}; +use tashi_vertex::{Context, Engine, Event, KeyPublic, Message}; use rumqttd_protocol::{QoS, RetainForwardRule, SubscribeReasonCode, UnsubAckReason}; @@ -32,8 +28,8 @@ use crate::mqtt::retain::RetainedMessages; use crate::mqtt::trie::{self, Filter, FilterTrie, TopicName}; use crate::mqtt::{ClientId, ClientIndex, ConnectionId}; use crate::transaction::{ - AddNodeTransaction, BytesAsOctetString, PublishMeta, PublishTrasaction, TimestampSeconds, - Transaction, TransactionData, + BytesAsOctetString, PublishMeta, PublishTrasaction, TimestampSeconds, Transaction, + TransactionData, }; // Tokio's channels allocate in slabs of 32. @@ -71,11 +67,10 @@ pub struct FilterProperties { #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct SubscriptionId(NonZeroU32); +#[derive(Clone)] pub struct TceState { - pub platform: Arc, - pub messages: MessageStream, - pub roots: Option>, - pub add_nodes: mpsc::UnboundedReceiver, + pub engine: Arc, + pub context: Arc, } impl MqttRouter { @@ -424,7 +419,7 @@ struct Subscription { enum PublishOrigin<'a> { System { source: Span }, Local(ClientIndex), - Consensus(&'a CreatorId), + Consensus(&'a KeyPublic), } impl PublishOrigin<'_> { @@ -530,7 +525,7 @@ impl ClientState { struct InvalidSubscriptionKind; use futures::future::OptionFuture; -use futures::TryFutureExt; +// use futures::TryFutureExt; // Experimenting with using free functions instead of methods for less right-drift in core logic. // Methods should be added to `RouterState` or its constituent types @@ -538,12 +533,14 @@ use futures::TryFutureExt; #[tracing::instrument(name = "router", skip(state))] async fn run(mut state: RouterState) -> crate::Result<()> { loop { - let mut tce_msgs_fut = OptionFuture::from(None); - let mut add_nodes = OptionFuture::from(None); + // let mut tce_msgs_fut = OptionFuture::from(None); + // let mut add_nodes = OptionFuture::from(None); + + let mut tce_msg_fut = OptionFuture::from(None); - if let Some(tce) = &mut state.tce { - tce_msgs_fut = Some(tce.messages.next_message()).into(); - add_nodes = Some(tce.add_nodes.recv()).into(); + if let Some(tce) = &state.tce { + let engine = tce.engine.clone(); + tce_msg_fut = Some(async move { engine.recv_message().await }).into(); } tokio::select! { @@ -561,16 +558,29 @@ async fn run(mut state: RouterState) -> crate::Result<()> { msg.expect("BUG: system_rx cannot return None as its Sender is held in a `static`") ); } - Some(res) = tce_msgs_fut => { - let Some(msg) = res.wrap_err("error from MessageStream")? else { - tracing::debug!("Message stream closed; exiting."); - break; - }; - handle_message(&mut state, msg) + Some(res) = tce_msg_fut => { + match res { + Ok(Some(Message::Event(event))) => { + handle_tashi_event(&mut state, event); + } + Ok(Some(Message::SyncPoint(_))) => { + tracing::trace!("received sync point"); + } + Ok(None) => { + tracing::debug!("Tashi Vertex engine stopped; exiting."); + break; + } + Err(e) => { + tracing::error!(?e, "Tashi Vertex engine error"); + break; + } + } } + /* Some(Some(add_node)) = add_nodes => { handle_add_node(&mut state, add_node, false); }, + */ _ = state.token.cancelled() => { break; } @@ -631,10 +641,6 @@ fn handle_command(state: &mut RouterState, client_idx: ClientIndex, command: Rou TransactionData::Publish(publish) => { dispatch(state, publish.into(), PublishOrigin::Local(client_idx)) } - TransactionData::AddNode(add_node) => { - // This really shouldn't be called from here, but there's little harm in doing it. - handle_add_node(state, add_node, false); - } }, RouterCommand::Subscribe(request) => handle_subscribe(state, client_idx, request), RouterCommand::Unsubscribe { packet_id, filters } => { @@ -804,41 +810,21 @@ fn handle_subscribe(state: &mut RouterState, client_idx: ClientIndex, request: S } } -fn handle_message(state: &mut RouterState, message: Message) { - if let Some(tce) = state.tce.as_ref() { - let tce_platform = tce.platform.clone(); - match message { - Message::SyncPoint(sync_point) => { - // TODO: handle sync points - // TODO: serialize retained messages [TG-492] - tracing::debug!(?sync_point, "received sync point"); - } - Message::Event(event) => { - handle_tce_event(state, event, &tce_platform); - } - } - } -} - -#[tracing::instrument(skip_all, fields(creator=%event.creator, timestamp=event.timestamp_created()))] -fn handle_tce_event(state: &mut RouterState, event: PlatformEvent, platform: &Platform) { - let Some(consensus_timestamp) = event.timestamp_finalized else { - return; - }; +fn handle_tashi_event(state: &mut RouterState, event: Event) { + let consensus_timestamp = event.consensus_at(); - // Way too noisy if we don't filter this. - if !event.application_transactions().is_empty() { + if event.transaction_count() > 0 { tracing::trace!( "received event with {} application transactions", - event.application_transactions().len() + event.transaction_count() ); } // TODO: we'll want to check if our own events are coming to consensus for QoS 1 and 2 // and re-send them if not. - for (idx, transaction) in event.application_transactions().iter().enumerate() { - let transaction = match Transaction::from_der(transaction.as_bytes()) { + for (idx, transaction_bytes) in event.transactions().enumerate() { + let transaction = match Transaction::from_der(transaction_bytes) { Ok(txn) => txn, Err(e) => { // This isn't a fatal error. @@ -851,14 +837,16 @@ fn handle_tce_event(state: &mut RouterState, event: PlatformEvent, platform: &Pl TransactionData::Publish(publish) => { let publish = Arc::new(publish); - if &event.creator != platform.creator_id() { - // Locally sent messages would have been dispatched directly - dispatch( - state, - publish.clone(), - PublishOrigin::Consensus(&event.creator), - ); - } + // FIXME: we need a way to check if we created this event to avoid re-dispatching + // local messages. + // if event.creator() != platform.creator_id() { + // Locally sent messages would have been dispatched directly + dispatch( + state, + publish.clone(), + PublishOrigin::Consensus(event.creator()), + ); + // } // Only for TCE messages because we need to have total ordering // to know which retained message is the latest for a given topic. @@ -868,14 +856,9 @@ fn handle_tce_event(state: &mut RouterState, event: PlatformEvent, platform: &Pl .retained_messages .insert(consensus_timestamp, idx, publish); } else { - // Empty payload means clear the retained message. - state.retained_messages.remove(&publish.topic); } } } - TransactionData::AddNode(add_node) => { - handle_add_node(state, add_node, true); - } } } } @@ -918,64 +901,6 @@ fn handle_system_command(state: &mut RouterState, command: SystemCommand) { } } -#[tracing::instrument(skip_all, fields(key=%add_node.key, addr=%add_node.socket_addr.0, from_consensus))] -fn handle_add_node(state: &mut RouterState, add_node: AddNodeTransaction, from_consensus: bool) { - let Some(tce) = &state.tce else { - tracing::warn!("BUG: received AddNode with no TCE instance configured"); - return; - }; - - let Some(roots) = &tce.roots else { - tracing::debug!("rejecting AddNode with no TLS roots configured"); - return; - }; - - if let Err(e) = roots.verify_chain(&add_node.certs) { - tracing::debug!("rejecting AddNode with invalid TLS roots: {e:?}"); - return; - } - - let key = add_node.key.clone(); - let addr = add_node.socket_addr.0; - - if !from_consensus { - let platform = tce.platform.clone(); - - // If the TCE queue is full, don't block the router task. - // - // FIXME: this is technically unbounded so an attacker could theoretically DoS us - // by repeatedly connecting with new keys, but they'd have to have a valid TLS root cert - // which means they *should* be a trustworthy peer to begin with. - tokio::spawn( - async move { - let permit = platform.reserve_tx().await.wrap_err("TCE shutting down")?; - - let transaction = Transaction { - data: TransactionData::AddNode(add_node), - } - .to_der() - .wrap_err("error encoding transaction")?; - - permit.send(transaction.try_into().wrap_err("transaction too large")?); - - eyre::Ok(()) - } - .inspect_err(|e| tracing::warn!("error sending AddNodeTransaction to cluster: {e:?}")) - .instrument(tracing::debug_span!("send_txn")), - ); - } - - tracing::info!("voting to add node"); - - if tce - .platform - .vote_add_node(key, addr, Duration::from_secs(60)) - .is_err() - { - tracing::debug!("TCE shutting down"); - } -} - // `PublishTransaction` wrapped in `Arc` for cheap clones. // This is a potential candidate for `triomphe::Arc` but `PublishTransaction` is so large // that it dwarfs the extra machine word for the weak count. @@ -1151,18 +1076,18 @@ pub fn system_publish(topic: impl Into, message: impl Into) { let res = system_tx.send(SystemCommand::Publish { source, txn: PublishTrasaction { - topic, + topic: topic.clone(), meta: PublishMeta::new(QoS::AtMostOnce, false, false), - payload: BytesAsOctetString(message), + payload: BytesAsOctetString(message.into()), timestamp_received: TimestampSeconds::now(), properties: None, }, }); - if let Err(SendError(SystemCommand::Publish { txn, .. })) = res { + if let Err(e) = res { tracing::debug!( - topic = txn.topic, - "Attempted to publish to system topic, but MqttRouter is dead" + topic = parsed_topic.root(), + "Attempting to publish to system command channel after MqttRouter is shut down: {e}" ); } } diff --git a/src/mqtt/session.rs b/src/mqtt/session.rs index 4ced015..4fd7ce8 100644 --- a/src/mqtt/session.rs +++ b/src/mqtt/session.rs @@ -91,7 +91,7 @@ impl InactiveSessions { Some(Event::Expiration(client_id, inactive.session.session)) }, Some(client_id) = self.will_expirations.next() => { - let client_id = client_id.into_inner(); + let client_id = client_id.into_inner(); // this happening implies a non-local bug, it isn't trivial to ignore (it would require having the event polling be in a loop so that this could be skipped... Which would be reasonable if it wasn't already a bug) let inactive = self.sessions.get_mut(&client_id).expect("BUG: will expiration not removed when session expired/claimed"); let will = inactive.session.session.last_will.take().expect("BUG: Will claimed and not removed"); diff --git a/src/transaction.rs b/src/transaction.rs index d1ee2f0..41dca2b 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -5,8 +5,8 @@ use bytes::Bytes; use der::asn1::{BitStringRef, OctetString, OctetStringRef, Utf8StringRef}; use der::{Decode, Encode, Header, Length, Reader, Tag, TagNumber, Writer}; use rumqttd_protocol::QoS; -use tashi_consensus_engine::protocol::Endpoint; -use tashi_consensus_engine::{Certificate, PublicKey}; +// use tashi_consensus_engine::protocol::Endpoint; +// use tashi_consensus_engine::{Certificate, PublicKey}; #[derive(der::Sequence, Debug)] pub struct Transaction { @@ -18,13 +18,11 @@ pub struct Transaction { pub enum TransactionData { #[asn1(context_specific = "2", constructed = "true")] Publish(PublishTrasaction), - #[asn1(context_specific = "3", constructed = "true")] - AddNode(AddNodeTransaction), } // Transcoding to DER was chosen so that we are not baking-in a specific version of the MQTT protocol. /// DER mapping of [`rumqttd::protocol::Packet::Publish`]. -#[derive(der::Sequence, Debug, PartialEq, Eq)] +#[derive(der::Sequence, Debug, PartialEq, Eq, Clone)] pub struct PublishTrasaction { pub topic: String, pub meta: PublishMeta, @@ -46,7 +44,7 @@ pub struct PublishTrasaction { /// /// `topic_alias` and `subscription_identifiers` are omitted as they are only used between /// a client and a broker. -#[derive(der::Sequence, Debug, PartialEq, Eq)] +#[derive(der::Sequence, Debug, PartialEq, Eq, Clone)] pub struct PublishTransactionProperties { // Note: these match the tag bytes used by the MQTT protocol, where possible. #[asn1(context_specific = "1", optional = "true")] @@ -77,13 +75,7 @@ pub struct PublishTransactionProperties { pub user_properties: Option, } -#[derive(der::Sequence, Debug, PartialEq, Eq)] -pub struct AddNodeTransaction { - pub socket_addr: Endpoint, - pub key: PublicKey, - pub certs: Vec, -} - +/* impl TryFrom for tashi_consensus_engine::ApplicationTransaction { type Error = color_eyre::eyre::Error; @@ -91,6 +83,7 @@ impl TryFrom for tashi_consensus_engine::ApplicationTransaction { value.to_der()?.try_into() } } +*/ // Layout: // 7 dup (1 bit) diff --git a/tests/Dockerfile b/tests/Dockerfile index 3a07fd7..a279665 100644 --- a/tests/Dockerfile +++ b/tests/Dockerfile @@ -1,5 +1,7 @@ FROM rust:1-bookworm +RUN apt-get update && apt-get install -y cmake build-essential && rm -rf /var/lib/apt/lists/* + WORKDIR /home/foxmq # The minimum number of files required to allow `cargo fetch` to cache all dependencies. @@ -31,7 +33,7 @@ RUN --mount=type=ssh --mount=target=docker-util/,source=docker-util/ bash docker RUN --mount=type=ssh cargo fetch # Build the biggest deps in a separately cached layer. -RUN cargo build --release -p tokio -p tracing -p tashi-consensus-engine +RUN cargo build --release -p tokio -p tracing -p tashi-vertex COPY ./ .