diff --git a/Cargo.lock b/Cargo.lock index 0bac7b3374..b950a2886a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,9 +39,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6" dependencies = [ "anstyle", "anstyle-parse", @@ -59,58 +59,58 @@ checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anstyle-parse" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "ca87830a3e3fb156dc96cfbd31cb620265dd053be734723f22b760d6cc3c3051" [[package]] name = "async-lock" -version = "3.0.0" +version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45e900cdcd39bb94a14487d3f7ef92ca222162e6c7c3fe7cb3550ea75fb486ed" +checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" dependencies = [ - "event-listener", + "event-listener 4.0.2", "event-listener-strategy", "pin-project-lite", ] [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -165,7 +165,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.38", + "syn 2.0.45", "which", ] @@ -188,7 +188,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.38", + "syn 2.0.45", "which", ] @@ -312,9 +312,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c688fc74432808e3eb684cae8830a86be1d66a2bd58e1f248ed0960a590baf6f" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" dependencies = [ "glob", "libc", @@ -323,9 +323,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.7" +version = "4.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac495e00dcec98c83465d5ad66c5c4fabd652fd6686e7c6269b117e729a6f17b" +checksum = "dcfab8ba68f3668e89f6ff60f5b205cea56aa7b769451a59f34b8682f51c056d" dependencies = [ "clap_builder", "clap_derive", @@ -333,9 +333,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.7" +version = "4.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c77ed9a32a62e6ca27175d00d29d05ca32e396ea1eb5fb01d8256b669cec7663" +checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" dependencies = [ "anstream", "anstyle", @@ -352,7 +352,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -369,9 +369,9 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "concurrent-queue" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ "crossbeam-utils", ] @@ -384,9 +384,9 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "core-foundation" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" dependencies = [ "core-foundation-sys", "libc", @@ -394,9 +394,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.4" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" @@ -418,9 +418,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ "cfg-if", ] @@ -452,11 +452,17 @@ dependencies = [ "zip", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "deranged" -version = "0.3.9" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", ] @@ -493,21 +499,38 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" -version = "0.3.5" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "event-listener" -version = "3.0.1" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cec0252c2afff729ee6f00e903d479fba81784c8e2bd77447673471fdfaea1" +checksum = "218a870470cce1469024e9fb66b901aa983929d81304a1cdb299f28118e550d5" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" dependencies = [ "concurrent-queue", "parking", @@ -516,11 +539,11 @@ dependencies = [ [[package]] name = "event-listener-strategy" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" dependencies = [ - "event-listener", + "event-listener 4.0.2", "pin-project-lite", ] @@ -551,6 +574,33 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + +[[package]] +name = "fluvio-wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b768c170dc045fa587a8f948c91f9bcfb87f774930477c6215addf54317f137f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "fnv" version = "1.0.7" @@ -574,18 +624,18 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] [[package]] name = "futures" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -598,9 +648,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -608,15 +658,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -625,38 +675,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -682,20 +732,22 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] name = "gimli" -version = "0.28.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "glob" @@ -705,16 +757,16 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", "indexmap", "slab", "tokio", @@ -747,9 +799,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.12.3" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "heck" @@ -780,18 +832,29 @@ dependencies = [ [[package]] name = "home" -version = "0.5.5" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "http" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", "fnv", @@ -800,12 +863,12 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -823,22 +886,22 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.27" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2", "tokio", "tower-service", "tracing", @@ -860,9 +923,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -881,11 +944,11 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.3" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ - "autocfg", + "equivalent", "hashbrown", ] @@ -898,6 +961,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -906,9 +981,9 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "jobserver" @@ -921,9 +996,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" dependencies = [ "wasm-bindgen", ] @@ -942,45 +1017,45 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.149" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libloading" -version = "0.7.4" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" dependencies = [ "cfg-if", - "winapi", + "windows-sys 0.48.0", ] [[package]] name = "linkme" -version = "0.3.17" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ed2ee9464ff9707af8e9ad834cffa4802f072caad90639c583dd3c62e6e608" +checksum = "d3ae8aae8e1d516e0a3ceee1219eded7f73741607e4227bf11ef2c3e31580427" dependencies = [ "linkme-impl", ] [[package]] name = "linkme-impl" -version = "0.3.17" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba125974b109d512fccbc6c0244e7580143e460895dfd6ea7f8bbb692fd94396" +checksum = "ad083d767be37e709a232ae2a244445ed032bb9c6bf7d9442dd416ba5a7b7264" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] name = "linux-raw-sys" -version = "0.4.10" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" [[package]] name = "lock_api" @@ -1000,9 +1075,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "mime" @@ -1027,13 +1102,22 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", +] + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", ] [[package]] @@ -1071,17 +1155,23 @@ dependencies = [ ] [[package]] -name = "nt" -version = "0.1.0" +name = "nt-rs" +version = "0.0.1" dependencies = [ - "anyhow", - "bindgen 0.66.1", - "build-utils", - "bytes", - "reqwest", - "tempfile", + "event-listener 5.0.0", + "flume", + "fluvio-wasm-timer", + "futures", + "http 1.0.0", + "rmp", + "serde", + "serde_json", + "thiserror", "tokio", - "zip", + "tokio-tungstenite", + "tungstenite", + "wasm-bindgen-futures", + "web-sys", ] [[package]] @@ -1094,6 +1184,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -1106,24 +1205,24 @@ dependencies = [ [[package]] name = "object" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.58" +version = "0.10.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9dfc0783362704e97ef3bd24261995a699468440099ef95d869b4d9732f829a" +checksum = "8cde4d2d9200ad5909f8dac647e29482e07c3a35de8a13fce7c9c7747ad9f671" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -1142,7 +1241,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -1153,9 +1252,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.94" +version = "0.9.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f55da20b29f956fb01f0add8683eb26ee13ebe3ebd935e49898717c6b4b2830" +checksum = "c1665caf8ab2dc9aef43d1c0023bd904633a6a05cb30b0ad59bec2ae986e57a7" dependencies = [ "cc", "libc", @@ -1175,6 +1274,17 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1182,7 +1292,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -1193,9 +1317,9 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -1209,6 +1333,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pbkdf2" version = "0.11.0" @@ -1229,9 +1359,9 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" @@ -1250,7 +1380,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -1267,9 +1397,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.27" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" [[package]] name = "pnet_base" @@ -1332,23 +1462,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "2dd5e8a1f1029c43224ad5898e50140c2aebb1705f19e67c918ebf5b9e797fe1" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "22a37c9326af5ed140c86a46655b5278de879853be5573c01df185b6f49a580a" dependencies = [ "proc-macro2", ] @@ -1383,6 +1513,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1423,9 +1562,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.22" +version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ "base64", "bytes", @@ -1433,7 +1572,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "hyper", "hyper-tls", @@ -1476,6 +1615,17 @@ dependencies = [ "zip", ] +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + [[package]] name = "robotrs" version = "0.1.0" @@ -1487,8 +1637,8 @@ dependencies = [ "hal-sys", "impl-trait-for-tuples", "linkme", - "nt", - "parking_lot", + "nt-rs", + "parking_lot 0.12.1", "pin-project", "thiserror", "tracing", @@ -1509,30 +1659,30 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.38.21" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "schannel" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1566,29 +1716,29 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.190" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.190" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "cb0652c533506ad7a2e353cce269330d6afd8bdfb6d75e0ace5b35aacbd7b9e9" dependencies = [ "itoa", "ryu", @@ -1664,28 +1814,27 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" [[package]] name = "socket2" -version = "0.4.10" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "winapi", + "windows-sys 0.48.0", ] [[package]] -name = "socket2" -version = "0.5.5" +name = "spin" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" dependencies = [ - "libc", - "windows-sys", + "lock_api", ] [[package]] @@ -1707,10 +1856,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af341b2be485d647b5dc4cfb2da99efac35b5c95748a08fb7233480fedc5ead3" dependencies = [ "hex", - "parking_lot", + "parking_lot 0.12.1", "pnet_packet", "rand", - "socket2 0.5.5", + "socket2", "thiserror", "tokio", "tracing", @@ -1729,9 +1878,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "0eae3c679c56dc214320b67a1bc04ef3dfbd6411f6443974b5e4893231298e66" dependencies = [ "proc-macro2", "quote", @@ -1761,35 +1910,35 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.8.1" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", + "redox_syscall 0.4.1", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] name = "thiserror" -version = "1.0.50" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" +checksum = "b2cd5904763bad08ad5513ddbb12cf2ae273ca53fa9f68e843e236ec6dfccc09" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.50" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" +checksum = "3dcf4a824cce0aeacd6f38ae6f24234c8e80d68632338ebaa1443b5df9e29e19" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -1804,9 +1953,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "powerfmt", @@ -1837,32 +1986,32 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.35.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" dependencies = [ "backtrace", "bytes", "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -1875,6 +2024,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -1914,7 +2075,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", ] [[package]] @@ -1929,9 +2090,9 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.4" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ "log", "once_cell", @@ -1940,9 +2101,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "nu-ansi-term", "sharded-slab", @@ -1954,9 +2115,28 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "tungstenite" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] [[package]] name = "typenum" @@ -1966,9 +2146,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -1987,15 +2167,21 @@ dependencies = [ [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -2037,9 +2223,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2047,24 +2233,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9afec9963e3d0994cac82455b2b3502b81a7f40f9a0d32181f7528d9f4b43e02" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" dependencies = [ "cfg-if", "js-sys", @@ -2074,9 +2260,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2084,28 +2270,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.45", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "web-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" dependencies = [ "js-sys", "wasm-bindgen", @@ -2151,7 +2337,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -2160,13 +2355,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -2175,42 +2385,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winreg" version = "0.50.0" @@ -2218,7 +2470,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ "cfg-if", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] diff --git a/nt/Cargo.toml b/nt/Cargo.toml index a5f561e2a0..0e9a2c4eb5 100644 --- a/nt/Cargo.toml +++ b/nt/Cargo.toml @@ -1,16 +1,33 @@ [package] -name = "nt" -version = "0.1.0" +name = "nt-rs" +version = "0.0.1" edition = "2021" +license = "MIT" +description = "An NT 4.0 (and parially 4.1) client" +homepage = "https://github.com/BlueZeeKing/robotrs" +repository = "https://github.com/BlueZeeKing/robotrs" +readme = "README.md" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[build-dependencies] -anyhow = "1.0.75" -bindgen = "0.66.1" -bytes = "1.4.0" -reqwest = "0.11.20" -tempfile = "3.8.0" -tokio = { version = "1.32.0", features = ["full"] } -zip = "0.6.6" -build-utils = { path = "../build-utils" } +[features] +default = ["tokio"] +tokio = ["dep:tokio", "dep:http", "dep:tokio-tungstenite", "dep:tungstenite"] +wasm = ["dep:web-sys", "dep:wasm-bindgen-futures", "dep:fluvio-wasm-timer"] + +[dependencies] +flume = "0.11.0" +futures = "0.3.30" +rmp = "0.8.12" +serde = { version = "1.0.193", features = ["derive"] } +serde_json = "1.0.108" +thiserror = "1.0.53" +http = { version = "1.0.0", optional = true } +tokio = { version = "1.35.1", features = ["full"], optional = true } +tokio-tungstenite = { version = "0.21.0", optional = true } +tungstenite = { version = "0.21.0", optional = true } +web-sys = { version = "0.3.66", features = ["WebSocket", "BinaryType", "Blob", "MessageEvent", "Window", "Performance", "console"], optional = true } +wasm-bindgen-futures = { version = "0.4.39", optional = true } +fluvio-wasm-timer = { version = "0.2.5", optional = true } +event-listener = "5.0.0" diff --git a/nt/README.md b/nt/README.md new file mode 100644 index 0000000000..f42ca470b2 --- /dev/null +++ b/nt/README.md @@ -0,0 +1 @@ +# NT-rs diff --git a/nt/build.rs b/nt/build.rs deleted file mode 100644 index af24eea295..0000000000 --- a/nt/build.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::path::Path; - -use anyhow::Result; -use build_utils::{ - artifact::Artifact, - build, -}; - -const MAVEN: &str = "https://frcmaven.wpi.edu/artifactory/release/"; - -#[tokio::main] -async fn main() -> Result<()> { - let headers = vec![ - Artifact::builder() - .group_id("edu.wpi.first.hal".to_owned()) - .artifact_id("hal-cpp".to_owned()) - .version(build_utils::WPI_VERSION.to_owned()) - .maven_url(MAVEN.to_owned()) - .build()?, - Artifact::builder() - .group_id("edu.wpi.first.wpiutil".to_owned()) - .artifact_id("wpiutil-cpp".to_owned()) - .version(build_utils::WPI_VERSION.to_owned()) - .maven_url(MAVEN.to_owned()) - .build()?, - Artifact::builder() - .group_id("edu.wpi.first.wpimath".to_owned()) - .artifact_id("wpimath-cpp".to_owned()) - .version(build_utils::WPI_VERSION.to_owned()) - .maven_url(MAVEN.to_owned()) - .build()?, - Artifact::builder() - .group_id("edu.wpi.first.ntcore".to_owned()) - .artifact_id("ntcore-cpp".to_owned()) - .version(build_utils::WPI_VERSION.to_owned()) - .maven_url(MAVEN.to_owned()) - .lib_name("ntcore".to_owned()) - .build()?, - Artifact::builder() - .group_id("edu.wpi.first.wpinet".to_owned()) - .artifact_id("wpinet-cpp".to_owned()) - .version(build_utils::WPI_VERSION.to_owned()) - .maven_url(MAVEN.to_owned()) - .lib_name("wpinet".to_owned()) - .build()?, - ]; - - build(&headers, "NT_.*", Path::new("ntcore.h")).await -} diff --git a/nt/src/backends.rs b/nt/src/backends.rs new file mode 100644 index 0000000000..e64aa0cef3 --- /dev/null +++ b/nt/src/backends.rs @@ -0,0 +1,4 @@ +#[cfg(feature = "tokio")] +pub mod tokio; +#[cfg(feature = "wasm")] +pub mod wasm; diff --git a/nt/src/backends/tokio.rs b/nt/src/backends/tokio.rs new file mode 100644 index 0000000000..ed0b12a32c --- /dev/null +++ b/nt/src/backends/tokio.rs @@ -0,0 +1,162 @@ +use core::panic; +use flume::RecvError; +use futures::{sink::SinkExt, stream::StreamExt, FutureExt}; +use http::{header::SEC_WEBSOCKET_PROTOCOL, uri::InvalidUri, Request}; +use std::{io::Cursor, str::FromStr, time::Duration}; +use tokio::{select, task::JoinHandle}; +use tokio_tungstenite::connect_async; +use tungstenite::{handshake::client::generate_key, Message}; + +use http::Uri; + +use crate::{ + types::{BinaryMessage, BinaryMessageError, TextMessage}, + Backend, Error, Result, Timer, +}; + +#[derive(Debug, Error)] +pub enum TokioError { + #[error("Encountered an http error: {0}")] + Http(#[from] http::Error), + #[error("Encountered a websocket error: {0}")] + Websocket(#[from] tungstenite::Error), + #[error("Invalid uri error: {0}")] + Uri(#[from] InvalidUri), + #[error("Server does not support the nt v4.0 protocol")] + UnsupportedServer, + #[error("Error while encoding or decoding a binary message: {0}")] + BinaryMessage(#[from] BinaryMessageError), + #[error("Error while encoding or decoding a text message: {0}")] + TextMessage(#[from] serde_json::Error), + #[error("Error while sending a message")] + Send, + #[error("Error while receiving a message: {0}")] + Receive(#[from] RecvError), + #[error("Other error occured: {0}")] + Other(Box), + #[error("Encountered an unknown frame")] + UnknownFrame, + #[error("Encountered an incorrect type")] + Type, +} + +pub struct TokioBackend {} + +impl Backend for TokioBackend { + type Output = JoinHandle<()>; + type Error = TokioError; + + fn create( + host: &str, + name: &str, + send: flume::Sender>, + receive: flume::Receiver, + ) -> Result { + let uri = Uri::from_str(&format!("ws://{host}:5810/nt/{name}"))?; + + let send2 = send.clone(); + + Ok(tokio::spawn(async move { + let req = Request::builder() + .method("GET") + .header("Host", uri.host().unwrap()) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", generate_key()) + .header("Sec-WebSocket-Protocol", "networktables.first.wpi.edu") + .uri(uri) + .body(())?; + + let (mut connection, res) = connect_async(req.clone()).await?; + + if res + .headers() + .get(SEC_WEBSOCKET_PROTOCOL) + .ok_or(TokioError::UnsupportedServer)? + != "networktables.first.wpi.edu" + { + return Err(TokioError::UnsupportedServer); + } + + + loop { + select! { + message = receive.recv_async() => { + let message = message?; + + match message { + crate::NtMessage::Text(msg) => connection.send(Message::Text(serde_json::to_string(&[msg])?)).await?, + crate::NtMessage::Binary(msg) => { + let mut buf = Vec::new(); + msg.to_writer(&mut buf)?; + connection.send(Message::Binary(buf)).await? + } + crate::NtMessage::Reconnect(_) => { + loop { + if let Ok((mut new_con, _)) = connect_async(req.clone()).await { + std::mem::swap(&mut new_con, &mut connection); + + send.send(Ok(crate::NtMessage::Reconnect(Some(receive.drain().collect::>())))).map_err(|_| TokioError::Send)?; + + break; + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }, + } + } + message = connection.next() => { + if matches!(message, Some(Err(_))) || message.is_none() { + loop { + if let Ok((mut new_con, _)) = connect_async(req.clone()).await { + std::mem::swap(&mut new_con, &mut connection); + + send.send(Ok(crate::NtMessage::Reconnect(Some(receive.drain().collect::>())))).map_err(|_| TokioError::Send)?; + + receive.drain().for_each(|_| {}); + + break; + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + + continue; + } + + let message = message.unwrap()?; + + match message { + Message::Text(msg) => { + let msgs = serde_json::from_str::>(&msg)?; + for msg in msgs { + send.send(Ok(crate::NtMessage::Text(msg))).map_err(|_| TokioError::Send)?; + } + } + Message::Binary(msg) => { + let mut cursor = Cursor::new(msg); + + while (cursor.position() as usize) < cursor.get_ref().len() { + send.send(Ok(crate::NtMessage::Binary(BinaryMessage::from_reader(&mut cursor)?))).map_err(|_| TokioError::Send)?; + } + } + _ => return >::Err(TokioError::UnknownFrame), + } + } + } + } + }.map(move |out| { + if let Err(err) = out { + let _res = send2.send(Err(Error::Other(Box::new(err)))); + } + }))) + } +} + +impl Timer for TokioBackend { + async fn time(duration: std::time::Duration) { + tokio::time::sleep(duration).await; + } +} diff --git a/nt/src/backends/wasm.rs b/nt/src/backends/wasm.rs new file mode 100644 index 0000000000..dbe48ee39e --- /dev/null +++ b/nt/src/backends/wasm.rs @@ -0,0 +1,163 @@ +use std::{io::Cursor, sync::Arc}; + +use event_listener::Event; +use flume::{Receiver, Sender}; +use futures::{select, FutureExt}; +use thiserror::Error; +use wasm_bindgen_futures::{ + js_sys::{ArrayBuffer, JsString, Uint8Array}, + spawn_local, + wasm_bindgen::{closure::Closure, JsCast}, +}; +use web_sys::{wasm_bindgen::JsValue, MessageEvent, WebSocket}; + +use crate::{ + types::{BinaryMessage, TextMessage}, + Backend, NtMessage, Timer, +}; + +pub struct WasmBackend {} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Encountered a JS error")] + Js, +} + +impl From for Error { + fn from(_value: JsValue) -> Self { + Self::Js + } +} + +fn create_msg_callback(send: Sender>) -> Closure { + Closure::::new(move |e: MessageEvent| { + if let Ok(buf) = e.data().dyn_into::() { + let array = Uint8Array::new(&buf); + let mut buf = vec![0; array.length() as usize]; + array.copy_to(&mut buf); + + let mut reader = Cursor::new(buf); + + while (reader.position() as usize) < reader.get_ref().len() { + let _ = send.send(Ok(crate::NtMessage::Binary( + BinaryMessage::from_reader(&mut reader).unwrap(), + ))); + } + } else if let Ok(text) = e.data().dyn_into::() { + let text: String = text.into(); + let msgs = serde_json::from_str::>(&text).unwrap(); + for msg in msgs { + send.send(Ok(crate::NtMessage::Text(msg))).unwrap(); + } + } + }) +} + +fn create_ready_callback( + receive: Receiver, + ws: WebSocket, + event: Arc, +) -> Closure { + Closure::::new(move |_e: MessageEvent| { + let receive2 = receive.clone(); + let ws2 = ws.clone(); + let event2 = event.clone(); + + spawn_local(async move { + select! { + _ = main_loop(receive2, ws2).fuse() => {}, + _ = event2.listen().fuse() => {} + } + }); + }) +} + +async fn main_loop(receive: Receiver, ws: WebSocket) { + loop { + let recv = receive.recv_async().await.unwrap(); + match recv { + crate::NtMessage::Text(msg) => ws + .send_with_str(&serde_json::to_string(&[msg]).unwrap()) + .unwrap(), + crate::NtMessage::Binary(msg) => { + let mut buf = Vec::new(); + msg.to_writer(&mut buf).unwrap(); + ws.send_with_u8_array(&buf).unwrap(); + } + crate::NtMessage::Reconnect(_) => { + ws.close().unwrap(); + } + } + } +} + +fn create_close_callback( + send: Sender>, + receive: Receiver, + url: String, + close_event: Arc, +) -> Closure { + Closure::::new(move |_e: MessageEvent| { + close_event.notify(u32::MAX); + let ws = WebSocket::new_with_str(&url, "networktables.first.wpi.edu").unwrap(); + + ws.set_binary_type(web_sys::BinaryType::Arraybuffer); + + let new_close_event = Arc::new(Event::new()); + + let msg_callback = create_msg_callback(send.clone()); + let ready_callback = + create_ready_callback(receive.clone(), ws.clone(), new_close_event.clone()); + let close_callback = + create_close_callback(send.clone(), receive.clone(), url.clone(), new_close_event); + + ws.set_onmessage(Some(msg_callback.as_ref().unchecked_ref())); + ws.set_onopen(Some(ready_callback.as_ref().unchecked_ref())); + ws.set_onclose(Some(close_callback.as_ref().unchecked_ref())); + + msg_callback.forget(); + ready_callback.forget(); + close_callback.forget(); + }) +} + +impl Backend for WasmBackend { + type Output = (); + type Error = Error; + + fn create( + host: &str, + name: &str, + send: flume::Sender>, + receive: flume::Receiver, + ) -> std::result::Result { + let url = format!("ws://{host}:5810/nt/{name}"); + let ws = WebSocket::new_with_str(&url, "networktables.first.wpi.edu")?; + + let close_event = Arc::new(Event::new()); + + ws.set_binary_type(web_sys::BinaryType::Arraybuffer); + + let msg_callback = create_msg_callback(send.clone()); + let ready_callback = + create_ready_callback(receive.clone(), ws.clone(), close_event.clone()); + let close_callback = create_close_callback(send, receive, url, close_event); + + ws.set_onmessage(Some(msg_callback.as_ref().unchecked_ref())); + ws.set_onopen(Some(ready_callback.as_ref().unchecked_ref())); + ws.set_onclose(Some(close_callback.as_ref().unchecked_ref())); + + msg_callback.forget(); + ready_callback.forget(); + close_callback.forget(); + + Ok(()) + } +} + +impl Timer for WasmBackend { + async fn time(duration: std::time::Duration) { + let _ = fluvio_wasm_timer::Delay::new(duration).await; + } +} diff --git a/nt/src/inner.rs b/nt/src/inner.rs new file mode 100644 index 0000000000..6f90c94e10 --- /dev/null +++ b/nt/src/inner.rs @@ -0,0 +1,429 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicI64, AtomicU32, AtomicU64}, + Mutex, + }, + time::Duration, +}; + +use flume::{unbounded, Receiver, Sender}; +use futures::{select, FutureExt}; + +use crate::{ + time::get_time, + types::{BinaryData, BinaryMessage, Properties, SubscriptionOptions, TextMessage}, + Backend, Error, NtMessage, Result, SubscriberUpdate, Timer, +}; + +pub struct Topics { + pub topics: HashMap>, + pub topic_ids: HashMap, + + pub prefix_channels: HashMap)>>, + + pub publishers: HashMap, + pub subscribers: HashMap, +} + +pub struct PublisherData { + pub name: String, + pub data_type: String, +} + +pub struct SubscriberData { + pub name: String, + pub options: SubscriptionOptions, +} + +impl Default for Topics { + fn default() -> Self { + Self { + topics: Default::default(), + topic_ids: Default::default(), + + prefix_channels: Default::default(), + + publishers: Default::default(), + subscribers: Default::default(), + } + } +} + +pub struct InnerNetworkTableClient { + pub receive: Receiver>, + pub send: Sender, + pub topics: Mutex, + pub time_offset: AtomicI64, + + pub subuid: AtomicU32, + pub pubuid: AtomicU32, + + pub last_time_update: AtomicU64, +} + +impl InnerNetworkTableClient { + pub async fn new(host: &str, name: &str) -> Result<(Self, B::Output)> { + let (send_out, receive_out) = unbounded(); + let (send_in, receive_in) = unbounded(); + + let out = match B::create(host, name, send_in, receive_out) { + Ok(out) => out, + Err(err) => return Err(Error::Other(Box::new(err))), + }; + + send_out + .send(NtMessage::Binary(BinaryMessage { + id: -1, + timestamp: 0, + data: BinaryData::Int(get_time() as i64), + })) + .map_err(|_| Error::Send)?; + + let NtMessage::Binary(msg) = receive_in.recv_async().await?? else { + return Err(Error::Type); + }; + + if msg.id != -1 { + return Err(Error::Type); // TODO: Maybe not the right response + } + + let BinaryData::Int(time) = msg.data else { + return Err(Error::Type); + }; + + let server_time = (get_time() as i64 - time) / 2 + msg.timestamp as i64; + let offset = server_time - get_time() as i64; + + Ok(( + Self { + send: send_out, + receive: receive_in, + topics: Mutex::new(Default::default()), + time_offset: AtomicI64::new(offset), + + subuid: AtomicU32::new(u32::MIN), + pubuid: AtomicU32::new(u32::MIN), + + last_time_update: AtomicU64::new(get_time()), + }, + out, + )) + } + + pub fn get_server_time(&self) -> u64 { + let offset = self.time_offset.load(std::sync::atomic::Ordering::Relaxed); + (get_time() as i64 + offset) as u64 + } + + pub async fn main_loop(&self) -> Result<()> { + select! { + res = self.time_loop::().fuse() => { + return res; + } + res = self.recv_loop().fuse() => { + return res; + } + } + } + + pub async fn time_loop(&self) -> Result<()> { + loop { + if get_time() + - self + .last_time_update + .load(std::sync::atomic::Ordering::Relaxed) + > Duration::from_secs(3).as_micros() as u64 + { + self.send + .send(NtMessage::Reconnect(None)) + .map_err(|_| Error::Send)?; + } + + T::time(Duration::from_secs(2)).await; + + self.start_sync_time()?; + } + } + + pub async fn recv_loop(&self) -> Result<()> { + loop { + let val = self.receive.recv_async().await??; + + match val { + NtMessage::Text(msg) => match msg { + TextMessage::Announce { + name, + id, + data_type, + pubuid: _, + properties, + } => { + let mut topics = self.topics.lock().unwrap(); + + if let Some(sender) = topics.topics.get(&name) { + if sender.send(SubscriberUpdate::Type(data_type)).is_err() + || sender + .send(SubscriberUpdate::Properties(properties)) + .is_err() + { + topics.topics.remove(&name); + } else { + topics.topic_ids.insert(id, name); + } + } else { + let (send, receive) = unbounded(); + + topics.topics.insert(name.clone(), send.clone()); + topics.topic_ids.insert(id, name.clone()); + + let Some((_, sender)) = topics + .prefix_channels + .iter() + .find(|(prefix, _)| name.starts_with(prefix.as_str())) + else { + continue; + }; + + send.send(SubscriberUpdate::Type(data_type)).unwrap(); + send.send(SubscriberUpdate::Properties(properties)).unwrap(); + + sender.send((name, receive)).map_err(|_| Error::Send)?; + } + } + TextMessage::Unannounce { name, id } => { + let mut topics = self.topics.lock().unwrap(); + + topics.topics.remove(&name); + topics.topic_ids.remove(&id); + } + TextMessage::Properties { + name, + ack: _, + update, + } => { + let mut topics = self.topics.lock().unwrap(); + + let topic = topics.topics.get(&name); + + if let Some(topic) = topic { + if topic.send(SubscriberUpdate::Properties(update)).is_err() { + topics.topics.remove(&name); + } + } + } + _ => unreachable!("A server-bound message was sent to the client"), + }, + NtMessage::Binary(msg) => { + if msg.id == -1 { + let BinaryData::Int(time) = msg.data else { + return Err(Error::Type); + }; + + let server_time = (get_time() as i64 - time) / 2 + msg.timestamp as i64; + let offset = server_time - get_time() as i64; + + self.time_offset + .fetch_min(offset, std::sync::atomic::Ordering::Relaxed); + self.last_time_update + .store(get_time(), std::sync::atomic::Ordering::Relaxed); + } else { + let mut topics = self.topics.lock().unwrap(); + + let Some(name) = topics.topic_ids.get(&(msg.id as u32)) else { + topics.topic_ids.remove(&(msg.id as u32)); + continue; + }; + + let is_sender_dropped = topics + .topics + .get(name) + .map(|topic| topic.send(SubscriberUpdate::Data(msg.data)).is_err()) + .unwrap_or(false); + + if is_sender_dropped { + let name = name.to_owned(); + topics.topics.remove(&name); + topics.topic_ids.remove(&(msg.id as u32)); + } + } + } + NtMessage::Reconnect(backlog) => { + let backlog = backlog.unwrap(); + + self.last_time_update + .store(get_time(), std::sync::atomic::Ordering::Relaxed); + + self.send + .send(NtMessage::Binary(BinaryMessage { + id: -1, + timestamp: 0, + data: BinaryData::Int(get_time() as i64), + })) + .map_err(|_| Error::Send)?; + + let mut msg = self.receive.recv_async().await??; + + while !matches!(&msg, NtMessage::Binary(msg) if msg.id == -1) { + msg = self.receive.recv_async().await??; + } + + let NtMessage::Binary(msg) = msg else { + unreachable!(); + }; + + let BinaryData::Int(time) = msg.data else { + return Err(Error::Type); + }; + + let server_time = (get_time() as i64 - time) / 2 + msg.timestamp as i64; + let offset = server_time - get_time() as i64; + + self.time_offset + .store(offset, std::sync::atomic::Ordering::Relaxed); + + let mut topics = self.topics.lock().unwrap(); + + topics.topic_ids.clear(); + + for (subuid, data) in topics.subscribers.iter() { + self.send + .send(NtMessage::Text(TextMessage::Subscribe { + topics: vec![data.name.clone()], + subuid: *subuid, + options: data.options.clone(), + })) + .map_err(|_| Error::Send)?; + } + + for (pubuid, data) in topics.publishers.iter() { + self.send + .send(NtMessage::Text(TextMessage::Publish { + name: data.name.to_owned(), + pubuid: *pubuid, + data_type: data.data_type.to_owned(), + properties: Default::default(), + })) + .map_err(|_| Error::Send)?; + } + + for message in backlog { + match message { + NtMessage::Text(message) => match message { + TextMessage::SetProperties { name, update } => self + .send + .send(NtMessage::Text(TextMessage::SetProperties { + name, + update, + })) + .map_err(|_| Error::Send)?, + TextMessage::Subscribe { + mut topics, + subuid, + options, + } => { + self.topics.lock().unwrap().topic_ids.values().for_each( + |name| { + topics + .iter() + .position(|topic| name == topic) + .map(|idx| topics.remove(idx)); + }, + ); + if topics.len() != 0 { + self.send + .send(NtMessage::Text(TextMessage::Subscribe { + topics, + subuid, + options, + })) + .map_err(|_| Error::Send)?; + } + } + TextMessage::Unsubscribe { subuid } => self + .send + .send(NtMessage::Text(TextMessage::Unsubscribe { subuid })) + .map_err(|_| Error::Send)?, + _ => {} + }, + NtMessage::Binary(mut message) => { + message.timestamp = self.get_server_time(); + + self.send + .send(NtMessage::Binary(message)) + .map_err(|_| Error::Send)?; + } + NtMessage::Reconnect(_) => {} + } + } + } + } + } + } + + pub fn start_sync_time(&self) -> Result<()> { + self.send + .send(NtMessage::Binary(BinaryMessage { + id: -1, + timestamp: 0, + data: BinaryData::Int(get_time() as i64), + })) + .map_err(|_| Error::Send)?; + + Ok(()) + } + + pub fn subscribe(&self, topics: Vec, options: SubscriptionOptions) -> Result { + let id = self.new_subuid(); + self.send + .send(NtMessage::Text(TextMessage::Subscribe { + topics, + subuid: id, + options, + })) + .map_err(|_| Error::Send)?; + + Ok(id) + } + + pub fn unsubscribe(&self, id: u32) -> Result<()> { + self.send + .send(NtMessage::Text(TextMessage::Unsubscribe { subuid: id })) + .map_err(|_| Error::Send)?; + + Ok(()) + } + + pub fn publish(&self, name: String, data_type: String, properties: Properties) -> Result { + let id = self.new_pubuid(); + + self.send + .send(NtMessage::Text(TextMessage::Publish { + name, + pubuid: id, + data_type, + properties, + })) + .map_err(|_| Error::Send)?; + + Ok(id) + } + + pub fn unpublish(&self, id: u32) -> Result<()> { + self.send + .send(NtMessage::Text(TextMessage::Unpublish { pubuid: id })) + .map_err(|_| Error::Send)?; + + Ok(()) + } + + pub fn new_subuid(&self) -> u32 { + self.subuid + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } + + pub fn new_pubuid(&self) -> u32 { + self.pubuid + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } +} diff --git a/nt/src/lib.rs b/nt/src/lib.rs index 3caac77963..0eb9b6b3a6 100644 --- a/nt/src/lib.rs +++ b/nt/src/lib.rs @@ -1,4 +1,163 @@ -#[allow(warnings)] -pub mod bindings { - include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +use std::{marker::PhantomData, sync::Arc, time::Duration}; + +use flume::{unbounded, Receiver, RecvError, Sender}; +use futures::Future; +use inner::{InnerNetworkTableClient, PublisherData, SubscriberData}; +use publish::Publisher; +use subscribe::Subscriber; +use thiserror::Error; +use types::{ + payload::Payload, BinaryData, BinaryMessage, Properties, SubscriptionOptions, TextMessage, +}; + +pub mod backends; +pub(crate) mod inner; +pub mod publish; +pub mod subscribe; +pub mod time; +pub mod types; + +/// Any type of message that could be sent or recieved from a websocket +#[derive(Debug)] +pub enum NtMessage { + Text(TextMessage), + Binary(BinaryMessage), + Reconnect(Option>), +} + +pub(crate) enum SubscriberUpdate { + Properties(Properties), + Data(BinaryData), + Type(String), +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error while sending a message")] + Send, + #[error("Error while receiving a message: {0}")] + Receive(#[from] RecvError), + #[error("Other error occured: {0}")] + Other(Box), + #[error("Encountered an unknown frame")] + UnknownFrame, + #[error("Encountered an incorrect type")] + Type, +} + +type Result = std::result::Result; + +/// A generic timer driver +pub trait Timer { + /// Delay for the specified duration + fn time(duration: Duration) -> impl std::future::Future + Send; +} + +/// A generic backend that a client can use. [backend::TokioBackend] is a good example. +pub trait Backend { + /// A type like a join handle that whatever is using the client might need + type Output; + type Error: std::error::Error + 'static + Send; + + /// Using the hostname and client name create a backend that sends [NtMessage] or [Error] to + /// the client and passes on [NtMessage] to the server + fn create( + host: &str, + name: &str, + send: Sender>, + receive: Receiver, + ) -> std::result::Result; +} + +// An instance of a network table client. Based on an [Arc] internally, so cheap to copy +#[derive(Clone)] +pub struct NetworkTableClient { + inner: Arc, +} + +impl NetworkTableClient { + /// Create a new client using the hostname, client name, and a backend type + pub async fn new(host: &str, name: &str) -> Result<(Self, B::Output)> { + let (inner, out) = InnerNetworkTableClient::new::(host, name).await?; + + Ok(( + Self { + inner: Arc::new(inner), + }, + out, + )) + } + + /// This returns a future that should be run on the side, usually, in an async task. This + /// future must remain alive for as long as subscriber and time updates are required + pub fn main_task(&self) -> impl Future> + 'static { + let inner = self.inner.clone(); + + async move { inner.main_loop::().await } + } + + /// Create a subscriber for a topic with a certain payload type + pub fn subscribe( + &self, + name: String, + options: SubscriptionOptions, + ) -> Result> { + let (sender, receiver) = unbounded(); + + let mut topics = self.inner.topics.lock().unwrap(); + + topics.topics.insert(name.clone(), sender); + + let prefix_children = if options.prefix.unwrap_or(false) { + let (prefix_sender, prefix_reveiver) = unbounded(); + topics.prefix_channels.insert(name.clone(), prefix_sender); + Some(prefix_reveiver) + } else { + None + }; + + let id = self.inner.subscribe(vec![name.clone()], options.clone())?; + + topics.subscribers.insert( + id, + SubscriberData { + name: name.clone(), + options, + }, + ); + + Ok(Subscriber { + name, + properties: None, + input: receiver, + id, + client: self.inner.clone(), + phantom: PhantomData, + prefix_children, + }) + } + + /// Create a publisher for a topic with a certain payload type + pub fn publish(&self, name: String) -> Result> { + let id = self.inner.publish( + name.clone(), + P::name().unwrap().to_owned(), + Default::default(), + )?; + + self.inner.topics.lock().unwrap().publishers.insert( + id, + PublisherData { + name: name.clone(), + data_type: P::name().unwrap().to_owned(), + }, + ); + + Ok(Publisher { + name, + id, + client: self.inner.clone(), + phantom: PhantomData, + }) + } } diff --git a/nt/src/main.rs b/nt/src/main.rs index 3ad3e3116f..69eecf1b2d 100644 --- a/nt/src/main.rs +++ b/nt/src/main.rs @@ -1,32 +1,36 @@ -use std::{ffi::CString, thread, time::Duration}; +use std::time::Duration; -fn main() { - unsafe { - let inst = nt::bindings::NT_GetDefaultInstance(); +use nt_rs::{ + backends::tokio::TokioBackend, + publish::Publisher, + subscribe::Subscriber, + time::init_time, + types::{BinaryData, Properties, SubscriptionOptions}, + NetworkTableClient, +}; +use tokio::time::sleep; - let address = CString::new("/home/lvuser/networktables.json").unwrap(); - let local_path = CString::new("").unwrap(); +#[tokio::main] +async fn main() { + init_time(); - let address_ptr = address.as_ptr(); - let local_path_ptr = local_path.as_ptr(); + let (client, task) = NetworkTableClient::new::("localhost", "rust") + .await + .unwrap(); - std::mem::forget(address); - std::mem::forget(local_path); + { + let client = client; - nt::bindings::NT_StartServer(inst, local_path_ptr, address_ptr, 1735, 5810); + let main = tokio::spawn(client.main_task::()); - let mut amount_skipped: u8 = 0; + let publish: Publisher = client.publish("/SmartDashboard/val".to_owned()).unwrap(); - while nt::bindings::NT_GetNetworkMode(inst) - == nt::bindings::NT_NetworkMode_NT_NET_MODE_STARTING - { - thread::sleep(Duration::from_millis(10)); + publish.set(8).unwrap(); - amount_skipped += 1; + main.await; - if amount_skipped > 100 { - panic!("Time out"); - } - } + // main.abort(); // End the main recieve loop } + + task.await.unwrap(); // Wait for the backend to stop } diff --git a/nt/src/publish.rs b/nt/src/publish.rs new file mode 100644 index 0000000000..b2798a5517 --- /dev/null +++ b/nt/src/publish.rs @@ -0,0 +1,54 @@ +use std::{marker::PhantomData, sync::Arc}; + +use crate::{ + inner::InnerNetworkTableClient, + types::{payload::Payload, BinaryMessage, Properties, TextMessage}, + Error, NtMessage, Result, +}; + +pub struct Publisher { + pub(crate) name: String, + pub(crate) id: u32, + pub(crate) client: Arc, + pub(crate) phantom: PhantomData

, +} + +impl Publisher

{ + pub fn set(&self, value: P) -> Result<()> { + self.client + .send + .send(NtMessage::Binary(BinaryMessage { + id: self.id as i64, + timestamp: self.client.get_server_time(), + data: value.to_val(), + })) + .map_err(|_| Error::Send) + } + + pub fn set_properties(&self, props: Properties) -> Result<()> { + self.client + .send + .send(NtMessage::Text(TextMessage::SetProperties { + name: self.name.clone(), + update: props, + })) + .map_err(|_| Error::Send) + } + + pub fn name(&self) -> &str { + &self.name + } +} + +impl Drop for Publisher

{ + fn drop(&mut self) { + let _ = self.client.unpublish(self.id); + + self.client + .topics + .lock() + .unwrap() + .publishers + .remove(&self.id); + } +} diff --git a/nt/src/subscribe.rs b/nt/src/subscribe.rs new file mode 100644 index 0000000000..e52bc048a1 --- /dev/null +++ b/nt/src/subscribe.rs @@ -0,0 +1,197 @@ +use std::{marker::PhantomData, sync::Arc}; + +use flume::Receiver; + +use crate::{ + inner::InnerNetworkTableClient, + types::{payload::Payload, Properties}, + Error, Result, SubscriberUpdate, +}; + +pub struct Subscriber { + pub(crate) name: String, + pub(crate) properties: Option, + pub(crate) input: Receiver, + pub(crate) id: u32, + pub(crate) client: Arc, + pub(crate) prefix_children: Option)>>, + pub(crate) phantom: PhantomData

, +} + +impl Subscriber

{ + fn consume_updates(&mut self) -> Result> { + let mut data = None; + for update in self.input.drain() { + match update { + SubscriberUpdate::Properties(props) => { + if self.properties.is_none() { + self.properties = Some(Default::default()); + } + + self.properties.as_mut().unwrap().update(props); + } + SubscriberUpdate::Data(bin_data) => { + data = Some(P::parse(bin_data).map_err(|_| Error::Type)?); + } + SubscriberUpdate::Type(val) => { + if matches!(P::name(), Some(data_type) if data_type != val) { + return Err(Error::Type); + } + } + } + } + + Ok(data) + } + + /// Wait for a new payload value to become avaliable + pub async fn get(&mut self) -> Result

{ + dbg!("get"); + if !self.input.is_empty() { + if let Some(val) = self.consume_updates()? { + return Ok(val); + } + } + + loop { + let val = self.input.recv_async().await?; + + match val { + SubscriberUpdate::Properties(props) => { + if self.properties.is_none() { + self.properties = Some(Default::default()); + } + + self.properties.as_mut().unwrap().update(props); + } + SubscriberUpdate::Data(val) => { + break P::parse(val).map_err(|_| Error::Type); + } + SubscriberUpdate::Type(ty) => { + if matches!(P::name(), Some(data_type) if data_type != ty) { + return Err(Error::Type); + } + } + } + } + } + + pub fn properties(&self) -> Option<&Properties> { + self.properties.as_ref() + } + + pub fn name(&self) -> &str { + &self.name + } + + pub async fn get_child(&self) -> Result> { + let receiver = self + .prefix_children + .as_ref() + .expect("Tried to get children from a topic wihout the prefix option set"); + + let (name, channel) = receiver.recv_async().await?; + + Ok(PrefixSubscriber { + name, + input: channel, + properties: None, + phantom: PhantomData, + }) + } +} + +impl Drop for Subscriber

{ + fn drop(&mut self) { + let _ = self.client.unsubscribe(self.id); + self.client + .topics + .lock() + .unwrap() + .subscribers + .remove(&self.id); + } +} + +/// A subscriber created as a child of a subscriber +pub struct PrefixSubscriber<'a, P: Payload> { + name: String, + input: Receiver, + properties: Option, + phantom: PhantomData<&'a P>, +} + +impl<'a, P: Payload> PrefixSubscriber<'a, P> { + fn consume_updates(&mut self) -> Result> { + let mut data = None; + for update in self.input.drain() { + match update { + SubscriberUpdate::Properties(props) => { + if self.properties.is_none() { + self.properties = Some(Default::default()); + } + + self.properties.as_mut().unwrap().update(props); + } + SubscriberUpdate::Data(bin_data) => { + data = Some(P::parse(bin_data).map_err(|_| Error::Type)?); + } + SubscriberUpdate::Type(val) => { + if matches!(P::name(), Some(data_type) if data_type != val) { + return Err(Error::Type); + } + } + } + } + + Ok(data) + } + + /// Wait for a new payload value to become avaliable + pub async fn get(&mut self) -> Result

{ + if !self.input.is_empty() { + if let Some(val) = self.consume_updates()? { + return Ok(val); + } + } + + loop { + let val = self.input.recv_async().await?; + + match val { + SubscriberUpdate::Properties(props) => { + if self.properties.is_none() { + self.properties = Some(Default::default()); + } + + self.properties.as_mut().unwrap().update(props); + } + SubscriberUpdate::Data(val) => { + break P::parse(val).map_err(|_| Error::Type); + } + SubscriberUpdate::Type(ty) => { + if matches!(P::name(), Some(data_type) if data_type != ty) { + return Err(Error::Type); + } + } + } + } + } + + pub fn properties(&self) -> Option<&Properties> { + self.properties.as_ref() + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn to_static(self) -> PrefixSubscriber<'static, P> { + PrefixSubscriber { + name: self.name, + input: self.input, + properties: self.properties, + phantom: PhantomData, + } + } +} diff --git a/nt/src/time.rs b/nt/src/time.rs new file mode 100644 index 0000000000..577279fc40 --- /dev/null +++ b/nt/src/time.rs @@ -0,0 +1,24 @@ +#[cfg(not(feature = "wasm"))] +static START_TIME: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub fn get_time() -> u64 { + // TODO: Use FPGA when on robot + #[cfg(not(feature = "wasm"))] + { + START_TIME + .get_or_init(std::time::Instant::now) + .elapsed() + .as_micros() as u64 + } + #[cfg(feature = "wasm")] + { + let millis = web_sys::window().unwrap().performance().unwrap().now(); + + (millis * 1000.0) as u64 + } +} + +pub fn init_time() { + #[cfg(not(feature = "wasm"))] + START_TIME.get_or_init(std::time::Instant::now); +} diff --git a/nt/src/types.rs b/nt/src/types.rs new file mode 100644 index 0000000000..34c9c545ab --- /dev/null +++ b/nt/src/types.rs @@ -0,0 +1,558 @@ +use std::string::FromUtf8Error; + +use rmp::{ + decode::{self, NumValueReadError, ValueReadError}, + encode::{self, ValueWriteError}, +}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +pub mod payload; + +fn should_skip(val: &MissingOrNull) -> bool { + *val == MissingOrNull::Missing +} + +fn skip_none(val: &Option) -> bool { + val.is_none() +} + +/// Each published topic may also have properties associated to it. Properties are represented in +/// the protocol as JSON and thus property values may be any JSON type. Property keys must be +/// strings. The following properties have a defined meaning in this spec. Servers shall support +/// arbitrary properties being set outside of this set. Clients shall ignore properties they do not +/// recognize. Properties are initially set on publish and may be changed (by any client) using +/// [TextMessage::SetProperties] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct Properties { + /// If true, the last set value will be periodically saved to persistent storage on the server + /// and be restored during server startup. Topics with this property set to true will not be + /// deleted by the server when the last publisher stops publishing. + #[serde( + with = "missing_or_null_impls", + default, + skip_serializing_if = "should_skip" + )] + pub persistent: MissingOrNull, + + /// Topics with this property set to true will not be deleted by the server when the last + /// publisher stops publishing. + #[serde( + with = "missing_or_null_impls", + default, + skip_serializing_if = "should_skip" + )] + pub retained: MissingOrNull, + // /// If false, the server and clients will not store the value of the topic. This means that + // /// only value updates will be available for the topic. + // #[serde( + // with = "missing_or_null_impls", + // default, + // skip_serializing_if = "should_skip" + // )] + // pub cached: MissingOrNull, +} + +impl Default for Properties { + fn default() -> Self { + Self { + persistent: Default::default(), + retained: Default::default(), + // cached: Default::default(), + } + } +} + +impl Properties { + pub fn update(&mut self, other: Properties) { + self.persistent.update(other.persistent); + self.retained.update(other.retained); + // self.cached.update(other.cached); + } +} + +mod missing_or_null_impls { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + use super::MissingOrNull; + + pub fn serialize( + value: &MissingOrNull, + serializer: S, + ) -> Result { + >::from(value.to_owned()).serialize(serializer) + } + + pub fn deserialize<'de, D: Deserializer<'de>>( + deserializer: D, + ) -> Result, D::Error> { + >::deserialize(deserializer).map(|option| option.into()) + } +} + +/// Each subscription may have options set. The following options have a defined meaning in this +/// spec. Servers shall preserve arbitrary options, as servers and clients may support arbitrary +/// options outside of this set. Options are set using Subscribe Message ([TextMessage::Subscribe]) +/// and cannot be changed. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SubscriptionOptions { + /// How frequently the server should send changes. The server may send more frequently than + /// this (e.g. use a combined minimum period for all values) or apply a restricted range to + /// this value. The default if unspecified is 100 ms (same as NT 3.0). + #[serde(skip_serializing_if = "skip_none", default)] + pub periodic: Option, + + /// If true, the server should send all value changes over the wire. If false, only the most + /// recent value is sent (same as NT 3.0 behavior). If not specified, defaults to false. + #[serde(skip_serializing_if = "skip_none", default)] + pub all: Option, + + /// If true, the server should not send any value changes over the wire regardless of other + /// options. This is useful for only getting topic announcements. If false, value changes are + /// sent in accordance with other options. If not specified, defaults to false. + #[serde(skip_serializing_if = "skip_none", default)] + pub topicsonly: Option, + + /// If true, any topic starting with the name in the subscription topics list is subscribed to, + /// not just exact matches. If not specified, defaults to false. + #[serde(skip_serializing_if = "skip_none", default)] + pub prefix: Option, +} + +impl SubscriptionOptions { + pub fn periodic(mut self, value: u32) -> Self { + self.periodic = Some(value); + self + } + + pub fn all(mut self, value: bool) -> Self { + self.all = Some(value); + self + } + + pub fn topicsonly(mut self, value: bool) -> Self { + self.topicsonly = Some(value); + self + } + + pub fn prefix(mut self, value: bool) -> Self { + self.prefix = Some(value); + self + } +} + +impl Default for SubscriptionOptions { + fn default() -> Self { + Self { + periodic: None, + all: None, + topicsonly: None, + prefix: None, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "method", content = "params")] +pub enum TextMessage { + /// Sent from a client to the server to indicate the client wants to start publishing values at + /// the given topic. The server shall respond with a Topic Announcement Message + /// ([TextMessage::Announce]), even if the topic was previously announced. The client can start + /// publishing data values via MessagePack messages immediately after sending this message, but + /// the messages will be ignored by the server if the publisher data type does not match the + /// topic data type. + #[serde(rename = "publish")] + Publish { + /// The topic name being published + name: String, + + /// A client-generated unique identifier for this publisher. Use the same UID later to + /// unpublish. This is also the identifier that the client will use in MessagePack messages + /// for this topic. + pubuid: u32, + + /// The requested data type (as a string). + /// + /// If the topic is newly created (e.g. there are no other publishers) this sets the value + /// type. If the topic was previously published, this is ignored. The + /// [TextMessage::Announce] message contains the actual topic value type that the client + /// shall use when publishing values. + /// + /// Implementations should indicate an error if the user tries to publish an incompatible + /// type to that already set for the topic. + #[serde(rename = "type")] + data_type: String, // TODO: Make real type + + /// Initial topic properties. + /// + /// If the topic is newly created (e.g. there are no other publishers) this sets the topic + /// properties. If the topic was previously published, this is ignored. The + /// [TextMessage::Announce] message contains the actual topic properties. Clients can use + /// the [TextMessage::SetProperties] message to change properties after topic creation. + properties: Properties, + }, + + /// Sent from a client to the server to indicate the client wants to stop publishing values for + /// the given topic and publisher. The client should stop publishing data value updates via + /// binary MessagePack messages for this publisher prior to sending this message. + /// + /// When there are no remaining publishers for a non-persistent topic, the server shall delete + /// the topic and send a Topic Removed Message ([TextMessage::Unannounce]) to all clients who + /// have been sent a previous Topic Announcement Message ([TextMessage::Announce]) for the + /// topic. + #[serde(rename = "unpublish")] + Unpublish { + /// The same unique identifier passed to the [TextMessage::Publish] message + pubuid: u32, + }, + + /// Sent from a client to the server to change properties (see [Properties]) for a given topic. + /// The server will send a corresponding Properties Update Message ([TextMessage::Properties]) + /// to all subscribers to the topic (if the topic is published). This message shall be ignored + /// by the server if the topic is not published. + #[serde(rename = "setproperties")] + SetProperties { name: String, update: Properties }, + + /// Sent from a client to the server to indicate the client wants to subscribe to value changes + /// for the specified topics / groups of topics. The server shall send MessagePack messages + /// containing the current values for any existing cached topics upon receipt, and continue + /// sending MessagePack messages for future value changes. If a topic does not yet exist, no + /// message is sent until it is created (via a publish), at which point a Topic Announcement + /// Message ([TextMessage::Announce]) will be sent and MessagePack messages will automatically + /// follow as they are published. + /// + /// Subscriptions may overlap; only one MessagePack message is sent per value change regardless + /// of the number of subscriptions. Sending a subscribe message with the same subscription UID + /// as a previous subscribe message results in updating the subscription (replacing the array + /// of identifiers and updating any specified options). + #[serde(rename = "subscribe")] + Subscribe { + /// One or more topic names or prefixes (if the prefix option is true) to start receiving + /// messages for. + topics: Vec, + + /// A client-generated unique identifier for this subscription. Use the same UID later to + /// unsubscribe. + subuid: u32, + + /// [SubscriptionOptions] + options: SubscriptionOptions, + }, + + /// Sent from a client to the server to indicate the client wants to stop subscribing to + /// messages for the given subscription. + #[serde(rename = "unsubscribe")] + Unsubscribe { + /// The same unique identifier passed to the [TextMessage::Subscribe] message + subuid: u32, + }, + + /// The server shall send this message for each of the following conditions: + /// - To all clients subscribed to a matching prefix when a topic is created + /// - To a client in response to an Publish Request Message ([TextMessage::Publish]) from that client + #[serde(rename = "announce")] + Announce { + name: String, + + /// The identifier that the server will use in MessagePack messages for this topic + id: u32, + + /// The data type for the topic (as a string) + #[serde(rename = "type")] + data_type: String, + + /// If this message was sent in response to a [TextMessage::Publish] message, the Publisher UID provided + /// in that message. Otherwise absent. + pubuid: Option, + + /// Topic [Properties] + properties: Properties, + }, + + /// The server shall send this message when a previously announced (via a Topic Announcement + /// Message ([TextMessage::Announce])) topic is deleted. + #[serde(rename = "unannounce")] + Unannounce { + name: String, + + /// The identifier that the server was using for value updates + id: u32, + }, + + /// The server shall send this message when a previously announced (via a Topic Announcement + /// Message ([TextMessage::Announce])) topic has its properties changed (via Set Properties Message + /// ([TextMessage::SetProperties])). + #[serde(rename = "properties")] + Properties { + name: String, + + /// True if this message is in response to a [TextMessage::SetProperties] message from the + /// same client. Otherwise absent. + ack: bool, + + /// The client shall handle the update value as follows. If a property is not included in + /// the update map, its value is not changed. If a property is provided in the update map + /// with a value of null, the property is deleted. + update: Properties, + }, +} + +#[derive(PartialEq, Clone, Debug)] +pub enum MissingOrNull { + Missing, + Null, + Value(T), +} + +impl Copy for MissingOrNull {} + +impl From> for MissingOrNull { + fn from(value: Option) -> Self { + match value { + Some(val) => MissingOrNull::Value(val), + None => MissingOrNull::Null, + } + } +} + +impl From> for Option { + fn from(value: MissingOrNull) -> Option { + match value { + MissingOrNull::Missing | MissingOrNull::Null => None, + MissingOrNull::Value(val) => Some(val), + } + } +} + +impl Default for MissingOrNull { + fn default() -> Self { + Self::Missing + } +} + +impl MissingOrNull { + pub fn update(&mut self, other: Self) { + if matches!(other, MissingOrNull::Missing) { + return; + } + + *self = other; + } +} + +/// A single binary message that could be sent in a binary websocket frame +#[derive(Debug)] +pub struct BinaryMessage { + pub id: i64, + pub timestamp: u64, + pub data: BinaryData, +} + +impl BinaryMessage { + /// Decode one entire message + pub fn from_reader(reader: &mut R) -> Result { + let len = decode::read_array_len(reader)?; + + if len != 4 { + Err(BinaryMessageError::MessageLen(len)) + } else { + Ok(Self { + id: decode::read_int(reader)?, + timestamp: decode::read_int(reader)?, + data: BinaryData::from_reader(reader)?, + }) + } + } + + /// Enocde this message onto a writer + pub fn to_writer(&self, writer: &mut W) -> Result<(), BinaryMessageError> { + encode::write_array_len(writer, 4)?; + encode::write_sint(writer, self.id)?; + encode::write_uint(writer, self.timestamp)?; + self.data.to_writer(writer)?; + Ok(()) + } +} + +/// All defined types that could be sent in binary frames +#[derive(Debug, Clone)] +pub enum BinaryData { + Boolean(bool), + Double(f64), + Int(i64), + Float(f32), + Str(String), + Bin(Vec), + BoolArray(Vec), + DoubleArray(Vec), + IntArray(Vec), + FloatArray(Vec), + StringArray(Vec), +} + +#[derive(Debug, Error)] +pub enum BinaryMessageError { + #[error("Could not parse number: {0}")] + IntError(#[from] NumValueReadError), + #[error("Could not read value: {0}")] + ValueReadError(#[from] ValueReadError), + #[error("Could not write value: {0}")] + ValueWriteError(#[from] ValueWriteError), + #[error("Unknown data type: {0}")] + UnknownDataType(u8), + #[error("Could not parse utf8 while parsing a string: {0}")] + InvalidUTF8(#[from] FromUtf8Error), + #[error("Encountered an error when reading more data: {0}")] + IoError(#[from] std::io::Error), + #[error("Incorrect binary message length, expected 4, found {0}")] + MessageLen(u32), +} + +impl BinaryData { + /// Decode a single chunk of binary data from a reader + pub fn from_reader(reader: &mut R) -> Result { + let data_type: u8 = decode::read_int(reader)?; + + let data = match data_type { + 0 => BinaryData::Boolean(decode::read_bool(reader)?), + 1 => BinaryData::Double(decode::read_f64(reader)?), + 2 => BinaryData::Int(decode::read_int(reader)?), + 3 => BinaryData::Float(decode::read_f32(reader)?), + 4 => { + let len = decode::read_str_len(reader)?; + let mut data = vec![0; len as usize]; + reader.read_exact(&mut data)?; + + BinaryData::Str(String::from_utf8(data)?) + } + 5 => { + let len = decode::read_bin_len(reader)?; + let mut data = vec![0; len as usize]; + reader.read_exact(&mut data)?; + + BinaryData::Bin(data) + } + 16 => { + let len = decode::read_array_len(reader)?; + + BinaryData::BoolArray( + (0..len) + .map(|_| decode::read_bool(reader)) + .collect::>()?, + ) + } + 17 => { + let len = decode::read_array_len(reader)?; + + BinaryData::DoubleArray( + (0..len) + .map(|_| decode::read_f64(reader)) + .collect::>()?, + ) + } + 18 => { + let len = decode::read_array_len(reader)?; + + BinaryData::IntArray( + (0..len) + .map(|_| decode::read_int(reader)) + .collect::>()?, + ) + } + 19 => { + let len = decode::read_array_len(reader)?; + + BinaryData::FloatArray( + (0..len) + .map(|_| decode::read_f32(reader)) + .collect::>()?, + ) + } + 20 => { + let len = decode::read_array_len(reader)?; + + BinaryData::StringArray( + (0..len) + .map(|_| -> Result { + let len = decode::read_str_len(reader)?; + let mut data = vec![0; len as usize]; + reader.read_exact(&mut data)?; + + Ok(String::from_utf8(data)?) + }) + .collect::>()?, + ) + } + n => return Err(BinaryMessageError::UnknownDataType(n)), + }; + + Ok(data) + } + + /// Encode this binary payload to the wire + pub fn to_writer(&self, writer: &mut W) -> Result<(), BinaryMessageError> { + match self { + BinaryData::Boolean(val) => { + encode::write_uint(writer, 0)?; + encode::write_bool(writer, *val)?; + } + BinaryData::Double(val) => { + encode::write_uint(writer, 1)?; + encode::write_f64(writer, *val)?; + } + BinaryData::Int(val) => { + encode::write_uint(writer, 2)?; + encode::write_sint(writer, *val)?; + } + BinaryData::Float(val) => { + encode::write_uint(writer, 3)?; + encode::write_f32(writer, *val)?; + } + BinaryData::Str(val) => { + encode::write_uint(writer, 4)?; + encode::write_str(writer, &val)?; + } + BinaryData::Bin(val) => { + encode::write_uint(writer, 5)?; + encode::write_bin(writer, &val)?; + } + BinaryData::BoolArray(val) => { + encode::write_uint(writer, 16)?; + encode::write_array_len(writer, val.len() as u32)?; + for val in val { + encode::write_bool(writer, *val)?; + } + } + BinaryData::DoubleArray(val) => { + encode::write_uint(writer, 17)?; + encode::write_array_len(writer, val.len() as u32)?; + for val in val { + encode::write_f64(writer, *val)?; + } + } + BinaryData::IntArray(val) => { + encode::write_uint(writer, 18)?; + encode::write_array_len(writer, val.len() as u32)?; + for val in val { + encode::write_sint(writer, *val)?; + } + } + BinaryData::FloatArray(val) => { + encode::write_uint(writer, 19)?; + encode::write_array_len(writer, val.len() as u32)?; + for val in val { + encode::write_f32(writer, *val)?; + } + } + BinaryData::StringArray(val) => { + encode::write_uint(writer, 20)?; + encode::write_array_len(writer, val.len() as u32)?; + for val in val { + encode::write_str(writer, &val)?; + } + } + }; + + Ok(()) + } +} diff --git a/nt/src/types/payload.rs b/nt/src/types/payload.rs new file mode 100644 index 0000000000..89377b8a91 --- /dev/null +++ b/nt/src/types/payload.rs @@ -0,0 +1,318 @@ +use core::panic; + +use serde::{de::DeserializeOwned, Serialize}; + +use crate::types::BinaryData; + +/// Any type that can be sent directly over network tables +pub trait Payload: Sized { + fn name() -> Option<&'static str>; + fn parse(data: BinaryData) -> Result; + fn to_val(self) -> BinaryData; +} + +impl Payload for bool { + fn name() -> Option<&'static str> { + Some("boolean") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Boolean(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Boolean(self) + } +} + +impl Payload for f64 { + fn name() -> Option<&'static str> { + Some("double") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Double(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Double(self) + } +} + +impl Payload for f32 { + fn name() -> Option<&'static str> { + Some("float") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Float(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Float(self) + } +} + +macro_rules! payload_num { + ($value:ident) => { + impl Payload for $value { + fn name() -> Option<&'static str> { + Some("int") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Int(val) => Ok(val as $value), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Int(self as i64) + } + } + }; +} + +payload_num!(i128); +payload_num!(i64); +payload_num!(i32); +payload_num!(i16); +payload_num!(i8); +payload_num!(u128); +payload_num!(u64); +payload_num!(u32); +payload_num!(u16); +payload_num!(u8); + +impl Payload for String { + fn name() -> Option<&'static str> { + Some("string") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Str(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Str(self) + } +} + +pub struct Json(D); + +impl Payload for Json { + fn name() -> Option<&'static str> { + Some("string") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Str(val) => Ok(Json(serde_json::from_str(&val).map_err(|_| ())?)), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Str(serde_json::to_string(&self.0).unwrap()) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("raw") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Bin(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Bin(self) + } +} + +pub struct MsgPack(Vec); + +impl Payload for MsgPack { + fn name() -> Option<&'static str> { + Some("msgpack") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Bin(val) => Ok(MsgPack(val)), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Bin(self.0) + } +} + +pub struct Rpc(Vec); + +impl Payload for Rpc { + fn name() -> Option<&'static str> { + Some("rpc") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Bin(val) => Ok(Rpc(val)), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Bin(self.0) + } +} + +pub struct ProtoBuf(Vec); + +impl Payload for ProtoBuf { + fn name() -> Option<&'static str> { + Some("protobuf") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::Bin(val) => Ok(ProtoBuf(val)), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::Bin(self.0) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("boolean[]") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::BoolArray(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::BoolArray(self) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("double[]") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::DoubleArray(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::DoubleArray(self) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("float[]") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::FloatArray(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::FloatArray(self) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("int[]") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::IntArray(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::IntArray(self) + } +} + +impl Payload for Vec { + fn name() -> Option<&'static str> { + Some("string[]") + } + + fn parse(data: BinaryData) -> Result { + match data { + BinaryData::StringArray(val) => Ok(val), + _ => Err(()), + } + } + + fn to_val(self) -> BinaryData { + BinaryData::StringArray(self) + } +} + +impl Payload for BinaryData { + fn name() -> Option<&'static str> { + None + } + + fn parse(data: BinaryData) -> Result { + Ok(data) + } + + fn to_val(self) -> BinaryData { + self + } +} + +impl Payload for () { + fn name() -> Option<&'static str> { + None + } + + fn parse(_data: BinaryData) -> Result { + Ok(()) + } + + fn to_val(self) -> BinaryData { + panic!("Unit tuple can only be used for subscribers"); + } +} diff --git a/robotrs/Cargo.toml b/robotrs/Cargo.toml index 4450da26d2..63f3b2af9f 100644 --- a/robotrs/Cargo.toml +++ b/robotrs/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] futures = "0.3.28" hal-sys = { path = "../hal-sys" } -nt = { path = "../nt" } +nt-rs = { path = "../nt" } linkme = "0.3.15" thiserror = "1.0.47" tracing = "0.1.37"