diff --git a/rust/.gitignore b/rust/.gitignore index 91712988..a07dfa21 100644 --- a/rust/.gitignore +++ b/rust/.gitignore @@ -4,4 +4,4 @@ .DS_Store .idea .vscode -target +/target diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 83792a94..31ec6e16 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -32,21 +32,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "atomic_refcell" -version = "0.1.13" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c" +checksum = "21e4227379beff4205943696e6c3e0cd809bacdf3f0edd6e3dd153e2269571a4" [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" + +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bindgen" @@ -65,15 +77,15 @@ dependencies = [ "quote", "regex", "rustc-hash", - "shlex", + "shlex 1.3.0", "syn", ] [[package]] name = "bitflags" -version = "2.10.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" [[package]] name = "bitstream-io" @@ -84,20 +96,29 @@ dependencies = [ "no_std_io2", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "cc" -version = "1.2.49" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", - "shlex", + "shlex 2.0.1", ] [[package]] @@ -111,9 +132,9 @@ dependencies = [ [[package]] name = "cfg-expr" -version = "0.20.5" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21be0e1ce6cdb2ee7fff840f922fb04ead349e5cfb1e750b769132d44ce04720" +checksum = "fb693542bcafa528e198be0ebd9d3632ca5b7c93dbe7237460e199910835997c" dependencies = [ "smallvec", "target-lexicon", @@ -125,11 +146,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "num-traits", @@ -149,9 +176,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.53" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e340e012a1bf4935f5282ed1436d1489548e8f72308207ea5df0e23d2d03f8" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", "clap_derive", @@ -159,9 +186,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.53" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76b5d13eaa18c901fd2f7fca939fefe3a0727a953561fefdf3b2922b8569d00" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ "anstyle", "clap_lex", @@ -169,9 +196,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.49" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" dependencies = [ "heck", "proc-macro2", @@ -181,15 +208,15 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.6" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cmake" -version = "0.1.56" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b042e5d8a74ae91bb0961acd039822472ec99f8ab0948cbf6d1369588f8be586" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" dependencies = [ "cc", ] @@ -256,11 +283,34 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "ctrlc" +version = "3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" +dependencies = [ + "dispatch2", + "nix", + "windows-sys", +] + +[[package]] +name = "dispatch2" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", +] + [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" [[package]] name = "equivalent" @@ -270,30 +320,36 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "foldhash" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" [[package]] name = "futures-channel" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] name = "futures-executor" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" dependencies = [ "futures-core", "futures-task", @@ -302,9 +358,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", @@ -319,34 +375,34 @@ checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" [[package]] name = "futures-task" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-util" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", "futures-macro", "futures-task", "pin-project-lite", - "pin-utils", "slab", ] [[package]] name = "getrandom" -version = "0.3.4" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", "r-efi", "wasip2", + "wasip3", ] [[package]] @@ -448,9 +504,9 @@ dependencies = [ [[package]] name = "gst-plugin-version-helper" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68a894ef2d738054b950e1dbef5d9012b63fd968d4d32dbccd31bd8d8d4b219" +checksum = "94668bc2592732b8c2b653668ae41211d45988fb61264888b9c2d545d4bd826d" dependencies = [ "chrono", "toml_edit", @@ -458,9 +514,9 @@ dependencies = [ [[package]] name = "gstreamer" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bed73742c5d54cb48533be608b67d89f96e1ebbba280be7823f1ef995e3a9d7" +checksum = "1e8251db223ca38d9aefaf3d19f6f11581a9123cd12dacebd8b9e182da965023" dependencies = [ "cfg-if", "futures-channel", @@ -567,9 +623,9 @@ dependencies = [ [[package]] name = "gstreamer-sys" -version = "0.24.4" +version = "0.24.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d88630697e757c319e7bcec7b13919ba80492532dd3238481c1c4eee05d4904" +checksum = "b5d37c1a599ae57b8186948bd5699f2dbfc044baea9d400228b489a85bcf2759" dependencies = [ "cfg-if", "glib-sys", @@ -610,9 +666,18 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.16.1" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" [[package]] name = "heck" @@ -622,9 +687,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "iana-time-zone" -version = "0.1.64" +version = "0.1.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e57f83510bb73707521ebaffa789ec8caf86f9657cad665b092b581d40e9fb" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -644,14 +709,22 @@ dependencies = [ "cc", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "indexmap" -version = "2.12.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.17.1", + "serde", + "serde_core", ] [[package]] @@ -674,16 +747,18 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.15" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "js-sys" -version = "0.3.83" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "464a3709c7f55f1f721e5389aa6ea4e3bc6aba669353300af094b29ffbdde1d8" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" dependencies = [ + "cfg-if", + "futures-util", "once_cell", "wasm-bindgen", ] @@ -703,11 +778,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" -version = "0.2.178" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libloading" @@ -721,9 +802,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" [[package]] name = "matchers" @@ -736,9 +817,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "minimal-lexical" @@ -756,7 +837,9 @@ checksum = "956787520e75e9bd233246045d19f42fb73242759cc57fba9611d940ae96d4b0" name = "mxl" version = "0.1.0" dependencies = [ + "base64", "clap", + "ctrlc", "libloading", "mxl-sys", "serde", @@ -776,11 +859,23 @@ dependencies = [ "libloading", ] +[[package]] +name = "nix" +version = "0.31.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "no_std_io2" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b51ed7824b6e07d354605f4abb3d9d300350701299da96642ee084f5ce631550" +checksum = "418abd1b6d34fbf6cae440dc874771b0525a604428704c76e48b29a5e67b8003" dependencies = [ "memchr", ] @@ -832,11 +927,26 @@ dependencies = [ "autocfg", ] +[[package]] +name = "objc2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "option-operations" @@ -849,27 +959,21 @@ dependencies = [ [[package]] name = "pastey" -version = "0.2.1" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec" +checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4" [[package]] name = "pin-project-lite" -version = "0.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" - -[[package]] -name = "pin-utils" -version = "0.1.0" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" [[package]] name = "pkg-config" -version = "0.3.32" +version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" [[package]] name = "prettyplease" @@ -883,42 +987,42 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ "toml_edit", ] [[package]] name = "proc-macro2" -version = "1.0.103" +version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.42" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] [[package]] name = "r-efi" -version = "5.3.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -928,9 +1032,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -939,15 +1043,15 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "rustc-hash" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" [[package]] name = "rustversion" @@ -956,10 +1060,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" [[package]] -name = "ryu" -version = "1.0.20" +name = "semver" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" [[package]] name = "serde" @@ -993,22 +1097,22 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.145" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", - "ryu", "serde", "serde_core", + "zmij", ] [[package]] name = "serde_spanned" -version = "1.0.4" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" dependencies = [ "serde_core", ] @@ -1028,11 +1132,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "shlex" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" + [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" @@ -1059,9 +1169,9 @@ dependencies = [ [[package]] name = "system-deps" -version = "7.0.7" +version = "7.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c8f33736f986f16d69b6cb8b03f55ddcad5c41acc4ccc39dd88e84aa805e7f" +checksum = "396a35feb67335377e0251fcbc1092fc85c484bd4e3a7a54319399da127796e7" dependencies = [ "cfg-expr", "heck", @@ -1072,24 +1182,24 @@ dependencies = [ [[package]] name = "target-lexicon" -version = "0.13.3" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df7f62577c25e07834649fc3b39fafdc597c0a3527dc1c60129201ccfcbaa50c" +checksum = "adb6935a6f5c20170eeceb1a3835a49e12e19d792f6dd344ccc76a985ca5a6ca" [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", @@ -1107,9 +1217,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.9+spec-1.0.0" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb5238e643fc34a1d5d7e753e1532a91912d74b63b92b3ea51fde8d1b7bc79dd" +checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ "indexmap", "serde_core", @@ -1122,18 +1232,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.4+spec-1.0.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe3cea6b2aa3b910092f6abd4053ea464fab5f9c170ba5e9a6aead16ec4af2b6" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] [[package]] name = "toml_edit" -version = "0.23.10+spec-1.0.0" +version = "0.25.12+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" +checksum = "d2153edc6955a6c354fad8f5efd38b6a8769bdccf9fe50f8e1329f81b0baa5d7" dependencies = [ "indexmap", "toml_datetime", @@ -1143,24 +1253,24 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.5+spec-1.0.0" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c03bee5ce3696f31250db0bbaff18bc43301ce0e8db2ed1f07cbb2acf89984c" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ "winnow", ] [[package]] name = "toml_writer" -version = "1.0.5+spec-1.0.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9cd6190959dce0994aa8970cd32ab116d1851ead27e866039acaf2524ce44fa" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" [[package]] name = "tracing" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -1181,9 +1291,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.35" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a04e24fab5c89c6a36eb8558c9656f30d81de51dfa4d3b45f26b21d61fa0a6c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -1202,9 +1312,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "matchers", "nu-ansi-term", @@ -1220,9 +1330,9 @@ dependencies = [ [[package]] name = "tracing-test" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" dependencies = [ "tracing-core", "tracing-subscriber", @@ -1231,9 +1341,9 @@ dependencies = [ [[package]] name = "tracing-test-macro" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" dependencies = [ "quote", "syn", @@ -1241,9 +1351,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-width" @@ -1251,11 +1361,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "uuid" -version = "1.19.0" +version = "1.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" dependencies = [ "getrandom", "js-sys", @@ -1277,18 +1393,27 @@ checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" [[package]] name = "wasip2" -version = "1.0.1+wasi-0.2.4" +version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", ] [[package]] name = "wasm-bindgen" -version = "0.2.106" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" dependencies = [ "cfg-if", "once_cell", @@ -1299,9 +1424,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.106" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48cb0d2638f8baedbc542ed444afc0644a29166f1595371af4fecf8ce1e7eeb3" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1309,9 +1434,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.106" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefb59d5cd5f92d9dcf80e4683949f15ca4b511f4ac0a6e14d4e1ac60c6ecd40" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" dependencies = [ "bumpalo", "proc-macro2", @@ -1322,13 +1447,47 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.106" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc538057e648b67f72a982e708d485b2efa771e1ac05fec311f9f63e5800db4" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -1399,15 +1558,109 @@ dependencies = [ [[package]] name = "winnow" -version = "0.7.14" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" +checksum = "0592e1c9d151f854e6fd382574c3a0855250e1d9b2f99d9281c6e6391af352f1" dependencies = [ "memchr", ] [[package]] name = "wit-bindgen" -version = "0.46.0" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zmij" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/rust/mxl-sys/Cargo.toml b/rust/mxl-sys/Cargo.toml index 0fe4555c..ddd01476 100644 --- a/rust/mxl-sys/Cargo.toml +++ b/rust/mxl-sys/Cargo.toml @@ -17,3 +17,4 @@ cmake = "0.1.54" [features] mxl-not-built = [] +mxl-fabrics-ofi = [] diff --git a/rust/mxl-sys/build.rs b/rust/mxl-sys/build.rs index ca418c01..44028ab3 100644 --- a/rust/mxl-sys/build.rs +++ b/rust/mxl-sys/build.rs @@ -62,6 +62,61 @@ fn get_bindgen_specs() -> BindgenSpecs { } } +fn get_bindgen_specs_fabrics() -> BindgenSpecs { + let header = "wrapper-fabrics.h".to_string(); + + let manifest_dir = + PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("failed to get current directory")); + let repo_root = manifest_dir.parent().unwrap().parent().unwrap(); + let mut includes_dirs = vec![ + repo_root + .join("lib") + .join("include") + .to_string_lossy() + .to_string(), + repo_root + .join("lib") + .join("fabrics") + .join("include") + .to_string_lossy() + .to_string(), + ]; + if cfg!(not(feature = "mxl-not-built")) { + let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap()); + let build_version_dir = out_dir.join("include").to_string_lossy().to_string(); + + includes_dirs.push(build_version_dir); + + // Rebuild if any file in lib/ changes + let lib_root = repo_root.join("lib"); + println!("cargo:rerun-if-changed={}", lib_root.display()); + + let dst = cmake::Config::new(repo_root) + .generator("Ninja") + .configure_arg("--preset") + .configure_arg(BUILD_VARIANT) + .configure_arg("-B") + .configure_arg(out_dir.join("build")) + .define("MXL_ENABLE_FABRICS_OFI", "ON") + .define("BUILD_DOCS", "OFF") + .define("BUILD_TESTS", "OFF") + .define("BUILD_TOOLS", "OFF") + .build(); + + println!( + "cargo:rustc-link-search={}", + dst.join("lib/fabrics/ofi").display() + ); + println!("cargo:rustc-link-lib=mxl"); + println!("cargo:rustc-link-lib=mxl-fabrics"); + } + + BindgenSpecs { + header, + includes_dirs, + } +} + fn main() { let bindgen_specs = get_bindgen_specs(); for include_dir in &bindgen_specs.includes_dirs { @@ -89,6 +144,28 @@ fn main() { bindings .write_to_file(out_path.join("bindings.rs")) .expect("Could not write bindings"); + + if cfg!(feature = "mxl-fabrics-ofi") { + let fabrics_bindings = bindgen::builder() + .clang_args( + get_bindgen_specs_fabrics() + .includes_dirs + .iter() + .map(|dir| format!("-I{dir}")), + ) + .header("wrapper-fabrics.h") + .derive_default(true) + .derive_debug(true) + .prepend_enum_name(false) + .dynamic_library_name("libmxlfabrics") + .dynamic_link_require_all(false) + .parse_callbacks(Box::new(CB)) + .generate() + .unwrap(); + fabrics_bindings + .write_to_file(out_path.join("fabrics_bindings.rs")) + .expect("Could not write fabrics bindings"); + } } #[derive(Debug)] diff --git a/rust/mxl-sys/src/lib.rs b/rust/mxl-sys/src/lib.rs index 86c46c47..020045fa 100644 --- a/rust/mxl-sys/src/lib.rs +++ b/rust/mxl-sys/src/lib.rs @@ -17,3 +17,8 @@ extern crate libloading; include!(concat!(env!("OUT_DIR"), "/bindings.rs")); + +#[cfg(feature = "mxl-fabrics-ofi")] +pub mod fabrics { + include!(concat!(env!("OUT_DIR"), "/fabrics_bindings.rs")); +} diff --git a/rust/mxl-sys/wrapper-fabrics.h b/rust/mxl-sys/wrapper-fabrics.h new file mode 100644 index 00000000..2e74e3cf --- /dev/null +++ b/rust/mxl-sys/wrapper-fabrics.h @@ -0,0 +1,4 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +#include "mxl/fabrics.h" diff --git a/rust/mxl/Cargo.toml b/rust/mxl/Cargo.toml index 951770af..91a853f5 100644 --- a/rust/mxl/Cargo.toml +++ b/rust/mxl/Cargo.toml @@ -21,6 +21,13 @@ serde.workspace = true clap.workspace = true serde_json.workspace = true tracing-subscriber.workspace = true +ctrlc = "3" +base64 = "0.22" [features] mxl-not-built = ["mxl-sys/mxl-not-built"] +mxl-fabrics-ofi = ["mxl-sys/mxl-fabrics-ofi"] + +[[example]] +name = "fabrics-demo" +required-features = ["mxl-fabrics-ofi"] diff --git a/rust/mxl/examples/fabrics-demo.rs b/rust/mxl/examples/fabrics-demo.rs new file mode 100644 index 00000000..e36d20ab --- /dev/null +++ b/rust/mxl/examples/fabrics-demo.rs @@ -0,0 +1,471 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +mod common; + +use std::{ + sync::{ + Arc, + atomic::{self, AtomicBool}, + }, + time::Duration, +}; + +use clap::{Parser, Subcommand}; + +use base64::{Engine as _, prelude::BASE64_STANDARD}; + +use mxl::{ + Error, FlowConfigInfo, FlowInfo, FlowReader, FlowWriter, GrainReader, GrainWriter, + MxlFabricsApi, MxlInstance, SamplesReader, SamplesWriter, + config::{get_mxl_fabrics_ofi_so_path, get_mxl_so_path}, + fabrics::{ + EndpointAddress, FabricsInstance, TargetInfo, + initiator::{self, Initiator}, + target::{self, Target}, + }, +}; + +#[derive(Debug, Parser)] +#[command( + version = clap::crate_version!(), + author = clap::crate_authors!(), + subcommand_required = true, + arg_required_else_help = true +)] +pub struct Cli { + #[arg(short, long, help = "The MXL domain directory")] + pub domain: String, + + #[arg( + short, + long, + default_value = "tcp", + help = "The fabrics provider. One of (tcp, verbs or efa)." + )] + pub provider: String, + + #[arg( + short, + long, + help = "This corresponds to the interface identifier of the fabrics endpoint, it can also be a logical address. This can be seen as the bind address when using sockets." + )] + pub node: String, + + #[arg( + short, + long, + help = "This corresponds to a service identifier for the fabrics endpoint. This can be seen as the bind port when using sockets." + )] + pub service: String, + + #[command(subcommand)] + pub command: Command, +} + +#[derive(Debug, Subcommand)] +pub enum Command { + /// Run as a receiver (fabrics target + flow writer). + Target { + #[arg( + long, + help = "The JSON file which contains the NMOS Flow configuration." + )] + flow_file: String, + }, + /// Run as an initiator (flow reader + fabrics initiator). + Initiator { + #[arg(long, help = "The flow ID to read from.")] + flow_id: String, + #[arg( + long, + help = "The target information to send to. Start the target first and paste the printed target info here." + )] + target_info: String, + }, +} + +struct TargetEndpoint<'a> { + _instance: &'a MxlInstance, + flow_config: FlowConfigInfo, + flow_writer: FlowWriter, + target: Target, +} + +impl<'a> TargetEndpoint<'a> { + pub fn new( + instance: &'a MxlInstance, + fabrics_api: &Arc, + flow_file: &str, + cli: &Cli, + ) -> Result<(Self, TargetInfo), mxl::Error> { + let flow_config_str = std::fs::read_to_string(flow_file).expect("Failed to read flow file"); + + let (flow_writer, flow_config, _) = instance.create_flow_writer(&flow_config_str, None)?; + + let fabrics_instance = instance.create_fabrics_instance(fabrics_api)?; + + let provider = fabrics_instance.provider_from_str(&cli.provider)?; + + let target_config = target::Config::new( + EndpointAddress { + node: Some(&cli.node), + service: Some(&cli.service), + }, + provider, + &flow_writer, + ); + + let target = fabrics_instance.create_target()?; + let (target, target_info) = target.setup(&target_config)?; + + Ok(( + Self { + _instance: instance, + flow_config, + flow_writer, + target, + }, + target_info, + )) + } + + pub fn run(self, running: Arc) -> Result<(), mxl::Error> { + match self.target.specialize(&self.flow_config) { + target::Either::Grain(target) => { + Self::run_discrete(target, self.flow_writer.to_grain_writer()?, running)?; + } + target::Either::Sample(target) => { + Self::run_continuous(target, self.flow_writer.to_samples_writer()?, running)?; + } + } + Ok(()) + } + + fn run_discrete( + target: Target, + writer: GrainWriter, + running: Arc, + ) -> Result<(), mxl::Error> { + while running.load(atomic::Ordering::SeqCst) { + match target.read(Duration::from_millis(200)) { + Ok(read_result) => { + let grain = writer.open_grain(read_result.grain_index)?; + let valid_slices = grain.valid_slices(); + grain.commit(valid_slices)?; + + tracing::debug!( + "Commited grain index {}, slice index {}.", + read_result.grain_index, + valid_slices + ); + } + Err(mxl::Error::NotReady) => { + continue; + } + Err(mxl::Error::Interrupted) => { + tracing::info!("Interrupted, exiting."); + break; + } + Err(e) => { + return Err(e); + } + } + } + Ok(()) + } + + fn run_continuous( + target: Target, + writer: SamplesWriter, + running: Arc, + ) -> Result<(), mxl::Error> { + while running.load(atomic::Ordering::SeqCst) { + match target.read(Duration::from_millis(200)) { + Ok(read_result) => { + let samples = writer.open_samples(read_result.head_index, read_result.count)?; + samples.commit()?; + + tracing::debug!( + "Commited samples, head index {}, count {}.", + read_result.head_index, + read_result.count + ); + } + Err(mxl::Error::NotReady) => { + continue; + } + Err(mxl::Error::Interrupted) => { + tracing::info!("Interrupted, exiting."); + break; + } + Err(e) => { + return Err(e); + } + } + } + Ok(()) + } +} + +struct InitiatorEndpoint<'a> { + instance: &'a MxlInstance, + fabrics_instance: FabricsInstance, + flow_reader: FlowReader, + initiator: Initiator, +} + +impl<'a> InitiatorEndpoint<'a> { + pub fn new( + instance: &'a MxlInstance, + fabrics_api: &Arc, + flow_id: &str, + cli: &Cli, + ) -> Result { + let flow_reader = instance.create_flow_reader(flow_id)?; + + let fabrics_instance = instance.create_fabrics_instance(fabrics_api)?; + + let initiator = fabrics_instance.create_initiator()?; + + let provider = fabrics_instance.provider_from_str(&cli.provider)?; + + let initiator_config = initiator::Config::new( + EndpointAddress { + node: Some(&cli.node), + service: Some(&cli.service), + }, + provider, + &flow_reader, + ); + + let initiator = initiator.setup(&initiator_config)?; + + Ok(Self { + instance, + fabrics_instance, + initiator, + flow_reader, + }) + } + + pub fn run(self, target_info_str: &str, running: Arc) -> Result<(), mxl::Error> { + let flow_info = self.flow_reader.get_info()?; + + let target_info_str = + String::from_utf8(BASE64_STANDARD.decode(target_info_str).map_err(|e| { + Error::Other(format!("Failed to decode target_info from base64: {e}")) + })?) + .map_err(|e| Error::Other(format!("Decoded target_info is not valid UTF-8: {e}")))?; + + let target_info = self + .fabrics_instance + .target_info_from_str(&target_info_str)?; + + match self.initiator.specialize(&flow_info.config) { + initiator::Either::Grain(initiator) => { + initiator.add_target(&target_info)?; + // Wait to be connected + loop { + if !running.load(atomic::Ordering::SeqCst) { + return Ok(()); + } + + if initiator.make_progress(Duration::from_millis(250)).is_ok() { + break; + } + } + Self::run_discrete( + self.instance, + initiator, + self.flow_reader.to_grain_reader()?, + &flow_info, + running, + )?; + } + initiator::Either::Samples(initiator) => { + initiator.add_target(&target_info)?; + // Wait to be connected + loop { + if !running.load(atomic::Ordering::SeqCst) { + return Ok(()); + } + + if initiator.make_progress(Duration::from_millis(250)).is_ok() { + break; + } + } + Self::run_continuous( + self.instance, + initiator, + self.flow_reader.to_samples_reader()?, + &flow_info, + running, + )?; + } + } + + tracing::info!("Stopping as requested."); + + Ok(()) + } + + fn run_discrete( + instance: &MxlInstance, + initiator: Initiator, + reader: GrainReader, + flow_info: &FlowInfo, + running: Arc, + ) -> Result<(), mxl::Error> { + let rate = flow_info.config.common().grain_rate()?; + let mut index = instance.get_current_index(&rate); + while running.load(atomic::Ordering::SeqCst) { + match reader.get_complete_grain(index, Duration::from_millis(200)) { + Ok(grain) => { + match initiator.transfer(index, 0, grain.total_slices) { + Err(Error::NotReady) => { + // Retry the same grain + continue; + } + Err(e) => { + return Err(e); + } + Ok(_) => {} + }; + + // Transfer was posted, now wait for completion + loop { + match initiator.make_progress(Duration::from_millis(10)) { + Ok(_) => { + // we're done exiting the loop + break; + } + Err(Error::Interrupted) => { + return Ok(()); + } + Err(Error::NotReady) => { + // Retry + continue; + } + Err(e) => { + return Err(e); + } + } + } + index += 1; + } + Err(Error::OutOfRangeTooLate) => { + // We are too late, move to the next grain + index = instance.get_current_index(&rate); + } + Err(Error::OutOfRangeTooEarly) => { + // We are too early, retry the same grain + } + Err(e) => { + tracing::error!("Error reading from flow: {}.", e); + } + } + } + Ok(()) + } + + fn run_continuous( + instance: &MxlInstance, + initiator: Initiator, + reader: SamplesReader, + flow_info: &FlowInfo, + running: Arc, + ) -> Result<(), mxl::Error> { + let rate = flow_info.config.common().grain_rate()?; + let count = flow_info.config.common().max_sync_batch_size_hint() as usize; + let mut index = instance.get_current_index(&rate); + while running.load(atomic::Ordering::SeqCst) { + match reader.get_samples_non_blocking(index, count) { + Ok(_sample) => { + match initiator.transfer(index, count) { + Err(Error::NotReady) => { + // Retry the same grain + continue; + } + Err(e) => { + return Err(e); + } + Ok(_) => {} + }; + + // Transfer was posted, now wait for completion + loop { + match initiator.make_progress(Duration::from_millis(10)) { + Ok(_) => { + // we're done exiting the loop + break; + } + Err(Error::Interrupted) => { + return Ok(()); + } + Err(Error::NotReady) => { + // Retry + continue; + } + Err(e) => { + return Err(e); + } + } + } + index += 1; + } + Err(Error::OutOfRangeTooLate) => { + // We are too late, move to the next grain + index = instance.get_current_index(&rate); + } + Err(Error::OutOfRangeTooEarly) => { + // We are too early, retry the same grain + } + Err(e) => { + tracing::error!("Error reading from flow: {}.", e); + } + } + } + Ok(()) + } +} + +fn main() -> Result<(), mxl::Error> { + common::setup_logging(); + + let cli = Cli::parse(); + + let running = Arc::new(AtomicBool::new(true)); + let running2 = running.clone(); + ctrlc::set_handler(move || { + running2.store(false, atomic::Ordering::SeqCst); + }) + .expect("Error setting Ctrl-C handler"); + + let api = mxl::load_api(get_mxl_so_path())?; + + let fabrics_api = mxl::load_fabrics_api(get_mxl_fabrics_ofi_so_path())?; + + let instance = mxl::MxlInstance::new(api, &cli.domain, "")?; + + match &cli.command { + Command::Initiator { + flow_id, + target_info, + } => { + let initiator = InitiatorEndpoint::new(&instance, &fabrics_api, flow_id, &cli)?; + initiator.run(target_info, running)?; + } + Command::Target { flow_file } => { + let (target, target_info) = + TargetEndpoint::new(&instance, &fabrics_api, flow_file, &cli)?; + tracing::info!( + "Target Info: {}", + BASE64_STANDARD.encode(target_info.to_string()?) + ); + target.run(running)?; + } + } + + Ok(()) +} diff --git a/rust/mxl/src/api.rs b/rust/mxl/src/api.rs index 57418f8f..018c82e2 100644 --- a/rust/mxl/src/api.rs +++ b/rust/mxl/src/api.rs @@ -15,3 +15,15 @@ pub fn load_api(path_to_so_file: impl AsRef) -> Result { libmxl::new(path_to_so_file.as_ref().as_os_str())? })) } + +#[cfg(feature = "mxl-fabrics-ofi")] +pub type MxlFabricsApi = mxl_sys::fabrics::libmxlfabrics; +#[cfg(feature = "mxl-fabrics-ofi")] +pub type MxlFabricsAPiHandle = Arc; + +#[cfg(feature = "mxl-fabrics-ofi")] +pub fn load_fabrics_api(path_to_so_file: impl AsRef) -> Result { + Ok(Arc::new(unsafe { + mxl_sys::fabrics::libmxlfabrics::new(path_to_so_file.as_ref().as_os_str())? + })) +} diff --git a/rust/mxl/src/config.rs b/rust/mxl/src/config.rs index b80fd0c2..e3983690 100644 --- a/rust/mxl/src/config.rs +++ b/rust/mxl/src/config.rs @@ -20,6 +20,23 @@ pub fn get_mxl_so_path() -> std::path::PathBuf { .join("libmxl.so") } +#[cfg(not(feature = "mxl-not-built"))] +pub fn get_mxl_fabrics_ofi_so_path() -> std::path::PathBuf { + // The mxl-sys build script ensures that the build directory is in the library path + // so we can just return the library name here. + "libmxl-fabrics.so".into() +} + +#[cfg(feature = "mxl-not-built")] +pub fn get_mxl_fabrics_ofi_so_path() -> std::path::PathBuf { + std::path::PathBuf::from_str(MXL_BUILD_DIR) + .expect("build error: 'MXL_FABRICS_SO_PATH' is invalid") + .join("lib") + .join("fabrics") + .join("ofi") + .join("libmxl-fabrics.so") +} + pub fn get_mxl_repo_root() -> std::path::PathBuf { std::path::PathBuf::from_str(MXL_REPO_ROOT).expect("build error: 'MXL_REPO_ROOT' is invalid") } diff --git a/rust/mxl/src/error.rs b/rust/mxl/src/error.rs index 4b6aa4ce..bb130940 100644 --- a/rust/mxl/src/error.rs +++ b/rust/mxl/src/error.rs @@ -23,6 +23,12 @@ pub enum Error { InvalidArg, #[error("Conflict")] Conflict, + #[error("Not ready")] + NotReady, + #[error("Not found")] + NotFound, + #[error("Interrupted")] + Interrupted, /// The error is not defined in the MXL API, but it is used to wrap other errors. #[error("Other error: {0}")] Other(String), @@ -47,6 +53,9 @@ impl Error { mxl_sys::MXL_ERR_TIMEOUT => Err(Error::Timeout), mxl_sys::MXL_ERR_INVALID_ARG => Err(Error::InvalidArg), mxl_sys::MXL_ERR_CONFLICT => Err(Error::Conflict), + mxl_sys::MXL_ERR_NOT_READY => Err(Error::NotReady), + mxl_sys::MXL_ERR_NOT_FOUND => Err(Error::NotFound), + mxl_sys::MXL_ERR_INTERRUPTED => Err(Error::Interrupted), other => Err(Error::Unknown(other)), } } diff --git a/rust/mxl/src/fabrics/config.rs b/rust/mxl/src/fabrics/config.rs new file mode 100644 index 00000000..b2b5ab7c --- /dev/null +++ b/rust/mxl/src/fabrics/config.rs @@ -0,0 +1,32 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::ffi::CString; + +use crate::Error; + +/// Address of a logical network endpoint. This is analogous to a hostname and port number in classic ipv4 networking. +/// The actual values for node and service vary between providers, but often an ip address as the node value and a port number as the service +/// value are sufficient. +pub struct EndpointAddress<'a> { + pub node: Option<&'a str>, + pub service: Option<&'a str>, +} + +impl<'a> TryFrom<&EndpointAddress<'a>> for mxl_sys::fabrics::FabricsEndpointAddress { + type Error = Error; + + fn try_from(value: &EndpointAddress) -> Result { + let node = if let Some(node) = value.node { + CString::new(node)?.into_raw() + } else { + std::ptr::null_mut() + }; + let service = if let Some(service) = value.service { + CString::new(service)?.into_raw() + } else { + std::ptr::null_mut() + }; + Ok(mxl_sys::fabrics::FabricsEndpointAddress { node, service }) + } +} diff --git a/rust/mxl/src/fabrics/initiator/config.rs b/rust/mxl/src/fabrics/initiator/config.rs new file mode 100644 index 00000000..a34e7df4 --- /dev/null +++ b/rust/mxl/src/fabrics/initiator/config.rs @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + Error, FlowReader, + fabrics::{config::EndpointAddress, provider::Provider}, +}; + +/// Configuration object required to set up an initiator. +pub struct Config<'a> { + version: i32, + endpoint_addr: EndpointAddress<'a>, + provider: Provider, + flow_reader: &'a FlowReader, +} + +impl<'a> Config<'a> { + pub fn new( + endpoint_addr: EndpointAddress<'a>, + provider: Provider, + flow_reader: &'a FlowReader, + ) -> Self { + Self { + version: 0, + endpoint_addr, + provider, + flow_reader, + } + } +} + +impl<'a> TryFrom<&Config<'a>> for mxl_sys::fabrics::FabricsInitiatorConfig { + type Error = Error; + + fn try_from(value: &Config) -> Result { + Ok(Self { + version: value.version, + endpointAddress: (&value.endpoint_addr).try_into()?, + provider: (&value.provider).into(), + // SAFETY: The type cast is necessary, because this FlowReader is scoped in mxl_sys::fabrics::*, not mxl_sys::*, but this is the same type. + reader: unsafe { + std::mem::transmute::( + value.flow_reader.inner(), + ) + }, + }) + } +} diff --git a/rust/mxl/src/fabrics/initiator/grain.rs b/rust/mxl/src/fabrics/initiator/grain.rs new file mode 100644 index 00000000..c5ba7cec --- /dev/null +++ b/rust/mxl/src/fabrics/initiator/grain.rs @@ -0,0 +1,74 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use crate::{ + Error, Result, + fabrics::{TargetInfo, initiator::Grain, initiator::Initiator}, +}; + +impl Initiator { + /// Add a target to the initiator. This will allow the initiator to send data to the target in subsequent calls to + /// mxlFabricsInitiatorTransferGrain(). This function is always non-blocking. If additional connection setup is required + /// by the underlying implementation, it will only happen during a call to make_progress*(). + pub fn add_target(&self, target: &TargetInfo) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_add_target(self.instance.inner, target.inner) + }) + } + + /// Remove a target from the initiator. This function is always non-blocking. If any additional communication for a graceful shutdown is + /// required it will happend during a call to make_progress*(). It is guaranteed that no new grain transfer operations will + /// be queued for this target during calls to transfer() after the target was removed, but it is only guaranteed that + /// the connection shutdown has completed after make_progress*() no longer returns Error::NotReady. + pub fn remove_target(&self, target: &TargetInfo) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_remove_target(self.instance.inner, target.inner) + }) + } + + /// This function must be called regularly for the initiator to make progress on queued transfer operations, connection establishment + /// operations and connection shutdown operations. + pub fn make_progress_non_blocking(&self) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_make_progress_non_blocking(self.instance.inner) + }) + } + + /// This function must be called regularly for the initiator to make progress on queued transfer operations, connection establishment + /// operations and connection shutdown operations. + pub fn make_progress(&self, timeout: Duration) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_make_progress_blocking( + self.instance.inner, + timeout.as_millis() as u16, + ) + }) + } + + /// Enqueue a transfer operation to all added targets. This function is always non-blocking. The transfer operation might be started right + /// away, but is only guaranteed to have completed after mxlFabricsInitiatorMakeProgress*() no longer returns Error::NotReady. + pub fn transfer(&self, grain_index: u64, start_slice: u16, end_slice: u16) -> Result<()> { + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_initiator_transfer_grain( + self.instance.inner, + grain_index, + start_slice, + end_slice, + ) + }) + } +} diff --git a/rust/mxl/src/fabrics/initiator/mod.rs b/rust/mxl/src/fabrics/initiator/mod.rs new file mode 100644 index 00000000..51519158 --- /dev/null +++ b/rust/mxl/src/fabrics/initiator/mod.rs @@ -0,0 +1,147 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +mod config; +mod grain; +mod samples; + +use crate::{ + FlowConfigInfo, + error::{Error, Result}, + fabrics::instance::FabricsInstanceContext, +}; + +pub use config::Config; + +use std::{marker::PhantomData, sync::Arc}; + +use states::*; + +pub mod states { + /// Used to create a new initiator + pub struct New {} + + /// Waiting for the initiator to be initialized with the setup function + pub struct Initializing {} + + /// The setup function has been called, but the initiator has not yet been specialized into a + /// grain or samples initiator + pub struct Specializing {} + + /// The initiator has been specialized into a grain initiator. It can only transfer grains to + /// targets. + pub struct Grain {} + + /// The initiator has been specialized into a samples initiator. It can only transfer samples to + pub struct Samples {} + + impl InitiatorState for New {} + impl InitiatorState for Initializing {} + impl InitiatorState for Specializing {} + impl InitiatorState for Grain {} + impl InitiatorState for Samples {} + + pub trait InitiatorState {} +} + +/// Wrapper class that holds a reference count to the Fabrics Instance and the actual initiator instance. +struct InitiatorInstance { + ctx: Arc, + inner: mxl_sys::fabrics::FabricsInitiator, +} +unsafe impl Send for InitiatorInstance {} + +impl Drop for InitiatorInstance { + fn drop(&mut self) { + if !self.inner.is_null() { + unsafe { + self.ctx + .api() + .fabrics_destroy_initiator(self.ctx.inner, self.inner); + } + } + } +} + +pub struct Initiator { + instance: InitiatorInstance, + _marker: std::marker::PhantomData, +} +//SAFETY: An initiator is safe to be sent across threads, but it's not thread-safe to use its API functions. +unsafe impl Send for Initiator {} + +pub enum Either { + Grain(Initiator), + Samples(Initiator), +} + +impl Initiator { + /// Create a new initiator + pub(crate) fn new( + ctx: Arc, + initiator: mxl_sys::fabrics::FabricsInitiator, + ) -> Initiator { + let instance = InitiatorInstance { + ctx, + inner: initiator, + }; + Initiator { + instance, + _marker: std::marker::PhantomData, + } + } +} + +impl Initiator { + /// Configure the initiator. + pub fn setup(self, config: &Config) -> Result> { + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_initiator_setup( + self.instance.inner, + &config.try_into()?, + std::ptr::null(), // Unused for now + ) + })?; + Ok(Initiator { + instance: self.instance, + _marker: PhantomData, + }) + } +} + +impl Initiator { + /// Specialize the initator into a concrete grain or samples initator + pub fn specialize(self, flow_config: &FlowConfigInfo) -> Either { + if flow_config.is_discrete_flow() { + Either::Grain(Initiator { + instance: self.instance, + _marker: PhantomData, + }) + } else { + Either::Samples(Initiator { + instance: self.instance, + _marker: PhantomData, + }) + } + } +} + +/// Create a new initiator +#[doc(hidden)] +pub(crate) fn create_initiator( + ctx: Arc, +) -> Result> { + let mut initiator = mxl_sys::fabrics::FabricsInitiator::default(); + unsafe { + Error::from_status( + ctx.api() + .fabrics_create_initiator(ctx.inner, &mut initiator), + )? + } + if initiator.is_null() { + return Err(Error::Other( + "Failed to create fabrics initiator.".to_string(), + )); + } + Ok(Initiator::new(ctx.clone(), initiator)) +} diff --git a/rust/mxl/src/fabrics/initiator/samples.rs b/rust/mxl/src/fabrics/initiator/samples.rs new file mode 100644 index 00000000..751abb72 --- /dev/null +++ b/rust/mxl/src/fabrics/initiator/samples.rs @@ -0,0 +1,73 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use crate::{ + Error, Result, + fabrics::{TargetInfo, initiator::Initiator, initiator::Samples}, +}; + +impl Initiator { + /// Add a target to the initiator. This will allow the initiator to send data to the target in subsequent calls to + /// mxlFabricsInitiatorTransferGrain(). This function is always non-blocking. If additional connection setup is required + /// by the underlying implementation, it will only happen during a call to make_progress*(). + pub fn add_target(&self, target: &TargetInfo) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_add_target(self.instance.inner, target.inner) + }) + } + + /// Remove a target from the initiator. This function is always non-blocking. If any additional communication for a graceful shutdown is + /// required it will happend during a call to make_progress*(). It is guaranteed that no new grain transfer operations will + /// be queued for this target during calls to transfer() after the target was removed, but it is only guaranteed that + /// the connection shutdown has completed after make_progress*() no longer returns Error::NotReady. + pub fn remove_target(&self, target: &TargetInfo) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_remove_target(self.instance.inner, target.inner) + }) + } + + /// This function must be called regularly for the initiator to make progress on queued transfer operations, connection establishment + /// operations and connection shutdown operations. + pub fn make_progress_non_blocking(&self) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_make_progress_non_blocking(self.instance.inner) + }) + } + + /// This function must be called regularly for the initiator to make progress on queued transfer operations, connection establishment + /// operations and connection shutdown operations. + pub fn make_progress(&self, timeout: Duration) -> Result<()> { + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_initiator_make_progress_blocking( + self.instance.inner, + timeout.as_millis() as u16, + ) + }) + } + + /// Enqueue a transfer operation to all added targets. This function is always non-blocking. The transfer operation might be started right + /// away, but is only guaranteed to have completed after mxlFabricsInitiatorMakeProgress*() no longer returns Error::NotReady. + pub fn transfer(&self, head_index: u64, count: usize) -> Result<()> { + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_initiator_transfer_samples( + self.instance.inner, + head_index, + count, + ) + }) + } +} diff --git a/rust/mxl/src/fabrics/instance.rs b/rust/mxl/src/fabrics/instance.rs new file mode 100644 index 00000000..93572cbf --- /dev/null +++ b/rust/mxl/src/fabrics/instance.rs @@ -0,0 +1,104 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use crate::{ + api::MxlFabricsAPiHandle, + error::{Error, Result}, + fabrics::{ + initiator::{self, Initiator, create_initiator}, + provider::Provider, + target::{self, Target, create_target}, + target_info::TargetInfo, + }, + instance::InstanceContext, +}; + +pub(crate) fn create_instance( + ctx: Arc, + fabrics_api: &MxlFabricsAPiHandle, +) -> Result { + let mut inst = std::ptr::null_mut(); + unsafe { + Error::from_status(fabrics_api.fabrics_create_instance( + std::mem::transmute::<*mut mxl_sys::Instance_t, *mut mxl_sys::fabrics::Instance_t>( + ctx.instance, + ), + std::ptr::null(), // Unused for now + &mut inst, + ))?; + } + if inst.is_null() { + return Err(Error::Other( + "Failed to create fabrics instance.".to_string(), + )); + } + + #[allow(clippy::arc_with_non_send_sync)] + // This is intentional, this Arc only implement Send, because fabric API as a whole is not thread-safe to use. + let ctx = Arc::new(FabricsInstanceContext { + _parent_ctx: ctx.clone(), + api: fabrics_api.clone(), + inner: inst, + }); + + Ok(FabricsInstance::new(ctx)) +} + +pub(crate) struct FabricsInstanceContext { + _parent_ctx: Arc, + api: MxlFabricsAPiHandle, + pub(crate) inner: mxl_sys::fabrics::FabricsInstance, +} +unsafe impl Send for FabricsInstanceContext {} + +impl FabricsInstanceContext { + pub(crate) fn api(&self) -> &MxlFabricsAPiHandle { + &self.api + } +} + +impl Drop for FabricsInstanceContext { + fn drop(&mut self) { + if !self.inner.is_null() { + unsafe { + let _ = self.api.fabrics_destroy_instance(self.inner); + } + } + } +} + +/// This is just a factory type for creating Fabrics related objects such as Targets, Initiators, etc. +/// The fabrics instance and its pointer are held in the `FabricsInstanceContext`` object. +/// This is created via an [MxlInstance](crate::MxlInstance). +pub struct FabricsInstance { + /// The fabric API is not-thread safe (Sync). + ctx: Arc, +} +/// SAFETY: FabricsInstance is safe to send to another thread, but fabric API as a whole is not thread-safe +unsafe impl Send for FabricsInstance {} + +impl FabricsInstance { + fn new(ctx: Arc) -> Self { + Self { ctx } + } + + /// Create a fabrics target. The target is the receiver of write operations from an initiator. + pub fn create_target(&self) -> Result> { + create_target(self.ctx.clone()) + } + + /// Create a fabrics initiator instance. + pub fn create_initiator(&self) -> Result> { + create_initiator(self.ctx.clone()) + } + + pub fn provider_from_str(&self, provider: &str) -> Result { + Provider::from_str(self.ctx.clone(), provider) + } + + pub fn target_info_from_str(&self, target_info: &str) -> Result { + TargetInfo::from_str(self.ctx.clone(), target_info) + } +} diff --git a/rust/mxl/src/fabrics/mod.rs b/rust/mxl/src/fabrics/mod.rs new file mode 100644 index 00000000..a2d2e728 --- /dev/null +++ b/rust/mxl/src/fabrics/mod.rs @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +//! This module provides the Fabrics API extension for this library. The main type is the +//! [FabricsInstance], which is used to create Targets and Initiators for remote data transfers. +//! This module is gated by the `mxl-fabrics-ofi` feature flag. +//! +//! # Details +//! - To get a FabricsInstance, you must create it from MXL instance and a loaded Fabrics API. +//! ``` +//! let mxl_api = mxl::load_api(mxl::config::get_mxl_so_path()) .unwrap(); +//! let instance = mxl::MxlInstance::new(mxl_api, "/dev/shm","").unwrap(); +//! +//! let mxl_fabrics_api = mxl::load_fabrics_api(mxl::config::get_mxl_fabrics_ofi_so_path()); +//! let fabrics_instance = instance.create_fabrics_instance(&mxl_fabrics_api.unwrap()).unwrap(); +//! +//! // You can now create Targets and Initiators from the fabrics_instance +//! let target = fabrics_instance.create_target().unwrap(); +//! let initiator = fabrics_instance.create_initiator().unwrap(); +//! ```` +mod config; +pub mod initiator; +mod instance; +mod provider; +pub mod target; +mod target_info; + +pub use config::EndpointAddress; +pub use instance::FabricsInstance; +pub use provider::Provider; +pub use target_info::TargetInfo; + +pub(crate) use instance::create_instance; diff --git a/rust/mxl/src/fabrics/provider.rs b/rust/mxl/src/fabrics/provider.rs new file mode 100644 index 00000000..4b332f17 --- /dev/null +++ b/rust/mxl/src/fabrics/provider.rs @@ -0,0 +1,111 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use mxl_sys::fabrics::FabricsProvider; + +use crate::error::{Error, Result}; +use std::{ffi::CString, sync::Arc}; + +use crate::fabrics::instance::FabricsInstanceContext; + +/// The provider corresponds to the transport used for transfers. This is created from a +/// [FabricsInstance](crate::fabrics::FabricsInstance). +#[derive(Clone)] +pub struct Provider { + inner: ProviderType, + ctx: Arc, +} +unsafe impl Send for Provider {} +/// SAFETY: Although the `FabricsInstanceContext` type as a whole is not thread-safe, the subset of functions that this `Provider type uses is thread-safe: `fabrics_provider_from_string` and `fabrics_provider_to_string` +unsafe impl Sync for Provider {} + +/// The available transports +#[derive(Clone)] +enum ProviderType { + Auto, + Tcp, + Verbs, + Efa, + Shm, +} + +impl From for ProviderType { + fn from(value: mxl_sys::fabrics::FabricsProvider) -> Self { + match value { + mxl_sys::fabrics::MXL_FABRICS_PROVIDER_AUTO => ProviderType::Auto, + mxl_sys::fabrics::MXL_FABRICS_PROVIDER_TCP => ProviderType::Tcp, + mxl_sys::fabrics::MXL_FABRICS_PROVIDER_VERBS => ProviderType::Verbs, + mxl_sys::fabrics::MXL_FABRICS_PROVIDER_EFA => ProviderType::Efa, + mxl_sys::fabrics::MXL_FABRICS_PROVIDER_SHM => ProviderType::Shm, + _ => panic!("Unknown FabricsProvider value"), + } + } +} + +impl From<&ProviderType> for mxl_sys::fabrics::FabricsProvider { + fn from(value: &ProviderType) -> Self { + match value { + ProviderType::Auto => mxl_sys::fabrics::MXL_FABRICS_PROVIDER_AUTO, + ProviderType::Tcp => mxl_sys::fabrics::MXL_FABRICS_PROVIDER_TCP, + ProviderType::Verbs => mxl_sys::fabrics::MXL_FABRICS_PROVIDER_VERBS, + ProviderType::Efa => mxl_sys::fabrics::MXL_FABRICS_PROVIDER_EFA, + ProviderType::Shm => mxl_sys::fabrics::MXL_FABRICS_PROVIDER_SHM, + } + } +} + +impl From<&Provider> for mxl_sys::fabrics::FabricsProvider { + fn from(value: &Provider) -> Self { + (&value.inner).into() + } +} + +impl Provider { + fn new(ctx: Arc, inner: FabricsProvider) -> Self { + Self { + inner: inner.into(), + ctx, + } + } + + /// Convert a string to a fabrics provider enum value. + /// Public visibility is set to crate only, because a `FabricsInstanceContext` is required. + /// See [FabricsInstance](crate::FabricsInstance). + pub(crate) fn from_str(ctx: Arc, s: &str) -> Result { + let mut inner = FabricsProvider::default(); + + Error::from_status(unsafe { + ctx.api() + .fabrics_provider_from_string(CString::new(s)?.as_ptr(), &mut inner) + })?; + + Ok(Self::new(ctx, inner)) + } + + /// Convert a fabrics provider enum value to a string. + pub fn to_string(&self) -> Result { + let mut size = 0; + + let prov: mxl_sys::fabrics::FabricsProvider = (&self.inner).into(); + + Error::from_status(unsafe { + self.ctx + .api() + .fabrics_provider_to_string(prov, std::ptr::null_mut(), &mut size) + })?; + + // fabrics_provider_to_string already includes space for null terminator. So we must remove it here, because CString includes it. + let out_string = unsafe { CString::from_vec_unchecked(vec![0; size - 1]) }; + + Error::from_status(unsafe { + self.ctx.api().fabrics_provider_to_string( + prov, + out_string.as_ptr() as *mut i8, + &mut size, + ) + })?; + out_string + .into_string() + .map_err(|e| Error::Other(e.to_string())) + } +} diff --git a/rust/mxl/src/fabrics/target/config.rs b/rust/mxl/src/fabrics/target/config.rs new file mode 100644 index 00000000..635b8dfd --- /dev/null +++ b/rust/mxl/src/fabrics/target/config.rs @@ -0,0 +1,50 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use crate::FlowWriter; + +use crate::{ + Error, + fabrics::{config::EndpointAddress, provider::Provider}, +}; + +/// Configuration object required to set up a target. +pub struct Config<'a> { + version: i32, + endpoint_addr: EndpointAddress<'a>, + provider: Provider, + flow_writer: &'a FlowWriter, +} + +impl<'a> Config<'a> { + pub fn new( + endpoint_addr: EndpointAddress<'a>, + provider: Provider, + flow_writer: &'a FlowWriter, + ) -> Self { + Self { + version: 0, + endpoint_addr, + provider, + flow_writer, + } + } +} + +impl<'a> TryFrom<&Config<'a>> for mxl_sys::fabrics::FabricsTargetConfig { + type Error = Error; + + fn try_from(value: &Config) -> Result { + Ok(Self { + version: value.version, + endpointAddress: (&value.endpoint_addr).try_into()?, + provider: (&value.provider).into(), + // SAFETY: The type cast is necessary, because this FlowWriter is scoped in mxl_sys::fabrics::*, not mxl_sys::*, but this is the same type. + writer: unsafe { + std::mem::transmute::( + value.flow_writer.inner(), + ) + }, + }) + } +} diff --git a/rust/mxl/src/fabrics/target/grain.rs b/rust/mxl/src/fabrics/target/grain.rs new file mode 100644 index 00000000..af6609dd --- /dev/null +++ b/rust/mxl/src/fabrics/target/grain.rs @@ -0,0 +1,42 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +// use std::time::Duration; +// +use crate::{ + Error, Result, + fabrics::{target::Target, target::states::Grain}, +}; +use std::time::Duration; + +/// Returned value from calling read* methods. +pub struct GrainReadResult { + pub grain_index: u64, +} + +impl Target { + /// Blocking accessor for a new grain. + pub fn read(&self, timeout: Duration) -> Result { + let mut grain_index = 0; + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_target_read_grain( + self.instance.inner, + timeout.as_millis() as u16, + &mut grain_index, + ) + })?; + Ok(GrainReadResult { grain_index }) + } + + /// Non-blocking accessor for a new grain. + pub fn read_non_blocking(&self) -> Result { + let mut grain_index = 0; + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_target_read_grain_non_blocking(self.instance.inner, &mut grain_index) + })?; + Ok(GrainReadResult { grain_index }) + } +} diff --git a/rust/mxl/src/fabrics/target/mod.rs b/rust/mxl/src/fabrics/target/mod.rs new file mode 100644 index 00000000..20b9fac1 --- /dev/null +++ b/rust/mxl/src/fabrics/target/mod.rs @@ -0,0 +1,146 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +mod config; +mod grain; +mod samples; + +use std::{marker::PhantomData, sync::Arc}; + +use crate::{ + FlowConfigInfo, + error::{Error, Result}, + fabrics::{instance::FabricsInstanceContext, target_info::TargetInfo}, +}; +pub use config::Config; + +use states::*; + +pub mod states { + /// Used to create a new target + pub struct New {} + + /// Waiting for the target to be initialized with the setup function + pub struct Initializing {} + + /// The setup function has been called, but the target has not yet been specialized into a + /// grain or samples target + pub struct Specializing {} + + /// The target has been specialized into a grain target. It can only receive grains + pub struct Grain {} + + /// The target has been specialized into a samples target. It can only receive samples + pub struct Sample {} + + impl TargetState for New {} + impl TargetState for Initializing {} + impl TargetState for Specializing {} + impl TargetState for Grain {} + impl TargetState for Sample {} + + pub trait TargetState {} +} + +/// Wrapper class that holds a reference count to the Fabrics Instance and the actual target +/// instance. +pub struct TargetInstance { + ctx: Arc, + inner: mxl_sys::fabrics::FabricsTarget, +} +unsafe impl Send for TargetInstance {} + +impl Drop for TargetInstance { + fn drop(&mut self) { + if !self.inner.is_null() { + unsafe { + self.ctx + .api() + .fabrics_destroy_target(self.ctx.inner, self.inner); + } + } + } +} + +pub struct Target { + instance: TargetInstance, + _marker: PhantomData, +} +//SAFETY: A target is safe to be sent across threads, but it's not thread-safe to use its API functions. +unsafe impl Send for Target {} + +pub enum Either { + Grain(Target), + Sample(Target), +} + +impl Target { + pub(crate) fn new( + ctx: Arc, + target: mxl_sys::fabrics::FabricsTarget, + ) -> Target { + let instance = TargetInstance { ctx, inner: target }; + Target { + instance, + _marker: PhantomData, + } + } +} + +impl Target { + /// Configure the target. After the target has been configured, it is ready to receive transfers from an initiator. + /// If additional connection setup is required by the underlying implementation it might not happen during the call to + /// setup(), but be deferred until the first call to mxlFabricsTargetTryNewGrain(). + pub fn setup(self, config: &Config) -> Result<(Target, TargetInfo)> { + let mut info = mxl_sys::fabrics::FabricsTargetInfo::default(); + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_target_setup( + self.instance.inner, + &config.try_into()?, + std::ptr::null(), + &mut info, + ) + })?; + + let ctx = self.instance.ctx.clone(); + + Ok(( + Target { + instance: self.instance, + _marker: PhantomData, + }, + TargetInfo::new(ctx, info), + )) + } +} + +impl Target { + /// Specialize the target into a concrete grain or samples target + pub fn specialize(self, flow_config: &FlowConfigInfo) -> Either { + if flow_config.is_discrete_flow() { + Either::Grain(Target { + instance: self.instance, + _marker: PhantomData, + }) + } else { + Either::Sample(Target { + instance: self.instance, + _marker: PhantomData, + }) + } + } +} + +/// Create a new target. +#[doc(hidden)] +pub(crate) fn create_target(ctx: Arc) -> Result> { + let mut target = mxl_sys::fabrics::FabricsTarget::default(); + unsafe { + Error::from_status(ctx.api().fabrics_create_target(ctx.inner, &mut target))?; + } + if target.is_null() { + return Err(Error::Other("Failed to create fabrics target.".to_string())); + } + + Ok(Target::new(ctx.clone(), target)) +} diff --git a/rust/mxl/src/fabrics/target/samples.rs b/rust/mxl/src/fabrics/target/samples.rs new file mode 100644 index 00000000..55eeb71d --- /dev/null +++ b/rust/mxl/src/fabrics/target/samples.rs @@ -0,0 +1,48 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use crate::{ + Error, + error::Result, + fabrics::target::{Target, states::Sample}, +}; + +pub struct SampleReadResult { + pub head_index: u64, + pub count: usize, +} + +impl Target { + ///Blocking accessor for a new grain. + pub fn read(&self, timeout: Duration) -> Result { + let mut head_index = 0; + let mut count = 0; + Error::from_status(unsafe { + self.instance.ctx.api().fabrics_target_read_samples( + self.instance.inner, + timeout.as_millis() as u16, + &mut head_index, + &mut count, + ) + })?; + Ok(SampleReadResult { head_index, count }) + } + /// Non-blocking accessor for a new grain. + pub fn read_non_blocking(&self) -> Result { + let mut head_index = 0; + let mut count = 0; + Error::from_status(unsafe { + self.instance + .ctx + .api() + .fabrics_target_read_samples_non_blocking( + self.instance.inner, + &mut head_index, + &mut count, + ) + })?; + Ok(SampleReadResult { head_index, count }) + } +} diff --git a/rust/mxl/src/fabrics/target_info.rs b/rust/mxl/src/fabrics/target_info.rs new file mode 100644 index 00000000..8a741d3c --- /dev/null +++ b/rust/mxl/src/fabrics/target_info.rs @@ -0,0 +1,74 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use std::{ffi::CString, sync::Arc}; + +use crate::error::{Error, Result}; +use mxl_sys::fabrics::FabricsTargetInfo; + +use crate::fabrics::instance::FabricsInstanceContext; + +/// The TargetInfo object holds the local fabric address, keys and memory region addresses for a target. +/// It is returned after setting up a new target and must be passed to the initiator to connect it. +pub struct TargetInfo { + ctx: Arc, + pub(crate) inner: FabricsTargetInfo, +} +unsafe impl Send for TargetInfo {} +/// SAFETY: Although the `FabricsInstanceContext` type as a whole is not thread-safe, the subset of functions that this `TargetInfo` type uses is thread-safe: `fabrics_target_info_from_string` and `fabrics_target_info_to_string` +unsafe impl Sync for TargetInfo {} + +impl TargetInfo { + pub(crate) fn new(ctx: Arc, inner: FabricsTargetInfo) -> Self { + Self { ctx, inner } + } + + /// Parse a targetInfo object from its string representation. + /// Public visibility is set to crate only, because a `FabricsInstanceContext` is required. + /// See [FabricsInstance](crate::FabricsInstance). + pub(crate) fn from_str(ctx: Arc, s: &str) -> Result { + let mut inner = FabricsTargetInfo::default(); + + Error::from_status(unsafe { + ctx.api() + .fabrics_target_info_from_string(CString::new(s)?.as_ptr(), &mut inner) + })?; + + Ok(Self::new(ctx, inner)) + } + + /// Serialize a target info object obtained from mxlFabricsTargetSetup() into a string representation. + pub fn to_string(&self) -> Result { + let mut size = 0; + Error::from_status(unsafe { + self.ctx.api().fabrics_target_info_to_string( + self.inner, + std::ptr::null_mut(), + &mut size, + ) + })?; + + // The size returned by `fabrics_target_info_to_string` previous call already includes space for null terminator. + // Since CString ctor also includes the null terminated character, we must take the size minus 1. + let out_string = unsafe { CString::from_vec_unchecked(vec![0; size - 1]) }; + + Error::from_status(unsafe { + self.ctx.api().fabrics_target_info_to_string( + self.inner, + out_string.as_ptr() as *mut i8, + &mut size, + ) + })?; + out_string + .into_string() + .map_err(|e| Error::Other(e.to_string())) + } +} + +impl Drop for TargetInfo { + fn drop(&mut self) { + if !self.inner.is_null() { + unsafe { self.ctx.api().fabrics_free_target_info(self.inner) }; + } + } +} diff --git a/rust/mxl/src/flow/reader.rs b/rust/mxl/src/flow/reader.rs index ebc5ab63..902401bc 100644 --- a/rust/mxl/src/flow/reader.rs +++ b/rust/mxl/src/flow/reader.rs @@ -71,6 +71,11 @@ impl FlowReader { Self { context, reader } } + #[allow(dead_code)] + pub(crate) fn inner(&self) -> mxl_sys::FlowReader { + self.reader + } + pub fn get_info(&self) -> Result { get_flow_info(&self.context, self.reader) } diff --git a/rust/mxl/src/flow/writer.rs b/rust/mxl/src/flow/writer.rs index 5c8cfd27..231c7417 100644 --- a/rust/mxl/src/flow/writer.rs +++ b/rust/mxl/src/flow/writer.rs @@ -34,6 +34,11 @@ impl FlowWriter { } } + #[allow(dead_code)] + pub(crate) fn inner(&self) -> mxl_sys::FlowWriter { + self.writer + } + pub fn to_grain_writer(mut self) -> Result { let flow_type = self.get_flow_type()?; if !is_discrete_data_format(flow_type) { diff --git a/rust/mxl/src/grain/data.rs b/rust/mxl/src/grain/data.rs index 6ed0bbb5..26ef763b 100644 --- a/rust/mxl/src/grain/data.rs +++ b/rust/mxl/src/grain/data.rs @@ -10,6 +10,10 @@ pub struct GrainData<'a> { pub total_size: usize, pub flags: u32, + + /// The number of slices in the full grain. This is does not change depending on whether the + /// grain is partial or complete. + pub total_slices: u16, } impl<'a> GrainData<'a> { diff --git a/rust/mxl/src/grain/reader.rs b/rust/mxl/src/grain/reader.rs index 270c836f..35dedb2a 100644 --- a/rust/mxl/src/grain/reader.rs +++ b/rust/mxl/src/grain/reader.rs @@ -84,6 +84,7 @@ impl GrainReader { payload, total_size: grain_info.grainSize as usize, flags: grain_info.flags, + total_slices: grain_info.totalSlices, }) } @@ -117,6 +118,7 @@ impl GrainReader { payload, total_size: grain_info.grainSize as usize, flags: grain_info.flags, + total_slices: grain_info.totalSlices, }) } diff --git a/rust/mxl/src/grain/write_access.rs b/rust/mxl/src/grain/write_access.rs index f93043cb..17efee8e 100644 --- a/rust/mxl/src/grain/write_access.rs +++ b/rust/mxl/src/grain/write_access.rs @@ -50,6 +50,9 @@ impl<'a> GrainWriteAccess<'a> { pub fn total_slices(&self) -> u16 { self.grain_info.totalSlices } + pub fn valid_slices(&self) -> u16 { + self.grain_info.validSlices + } pub fn commit(mut self, valid_slices: u16) -> Result<()> { self.committed_or_canceled = true; diff --git a/rust/mxl/src/instance.rs b/rust/mxl/src/instance.rs index bc6253d8..d7cc51e5 100644 --- a/rust/mxl/src/instance.rs +++ b/rust/mxl/src/instance.rs @@ -3,6 +3,8 @@ use std::{ffi::CString, sync::Arc}; +#[cfg(feature = "mxl-fabrics-ofi")] +use crate::api::MxlFabricsAPiHandle; use crate::{Error, FlowConfigInfo, FlowReader, FlowWriter, Result, api::MxlApiHandle}; /// This struct stores the context that is shared by all objects. @@ -240,4 +242,14 @@ impl MxlInstance { .ok_or_else(|| Error::Other("Instance is still in use.".to_string()))?; context.destroy() } + + #[cfg(feature = "mxl-fabrics-ofi")] + pub fn create_fabrics_instance( + &self, + fabrics_api: &MxlFabricsAPiHandle, + ) -> Result { + use crate::fabrics; + + fabrics::create_instance(self.context.clone(), fabrics_api) + } } diff --git a/rust/mxl/src/lib.rs b/rust/mxl/src/lib.rs index 8c327ced..10c1683e 100644 --- a/rust/mxl/src/lib.rs +++ b/rust/mxl/src/lib.rs @@ -22,3 +22,8 @@ pub use mxl_sys::Rational; pub use samples::{ data::*, reader::SamplesReader, write_access::SamplesWriteAccess, writer::SamplesWriter, }; + +#[cfg(feature = "mxl-fabrics-ofi")] +pub mod fabrics; +#[cfg(feature = "mxl-fabrics-ofi")] +pub use api::{MxlFabricsApi, load_fabrics_api}; diff --git a/rust/mxl/tests/basic_tests.rs b/rust/mxl/tests/basic_tests.rs index d08b81c8..72cf1efa 100644 --- a/rust/mxl/tests/basic_tests.rs +++ b/rust/mxl/tests/basic_tests.rs @@ -5,84 +5,14 @@ /// /// The tests now require an MXL library of a specific name to be present in the system. This should /// change in the future. For now, feel free to just edit the path to your library. +mod common; + use std::time::Duration; -use mxl::{MxlInstance, OwnedGrainData, OwnedSamplesData, config::get_mxl_so_path}; +use common::{read_flow_def, setup_test}; +use mxl::{OwnedGrainData, OwnedSamplesData}; use tracing::info; -static LOG_ONCE: std::sync::Once = std::sync::Once::new(); - -struct TestDomainGuard { - dir: std::path::PathBuf, -} - -impl TestDomainGuard { - fn new(test: &str) -> Self { - let dir = std::path::PathBuf::from(format!( - "/dev/shm/mxl_rust_unit_tests_domain_{}_{}", - test, - uuid::Uuid::new_v4() - )); - std::fs::create_dir_all(dir.as_path()).unwrap_or_else(|_| { - panic!( - "Failed to create test domain directory \"{}\".", - dir.display() - ) - }); - Self { dir } - } - - fn domain(&self) -> String { - self.dir.to_string_lossy().to_string() - } -} - -impl Drop for TestDomainGuard { - fn drop(&mut self) { - std::fs::remove_dir_all(self.dir.as_path()).unwrap_or_else(|_| { - panic!( - "Failed to remove test domain directory \"{}\".", - self.dir.display() - ) - }); - } -} - -fn setup_test(test: &str) -> (MxlInstance, TestDomainGuard) { - // Set up the logging to use the RUST_LOG environment variable and if not present, print INFO - // and higher. - LOG_ONCE.call_once(|| { - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .init(); - }); - - let mxl_api = mxl::load_api(get_mxl_so_path()).unwrap(); - let domain_guard = TestDomainGuard::new(test); - ( - MxlInstance::new(mxl_api, domain_guard.domain().as_str(), "").unwrap(), - domain_guard, - ) -} - -fn read_flow_def>(path: P) -> String { - let flow_config_file = mxl::config::get_mxl_repo_root().join(path); - - std::fs::read_to_string(flow_config_file.as_path()) - .map_err(|error| { - mxl::Error::Other(format!( - "Error while reading flow definition from \"{}\": {}", - flow_config_file.display(), - error - )) - }) - .unwrap() -} - #[test] fn basic_mxl_grain_writing_reading() { let (mxl_instance, _domain_guard) = setup_test("grains"); diff --git a/rust/mxl/tests/common/mod.rs b/rust/mxl/tests/common/mod.rs new file mode 100644 index 00000000..8e76f287 --- /dev/null +++ b/rust/mxl/tests/common/mod.rs @@ -0,0 +1,84 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +use mxl::{MxlInstance, config::get_mxl_so_path}; + +#[cfg(feature = "mxl-fabrics-ofi")] +use mxl::{MxlFabricsApi, config::get_mxl_fabrics_ofi_so_path}; + +static LOG_ONCE: std::sync::Once = std::sync::Once::new(); + +pub struct TestDomainGuard { + dir: std::path::PathBuf, +} + +impl TestDomainGuard { + fn new(test: &str) -> Self { + let dir = std::path::PathBuf::from(format!( + "/dev/shm/mxl_rust_tests_domain_{}_{}", + test, + uuid::Uuid::new_v4() + )); + std::fs::create_dir_all(dir.as_path()).unwrap_or_else(|_| { + panic!( + "Failed to create test domain directory \"{}\".", + dir.display() + ) + }); + Self { dir } + } + + fn domain(&self) -> String { + self.dir.to_string_lossy().to_string() + } +} + +impl Drop for TestDomainGuard { + fn drop(&mut self) { + std::fs::remove_dir_all(self.dir.as_path()).unwrap_or_else(|_| { + panic!( + "Failed to remove test domain directory \"{}\".", + self.dir.display() + ) + }); + } +} + +pub fn setup_test(test: &str) -> (MxlInstance, TestDomainGuard) { + LOG_ONCE.call_once(|| { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .init(); + }); + + let mxl_api = mxl::load_api(get_mxl_so_path()).unwrap(); + let domain_guard = TestDomainGuard::new(test); + ( + MxlInstance::new(mxl_api, domain_guard.domain().as_str(), "").unwrap(), + domain_guard, + ) +} + +#[cfg(feature = "mxl-fabrics-ofi")] +#[allow(dead_code)] +pub fn load_fabrics_test_api() -> std::sync::Arc { + mxl::load_fabrics_api(get_mxl_fabrics_ofi_so_path()).unwrap() +} + +pub fn read_flow_def>(path: P) -> String { + let flow_config_file = mxl::config::get_mxl_repo_root().join(path); + + std::fs::read_to_string(flow_config_file.as_path()) + .map_err(|error| { + mxl::Error::Other(format!( + "Error while reading flow definition from \"{}\": {}", + flow_config_file.display(), + error + )) + }) + .unwrap() +} diff --git a/rust/mxl/tests/fabrics_ofi_tests.rs b/rust/mxl/tests/fabrics_ofi_tests.rs new file mode 100644 index 00000000..7b79e9f9 --- /dev/null +++ b/rust/mxl/tests/fabrics_ofi_tests.rs @@ -0,0 +1,525 @@ +// SPDX-FileCopyrightText: 2026 Contributors to the Media eXchange Layer project. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg(feature = "mxl-fabrics-ofi")] + +mod common; + +use std::time::{Duration, Instant}; + +use common::{load_fabrics_test_api, read_flow_def, setup_test}; +use mxl::{ + Error, FlowReader, FlowWriter, GrainReader, GrainWriter, OwnedGrainData, OwnedSamplesData, + SamplesReader, SamplesWriter, + fabrics::{ + EndpointAddress, + initiator::{self, Initiator}, + target::{self, Target}, + }, +}; + +const POLL_TIMEOUT: Duration = Duration::from_secs(5); +const BLOCKING_WAIT: Duration = Duration::from_millis(20); +const AUDIO_SAMPLE_COUNT: usize = 42; + +fn tcp_endpoint() -> EndpointAddress<'static> { + EndpointAddress { + node: Some("127.0.0.1"), + service: Some("0"), + } +} + +fn poll_until_success(mut step: F, timeout_message: &str) +where + F: FnMut() -> bool, +{ + let deadline = Instant::now() + POLL_TIMEOUT; + while Instant::now() < deadline { + if step() { + return; + } + } + + panic!("{timeout_message}"); +} + +fn wait_for_grain_connection( + target: &Target, + initiator: &Initiator, +) { + poll_until_success( + || { + match target.read_non_blocking() { + Ok(_) | Err(Error::NotReady) => {} + Err(error) => panic!("unexpected target status while waiting for connection: {error}"), + } + + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) => true, + Err(Error::NotReady) => false, + Err(error) => panic!( + "unexpected initiator status while waiting for grain connection: {error}" + ), + } + }, + "failed to connect grain initiator and target in 5 seconds", + ); +} + +fn wait_for_samples_connection( + target: &Target, + initiator: &Initiator, +) { + poll_until_success( + || { + match target.read_non_blocking() { + Ok(_) | Err(Error::NotReady) => {} + Err(error) => { + panic!("unexpected target status while waiting for sample connection: {error}") + } + } + + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) => true, + Err(Error::NotReady) => false, + Err(error) => panic!( + "unexpected initiator status while waiting for sample connection: {error}" + ), + } + }, + "failed to connect sample initiator and target in 5 seconds", + ); +} + +fn wait_for_grain_transfer_start( + target: &Target, + initiator: &Initiator, + grain_index: u64, + end_slice: u16, +) { + poll_until_success( + || { + match target.read_non_blocking() { + Ok(_) | Err(Error::NotReady) => {} + Err(error) => panic!("unexpected target status before grain transfer: {error}"), + } + + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) | Err(Error::NotReady) => {} + Err(error) => panic!( + "unexpected initiator status before grain transfer: {error}" + ), + } + + match initiator.transfer(grain_index, 0, end_slice) { + Ok(()) => true, + Err(Error::NotReady) => false, + Err(error) => panic!("failed to start grain transfer: {error}"), + } + }, + "failed to start grain transfer in 5 seconds", + ); +} + +fn wait_for_samples_transfer_start( + target: &Target, + initiator: &Initiator, + head_index: u64, + count: usize, +) { + poll_until_success( + || { + match target.read_non_blocking() { + Ok(_) | Err(Error::NotReady) => {} + Err(error) => panic!("unexpected target status before sample transfer: {error}"), + } + + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) | Err(Error::NotReady) => {} + Err(error) => panic!( + "unexpected initiator status before sample transfer: {error}" + ), + } + + match initiator.transfer(head_index, count) { + Ok(()) => true, + Err(Error::NotReady) => false, + Err(error) => panic!("failed to start samples transfer: {error}"), + } + }, + "failed to start samples transfer in 5 seconds", + ); +} + +fn wait_for_grain_transfer_completion( + target: &Target, + initiator: &Initiator, + expected_grain_index: u64, +) -> u64 { + let mut completed_index = None; + poll_until_success( + || { + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) | Err(Error::NotReady) => {} + Err(error) => panic!( + "unexpected initiator status while completing grain transfer: {error}" + ), + } + + match target.read(BLOCKING_WAIT) { + Ok(result) => { + assert_eq!(result.grain_index, expected_grain_index); + completed_index = Some(result.grain_index); + true + } + Err(Error::NotReady) => false, + Err(Error::Interrupted) => panic!("grain target disconnected before transfer completed"), + Err(error) => panic!("unexpected grain completion status: {error}"), + } + }, + "grain transfer did not complete in 5 seconds", + ); + completed_index.unwrap() +} + +fn wait_for_samples_transfer_completion( + target: &Target, + initiator: &Initiator, + expected_head_index: u64, + expected_count: usize, +) -> (u64, usize) { + let mut completed = None; + poll_until_success( + || { + match initiator.make_progress(BLOCKING_WAIT) { + Ok(()) | Err(Error::NotReady) => {} + Err(error) => panic!( + "unexpected initiator status while completing samples transfer: {error}" + ), + } + + match target.read(BLOCKING_WAIT) { + Ok(result) => { + assert_eq!(result.head_index, expected_head_index); + assert_eq!(result.count, expected_count); + completed = Some((result.head_index, result.count)); + true + } + Err(Error::NotReady) => false, + Err(Error::Interrupted) => { + panic!("samples target disconnected before transfer completed") + } + Err(error) => panic!("unexpected samples completion status: {error}"), + } + }, + "samples transfer did not complete in 5 seconds", + ); + completed.unwrap() +} + +fn wait_for_target_grain( + reader: &GrainReader, + grain_index: u64, +) -> OwnedGrainData { + let mut result = None; + poll_until_success( + || match reader.get_grain_non_blocking(grain_index) { + Ok(grain) => { + result = Some(grain.into()); + true + } + Err(Error::OutOfRangeTooEarly) | Err(Error::NotReady) => false, + Err(error) => panic!("unexpected target grain read status: {error}"), + }, + "target grain did not become visible in 5 seconds", + ); + result.unwrap() +} + +fn wait_for_target_samples( + reader: &SamplesReader, + head_index: u64, + count: usize, +) -> OwnedSamplesData { + let mut result = None; + poll_until_success( + || match reader.get_samples_non_blocking(head_index, count) { + Ok(samples) => { + result = Some(samples.into()); + true + } + Err(Error::OutOfRangeTooEarly) | Err(Error::NotReady) => false, + Err(error) => panic!("unexpected target samples read status: {error}"), + }, + "target samples did not become visible in 5 seconds", + ); + result.unwrap() +} + +fn create_video_flow(mxl_instance: &mxl::MxlInstance) -> (FlowWriter, FlowReader, mxl::FlowConfigInfo) { + create_flow_with_def(mxl_instance, read_flow_def("lib/tests/data/v210_flow.json")) +} + +fn create_video_flow_with_unique_id( + mxl_instance: &mxl::MxlInstance, +) -> (FlowWriter, FlowReader, mxl::FlowConfigInfo) { + create_flow_with_def( + mxl_instance, + flow_def_with_fresh_id(&read_flow_def("lib/tests/data/v210_flow.json")), + ) +} + +fn create_audio_flow(mxl_instance: &mxl::MxlInstance) -> (FlowWriter, FlowReader, mxl::FlowConfigInfo) { + create_flow_with_def(mxl_instance, read_flow_def("lib/tests/data/audio_flow.json")) +} + +fn create_audio_flow_with_unique_id( + mxl_instance: &mxl::MxlInstance, +) -> (FlowWriter, FlowReader, mxl::FlowConfigInfo) { + create_flow_with_def( + mxl_instance, + flow_def_with_fresh_id(&read_flow_def("lib/tests/data/audio_flow.json")), + ) +} + +fn flow_def_with_fresh_id(flow_def: &str) -> String { + let mut value: serde_json::Value = serde_json::from_str(flow_def).unwrap(); + value["id"] = serde_json::Value::String(uuid::Uuid::new_v4().to_string()); + serde_json::to_string(&value).unwrap() +} + +fn create_flow_with_def( + mxl_instance: &mxl::MxlInstance, + flow_def: String, +) -> (FlowWriter, FlowReader, mxl::FlowConfigInfo) { + let (flow_writer, flow_config, was_created) = mxl_instance + .create_flow_writer(flow_def.as_str(), None) + .unwrap(); + assert!(was_created); + + let flow_id = flow_config.common().id().to_string(); + let flow_reader = mxl_instance.create_flow_reader(flow_id.as_str()).unwrap(); + + (flow_writer, flow_reader, flow_config) +} + +fn fill_grain_payload(payload: &mut [u8]) { + for (index, byte) in payload.iter_mut().enumerate() { + *byte = (index % 251) as u8; + } +} + +fn fill_samples_payload(writer: &mut mxl::SamplesWriteAccess<'_>) { + for channel in 0..writer.channels() { + let (first, second) = writer.channel_data_mut(channel).unwrap(); + for (index, byte) in first.iter_mut().enumerate() { + *byte = (channel as u8).wrapping_mul(17).wrapping_add(index as u8); + } + for (index, byte) in second.iter_mut().enumerate() { + *byte = (channel as u8) + .wrapping_mul(29) + .wrapping_add(index as u8) + .wrapping_add(3); + } + } +} + +#[test] +fn provider_tcp_roundtrip() { + let (mxl_instance, _domain_guard) = setup_test("provider_tcp_roundtrip"); + + { + let fabrics_api = load_fabrics_test_api(); + let fabrics_instance = mxl_instance.create_fabrics_instance(&fabrics_api).unwrap(); + let provider = fabrics_instance.provider_from_str("tcp").unwrap(); + + assert_eq!(provider.to_string().unwrap(), "tcp"); + } + + mxl_instance.destroy().unwrap(); +} + +#[test] +fn target_info_roundtrip() { + let (mxl_instance, _domain_guard) = setup_test("target_info_roundtrip"); + + { + let fabrics_api = load_fabrics_test_api(); + let fabrics_instance = mxl_instance.create_fabrics_instance(&fabrics_api).unwrap(); + let (flow_writer, _flow_reader, _flow_config) = create_video_flow(&mxl_instance); + + let provider = fabrics_instance.provider_from_str("tcp").unwrap(); + let target = fabrics_instance.create_target().unwrap(); + let config = target::Config::new(tcp_endpoint(), provider, &flow_writer); + let (_target, target_info) = target.setup(&config).unwrap(); + + let serialized = target_info.to_string().unwrap(); + let deserialized = fabrics_instance.target_info_from_str(&serialized).unwrap(); + + assert_eq!(serialized, deserialized.to_string().unwrap()); + } + + mxl_instance.destroy().unwrap(); +} + +#[test] +fn tcp_grain_transfer_delivers_payload_to_target_flow() { + let (mxl_instance, _domain_guard) = setup_test("tcp_grain_transfer"); + + { + let fabrics_api = load_fabrics_test_api(); + let fabrics_instance = mxl_instance.create_fabrics_instance(&fabrics_api).unwrap(); + + let (source_flow_writer, source_flow_reader, source_flow_config) = + create_video_flow(&mxl_instance); + let (target_flow_writer, target_flow_reader, _target_flow_config) = + create_video_flow_with_unique_id(&mxl_instance); + + let source_grain_writer: GrainWriter = source_flow_writer.to_grain_writer().unwrap(); + let source_grain_reader: GrainReader = source_flow_reader.to_grain_reader().unwrap(); + let target_grain_reader: GrainReader = target_flow_reader.to_grain_reader().unwrap(); + + let (target, target_info) = { + let target_provider = fabrics_instance.provider_from_str("tcp").unwrap(); + let target = fabrics_instance.create_target().unwrap(); + let target_config = + target::Config::new(tcp_endpoint(), target_provider, &target_flow_writer); + target.setup(&target_config).unwrap() + }; + let target_grain_writer: GrainWriter = target_flow_writer.to_grain_writer().unwrap(); + let target = match target.specialize(&source_flow_config) { + target::Either::Grain(target) => target, + target::Either::Sample(_) => panic!("expected grain target for video flow"), + }; + + let initiator_flow_reader = mxl_instance + .create_flow_reader(source_flow_config.common().id().to_string().as_str()) + .unwrap(); + let initiator = { + let initiator_provider = fabrics_instance.provider_from_str("tcp").unwrap(); + let initiator = fabrics_instance.create_initiator().unwrap(); + let initiator_config = initiator::Config::new( + tcp_endpoint(), + initiator_provider, + &initiator_flow_reader, + ); + initiator.setup(&initiator_config).unwrap() + }; + let initiator = match initiator.specialize(&source_flow_config) { + initiator::Either::Grain(initiator) => initiator, + initiator::Either::Samples(_) => panic!("expected grain initiator for video flow"), + }; + + initiator.add_target(&target_info).unwrap(); + wait_for_grain_connection(&target, &initiator); + + let grain_index = + mxl_instance.get_current_index(&source_flow_config.common().grain_rate().unwrap()); + let mut grain = source_grain_writer.open_grain(grain_index).unwrap(); + fill_grain_payload(grain.payload_mut()); + let total_slices = grain.total_slices(); + grain.commit(total_slices).unwrap(); + + let expected: OwnedGrainData = source_grain_reader + .get_complete_grain(grain_index, POLL_TIMEOUT) + .unwrap() + .into(); + + wait_for_grain_transfer_start(&target, &initiator, grain_index, total_slices); + let completed_index = wait_for_grain_transfer_completion(&target, &initiator, grain_index); + + let committed_grain = target_grain_writer.open_grain(completed_index).unwrap(); + let committed_slices = committed_grain.valid_slices(); + committed_grain.commit(committed_slices).unwrap(); + + let actual = wait_for_target_grain(&target_grain_reader, grain_index); + assert_eq!(actual.payload, expected.payload); + + initiator.remove_target(&target_info).unwrap(); + } + + mxl_instance.destroy().unwrap(); +} + +#[test] +fn tcp_samples_transfer_delivers_payload_to_target_flow() { + let (mxl_instance, _domain_guard) = setup_test("tcp_samples_transfer"); + + { + let fabrics_api = load_fabrics_test_api(); + let fabrics_instance = mxl_instance.create_fabrics_instance(&fabrics_api).unwrap(); + + let (source_flow_writer, source_flow_reader, source_flow_config) = + create_audio_flow(&mxl_instance); + let (target_flow_writer, target_flow_reader, _target_flow_config) = + create_audio_flow_with_unique_id(&mxl_instance); + + let source_samples_writer: SamplesWriter = source_flow_writer.to_samples_writer().unwrap(); + let source_samples_reader: SamplesReader = source_flow_reader.to_samples_reader().unwrap(); + let target_samples_reader: SamplesReader = target_flow_reader.to_samples_reader().unwrap(); + + let (target, target_info) = { + let target_provider = fabrics_instance.provider_from_str("tcp").unwrap(); + let target = fabrics_instance.create_target().unwrap(); + let target_config = + target::Config::new(tcp_endpoint(), target_provider, &target_flow_writer); + target.setup(&target_config).unwrap() + }; + let target_samples_writer: SamplesWriter = target_flow_writer.to_samples_writer().unwrap(); + let target = match target.specialize(&source_flow_config) { + target::Either::Sample(target) => target, + target::Either::Grain(_) => panic!("expected samples target for audio flow"), + }; + + let initiator_flow_reader = mxl_instance + .create_flow_reader(source_flow_config.common().id().to_string().as_str()) + .unwrap(); + let initiator = { + let initiator_provider = fabrics_instance.provider_from_str("tcp").unwrap(); + let initiator = fabrics_instance.create_initiator().unwrap(); + let initiator_config = + initiator::Config::new(tcp_endpoint(), initiator_provider, &initiator_flow_reader); + initiator.setup(&initiator_config).unwrap() + }; + let initiator = match initiator.specialize(&source_flow_config) { + initiator::Either::Samples(initiator) => initiator, + initiator::Either::Grain(_) => panic!("expected samples initiator for audio flow"), + }; + + initiator.add_target(&target_info).unwrap(); + wait_for_samples_connection(&target, &initiator); + + let head_index = + mxl_instance.get_current_index(&source_flow_config.common().sample_rate().unwrap()); + let mut samples = source_samples_writer + .open_samples(head_index, AUDIO_SAMPLE_COUNT) + .unwrap(); + fill_samples_payload(&mut samples); + samples.commit().unwrap(); + + let expected: OwnedSamplesData = source_samples_reader + .get_samples(head_index, AUDIO_SAMPLE_COUNT, POLL_TIMEOUT) + .unwrap() + .into(); + + wait_for_samples_transfer_start(&target, &initiator, head_index, AUDIO_SAMPLE_COUNT); + let (completed_head_index, completed_count) = wait_for_samples_transfer_completion( + &target, + &initiator, + head_index, + AUDIO_SAMPLE_COUNT, + ); + + let committed_samples = target_samples_writer + .open_samples(completed_head_index, completed_count) + .unwrap(); + committed_samples.commit().unwrap(); + + let actual = wait_for_target_samples(&target_samples_reader, head_index, AUDIO_SAMPLE_COUNT); + assert_eq!(actual.payload, expected.payload); + + initiator.remove_target(&target_info).unwrap(); + } + + mxl_instance.destroy().unwrap(); +}