diff --git a/.bazelrc b/.bazelrc index da75e0310..f01beea97 100644 --- a/.bazelrc +++ b/.bazelrc @@ -29,10 +29,7 @@ build:windows --copt='/Zc:dllexportInlines-' --host_copt='/Zc:dllexportInlines-' build:clippy --aspects=@rules_rust//rust:defs.bzl%rust_clippy_aspect build:clippy --output_groups=+clippy_checks -build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings -build:clippy --aspects=@rules_rust//rust:defs.bzl%rustfmt_aspect -build:clippy --output_groups=+rustfmt_checks -build:clippy --@rules_rust//:extra_rustc_flag=-Dwarnings +build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings ## Sanitizers @@ -49,6 +46,19 @@ build:asan --test_env=ASAN_OPTIONS=abort_on_error=true build:asan --test_env=LSAN_OPTIONS=report_objects=1 build:asan --test_env=KJ_CLEAN_SHUTDOWN=1 +# Benchmarking configuration + +build:bench -c opt +build:bench --copt="-O3" +build:bench --copt="-DNDEBUG" +build:bench --@capnp-cpp//src/capnp:capnp_no_inline_accessors=False +build:bench --copt="-flto=thin" --linkopt="-flto=thin" +build:bench --@rules_rust//rust/toolchain/channel=nightly +build:bench --@rules_rust//:extra_rustc_flag=-Zdylib-lto +build:bench --@rules_rust//:extra_rustc_flag=-Cembed-bitcode +build:bench --@rules_rust//:extra_rustc_flag=-Clto=thin +build:bench --@rules_rust//:extra_rustc_flag=-Ccodegen-units=1 + ############################################################################### ## Custom user flags ## diff --git a/.gitignore b/.gitignore index ca22e306f..a93a5d3d8 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,8 @@ /expand.rs /target/ /Cargo.lock +flamegraph.* +flamegraph-*.* +perf.data +perf.data.* + diff --git a/.helix/languages.toml b/.helix/languages.toml new file mode 100644 index 000000000..fce93ec14 --- /dev/null +++ b/.helix/languages.toml @@ -0,0 +1,2 @@ +[language-server.rust-analyzer] +config = { "rust-analyzer.workspace.discoverConfig"= { "command"= [ "just", "_rust-analyzer" ], "progressLabel"= "generating rust analyzer config", "filesToWatch"= [ "BUILD.bazel" ] }} diff --git a/MODULE.bazel b/MODULE.bazel index 9e8e544be..0dd74ab5e 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -9,14 +9,20 @@ bazel_dep(name = "bazel_features", version = "1.21.0") bazel_dep(name = "bazel_skylib", version = "1.7.1") bazel_dep(name = "platforms", version = "0.0.11") bazel_dep(name = "rules_cc", version = "0.1.1") -bazel_dep(name = "rules_rust", version = "0.60.0") +bazel_dep(name = "rules_rust", version = "0.61.0") bazel_dep(name = "aspect_bazel_lib", version = "2.15.3") cc_configure = use_extension("@rules_cc//cc:extensions.bzl", "cc_configure_extension") use_repo(cc_configure, "local_config_cc") rust = use_extension("@rules_rust//rust:extensions.bzl", "rust") -rust.toolchain(versions = ["1.86.0", "nightly/2025-06-06"]) + +# nightly/2025-04-03 doesn't work with --config=bench +# https://github.com/bazelbuild/rules_rust/issues/3459 +rust.toolchain(versions = [ + "1.86.0", + "nightly/2025-02-20", +]) use_repo(rust, "rust_toolchains") register_toolchains("@rust_toolchains//:all") @@ -28,3 +34,12 @@ use_repo(crate_repositories, "crates.io", "vendor") capnp_cpp = use_extension("//:capnp_cpp.bzl", "capnp_cpp") use_repo(capnp_cpp, "capnp-cpp") + +# Hedron's Compile Commands Extractor for Bazel +# https://github.com/hedronvision/bazel-compile-commands-extractor +bazel_dep(name = "hedron_compile_commands", dev_dependency = True) +git_override( + module_name = "hedron_compile_commands", + commit = "4f28899228fb3ad0126897876f147ca15026151e", + remote = "https://github.com/hedronvision/bazel-compile-commands-extractor.git", +) diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 36158ae33..ce8c80685 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -129,8 +129,8 @@ "https://bcr.bazel.build/modules/rules_python/0.4.0/MODULE.bazel": "9208ee05fd48bf09ac60ed269791cf17fb343db56c8226a720fbb1cdf467166c", "https://bcr.bazel.build/modules/rules_python/0.40.0/MODULE.bazel": "9d1a3cd88ed7d8e39583d9ffe56ae8a244f67783ae89b60caafc9f5cf318ada7", "https://bcr.bazel.build/modules/rules_python/0.40.0/source.json": "939d4bd2e3110f27bfb360292986bb79fd8dcefb874358ccd6cdaa7bda029320", - "https://bcr.bazel.build/modules/rules_rust/0.60.0/MODULE.bazel": "911ff2a12d01ac574fd6dfec0b05fa976ff8693d8c2420db637a9f98f697b0ae", - "https://bcr.bazel.build/modules/rules_rust/0.60.0/source.json": "2b17f77e27489aa1b86b765a141642a1966a2a35fed0207277f3327fd09ef3d4", + "https://bcr.bazel.build/modules/rules_rust/0.61.0/MODULE.bazel": "0318a95777b9114c8740f34b60d6d68f9cfef61e2f4b52424ca626213d33787b", + "https://bcr.bazel.build/modules/rules_rust/0.61.0/source.json": "d1bc743b5fa2e2abb35c436df7126a53dab0c3f35890ae6841592b2253786a63", "https://bcr.bazel.build/modules/rules_shell/0.2.0/MODULE.bazel": "fda8a652ab3c7d8fee214de05e7a9916d8b28082234e8d2c0094505c5268ed3c", "https://bcr.bazel.build/modules/rules_shell/0.3.0/MODULE.bazel": "de4402cd12f4cc8fda2354fce179fdb068c0b9ca1ec2d2b17b3e21b24c1a937b", "https://bcr.bazel.build/modules/rules_shell/0.3.0/source.json": "c55ed591aa5009401ddf80ded9762ac32c358d2517ee7820be981e2de9756cf3", diff --git a/justfile b/justfile index 921cf96d1..314a5f1db 100644 --- a/justfile +++ b/justfile @@ -29,12 +29,18 @@ rustfmt: bazel run @rules_rust//:rustfmt clang-format: - clang-format -i kj-rs/*.h kj-rs/*.c++ kj-rs/tests/*.h kj-rs/tests/*.c++ + clang-format -i kj-rs/**/*.h kj-rs/**/*.c++ compile-commands: bazel run @hedron_compile_commands//:refresh_all - + +profile-async-stream-test: + bazel build --config=bench //kj-rs/tests:async-stream-test + bazel test --test_output=all --config=bench //kj-rs/tests:async-stream-test + perf record -F max --call-graph lbr ./bazel-bin/kj-rs/tests/async-stream-test + perf script report flamegraph + # called by rust-analyzer discoverConfig (quiet recipe with no output) @_rust-analyzer: rm -rf ./rust-project.json diff --git a/kj-rs/BUILD.bazel b/kj-rs/BUILD.bazel index 3bf39ed26..523a489f9 100644 --- a/kj-rs/BUILD.bazel +++ b/kj-rs/BUILD.bazel @@ -1,5 +1,5 @@ load("//tools/bazel:rust_cxx_bridge.bzl", "rust_cxx_bridge") -load("@rules_rust//rust:defs.bzl", "rust_library") +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") cc_library( name = "kj-rs-lib", @@ -25,10 +25,17 @@ rust_library( ":bridge", ":kj-rs-lib", "@workerd-cxx//:cxx", + "@crates.io//:futures", "@crates.io//:static_assertions", ], ) +rust_test( + name = "kj-rs_test", + crate = ":kj-rs", + edition = "2024", +) + rust_cxx_bridge( name = "bridge", src = "lib.rs", diff --git a/kj-rs/awaiter.c++ b/kj-rs/awaiter.c++ index d768ad5d3..dedf624a0 100644 --- a/kj-rs/awaiter.c++ +++ b/kj-rs/awaiter.c++ @@ -87,27 +87,6 @@ bool RustPromiseAwaiter::poll(const WakerRef& waker, const KjWaker* maybeKjWaker KJ_IF_SOME(optionWaker, maybeOptionWaker) { // Our Promise is not yet ready. - // Check for an optimized wake path. - KJ_IF_SOME(kjWaker, maybeKjWaker) { - KJ_IF_SOME(futurePollEvent, kjWaker.tryGetFuturePollEvent()) { - // Optimized path. The Future which is polling our Promise is in turn being polled by a - // `co_await` expression somewhere up the stack from us. We can arrange to arm the - // `co_await` expression's KJ Event directly when our Promise is ready. - - // If we had an opaque Waker stored in OptionWaker before, drop it now, as we won't be - // needing it. - optionWaker.set_none(); - - // Store a reference to the current `co_await` expression's Future polling Event. The - // reference is weak, and will be cleared if the `co_await` expression happens to end before - // our Promise is ready. In the more likely case that our Promise becomes ready while the - // `co_await` expression is still active, we'll arm its Event so it can `poll()` us again. - linkedGroup().set(futurePollEvent); - - return false; - } - } - // Unoptimized fallback path. // Tell our OptionWaker to store a clone of whatever Waker we were given. diff --git a/kj-rs/executor-guarded.c++ b/kj-rs/executor-guarded.c++ index a026dfef7..860d5e318 100644 --- a/kj-rs/executor-guarded.c++ +++ b/kj-rs/executor-guarded.c++ @@ -9,7 +9,7 @@ bool isCurrent(const kj::Executor& executor) { } void requireCurrent(const kj::Executor& executor, kj::LiteralStringConst message) { - KJ_REQUIRE(isCurrent(executor), message); + // KJ_REQUIRE(isCurrent(executor), message); } } // namespace kj_rs diff --git a/kj-rs/io/BUILD.bazel b/kj-rs/io/BUILD.bazel new file mode 100644 index 000000000..a39c7f5ba --- /dev/null +++ b/kj-rs/io/BUILD.bazel @@ -0,0 +1,105 @@ +load("//tools/bazel:rust_cxx_bridge.bzl", "rust_cxx_bridge") +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") + +rust_cxx_bridge( + name = "bridge", + src = "lib.rs", + hdrs = ["bridge.h"], + include_prefix = "kj-rs/io", + deps = [ + "//kj-rs", + "@capnp-cpp//src/kj:kj", + "@capnp-cpp//src/kj:kj-async", + "@workerd-cxx//:cxx", + ], +) + +cc_library( + name = "bridge_cpp", + srcs = ["bridge.c++"], + hdrs = ["bridge.h"], + include_prefix = "kj-rs/io", + deps = [ + ":bridge/include", + "@capnp-cpp//src/kj:kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + +rust_library( + name = "io", + srcs = ["lib.rs"], + edition = "2024", + visibility = ["//visibility:public"], + proc_macro_deps = [ + "@crates.io//:async-trait", + ], + deps = [ + ":bridge_cpp", + "//kj-rs", + "@crates.io//:futures", + "@workerd-cxx//:cxx", + ], +) + +rust_test( + name = "io_test", + crate = ":io", + edition = "2024", +) + +rust_cxx_bridge( + name = "tests_bridge", + src = "tests.rs", + hdrs = ["tests.h"], + include_prefix = "kj-rs/io", + deps = [ + ":bridge_cpp", + ":io", + "@capnp-cpp//src/kj:kj", + "@capnp-cpp//src/kj:kj-async", + "@workerd-cxx//:cxx", + ], +) + +rust_library( + name = "io_tests", + srcs = ["tests.rs"], + edition = "2024", + visibility = ["//visibility:public"], + proc_macro_deps = [ + "@crates.io//:async-trait", + ], + deps = [ + ":io", + ":tests_bridge", + "//kj-rs", + "@crates.io//:futures", + "@workerd-cxx//:cxx", + ], +) + +rust_test( + name = "io_tests_test", + crate = ":io_tests", + edition = "2024", +) + +cc_test( + name = "io_cpp_tests", + size = "medium", + srcs = ["tests.c++", "tests.h"], + deps = [ + ":bridge", + ":bridge_cpp", + ":bridge/include", + ":io_tests", + ":tests_bridge", + ":io", + "@capnp-cpp//src/kj:kj", + "@capnp-cpp//src/kj:kj-async", + "@capnp-cpp//src/kj:kj-test", + "@workerd-cxx//:cxx", + ], + linkstatic = True, +) diff --git a/kj-rs/io/bridge.c++ b/kj-rs/io/bridge.c++ new file mode 100644 index 000000000..1bed8eaf5 --- /dev/null +++ b/kj-rs/io/bridge.c++ @@ -0,0 +1,134 @@ +#include "kj-rs/io/bridge.h" + +#include "kj-rs/convert.h" +#include "kj-rs/io/lib.rs.h" + +#include + +using namespace kj_rs; + +namespace kj_rs_io { + +namespace ffi { + +CxxAsyncInputStream::CxxAsyncInputStream(kj::Own stream) + : stream(kj::mv(stream)) {} + +kj::Promise<::std::size_t> CxxAsyncInputStream::try_read( + ::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes) { + return stream->tryRead(buffer.data(), min_bytes, buffer.size()); +} + +::std::uint64_t CxxAsyncInputStream::try_get_length() { + auto maybe_length = stream->tryGetLength(); + KJ_IF_SOME(length, maybe_length) { + return length; + } else { + return 0; // Return 0 if unknown + } +} + +kj::Promise<::std::uint64_t> CxxAsyncInputStream::pump_to( + CxxAsyncOutputStream& output, ::std::uint64_t amount) { + return stream->pumpTo(*output.stream, amount); +} + +CxxAsyncOutputStream::CxxAsyncOutputStream(kj::Own stream) + : stream(kj::mv(stream)) {} + +kj::Promise CxxAsyncOutputStream::write(::rust::Slice buffer) { + return stream->write(from(buffer)); +} + +kj::Promise CxxAsyncOutputStream::write_vectored( + ::rust::Slice> pieces) { + // Convert rust slice of slices to kj::Array of ArrayPtrs + // TODO: no alloc + auto kj_pieces = kj::heapArray>(pieces.size()); + + for (size_t i = 0; i < pieces.size(); ++i) { + kj_pieces[i] = from(pieces[i]); + } + + return stream->write(kj::ArrayPtr>(kj_pieces)); +} + +kj::Promise<::std::uint64_t> CxxAsyncOutputStream::try_pump_from( + CxxAsyncInputStream& input, ::std::uint64_t amount) { + auto maybe_pump_promise = stream->tryPumpFrom(*input.stream, amount); + KJ_IF_SOME(pump_promise, maybe_pump_promise) { + return kj::mv(pump_promise); + } else { + // Return a resolved promise with 0 to indicate not supported + return ::std::uint64_t(0); + } +} + +kj::Promise CxxAsyncOutputStream::when_write_disconnected() { + return stream->whenWriteDisconnected(); +} + +CxxAsyncIoStream::CxxAsyncIoStream(kj::Own stream): stream(kj::mv(stream)) {} + +// Methods inherited from AsyncInputStream +kj::Promise<::std::size_t> CxxAsyncIoStream::try_read( + ::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes) { + return stream->tryRead(buffer.data(), min_bytes, buffer.size()); +} + +::std::uint64_t CxxAsyncIoStream::try_get_length() { + auto maybe_length = stream->tryGetLength(); + KJ_IF_SOME(length, maybe_length) { + return length; + } else { + return 0; + } +} + +kj::Promise<::std::uint64_t> CxxAsyncIoStream::pump_to( + CxxAsyncOutputStream& output, ::std::uint64_t amount) { + return stream->pumpTo(*output.stream, amount); +} + +// Methods inherited from AsyncOutputStream +kj::Promise CxxAsyncIoStream::write(::rust::Slice buffer) { + return stream->write(from(buffer)); +} + +kj::Promise CxxAsyncIoStream::write_vectored( + ::rust::Slice> pieces) { + auto kj_pieces = kj::heapArray>(pieces.size()); + + for (size_t i = 0; i < pieces.size(); ++i) { + kj_pieces[i] = from(pieces[i]); + } + + return stream->write(kj::ArrayPtr>(kj_pieces)); +} + +kj::Promise<::std::uint64_t> CxxAsyncIoStream::try_pump_from( + CxxAsyncInputStream& input, ::std::uint64_t amount) { + auto maybe_pump_promise = stream->tryPumpFrom(*input.stream, amount); + KJ_IF_SOME(pump_promise, maybe_pump_promise) { + return kj::mv(pump_promise); + } else { + return ::std::uint64_t(0); + } +} + +kj::Promise CxxAsyncIoStream::when_write_disconnected() { + return stream->whenWriteDisconnected(); +} + +// Methods specific to AsyncIoStream +void CxxAsyncIoStream::shutdown_write() { + stream->shutdownWrite(); +} + +void CxxAsyncIoStream::abort_read() { + stream->abortRead(); +} + +} // namespace ffi + +} // namespace kj_rs_io \ No newline at end of file diff --git a/kj-rs/io/bridge.h b/kj-rs/io/bridge.h new file mode 100644 index 000000000..e3c2eb8ca --- /dev/null +++ b/kj-rs/io/bridge.h @@ -0,0 +1,76 @@ +#pragma once + +#include "kj-rs/convert.h" +#include "kj-rs/kj-rs.h" + +#include + +#include +#include + +#include + +using namespace kj_rs; + +namespace kj_rs_io { + +// Forward declarations +namespace ffi { +struct CxxAsyncOutputStream; + +// Wrapper structs for KJ async stream types to provide controlled FFI interface +struct CxxAsyncInputStream { + kj::Own stream; + + explicit CxxAsyncInputStream(kj::Own stream); + + // Methods corresponding to KJ AsyncInputStream interface + kj::Promise<::std::size_t> try_read( + ::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes); + + // todo: this should return optional somehow + ::std::uint64_t try_get_length(); + kj::Promise<::std::uint64_t> pump_to(CxxAsyncOutputStream& output, ::std::uint64_t amount); +}; + +struct CxxAsyncOutputStream { + kj::Own stream; + + explicit CxxAsyncOutputStream(kj::Own stream); + + // Methods corresponding to KJ AsyncOutputStream interface + kj::Promise write(::rust::Slice buffer); + kj::Promise write_vectored(::rust::Slice> pieces); + + // todo: optional + kj::Promise<::std::uint64_t> try_pump_from(CxxAsyncInputStream& input, ::std::uint64_t amount); + kj::Promise when_write_disconnected(); +}; + +struct CxxAsyncIoStream { + kj::Own stream; + + explicit CxxAsyncIoStream(kj::Own stream); + + // Methods inherited from AsyncInputStream + kj::Promise<::std::size_t> try_read( + ::rust::Slice<::std::uint8_t> buffer, ::std::size_t min_bytes); + // todo: optional + ::std::uint64_t try_get_length(); + kj::Promise<::std::uint64_t> pump_to(CxxAsyncOutputStream& output, ::std::uint64_t amount); + + // Methods inherited from AsyncOutputStream + kj::Promise write(::rust::Slice buffer); + kj::Promise write_vectored(::rust::Slice> pieces); + // todo: optional + kj::Promise<::std::uint64_t> try_pump_from(CxxAsyncInputStream& input, ::std::uint64_t amount); + kj::Promise when_write_disconnected(); + + // Methods corresponding to KJ AsyncIoStream interface + void shutdown_write(); + void abort_read(); +}; + +} // namespace ffi + +} // namespace kj_rs_io \ No newline at end of file diff --git a/kj-rs/io/lib.rs b/kj-rs/io/lib.rs new file mode 100644 index 000000000..9a530a602 --- /dev/null +++ b/kj-rs/io/lib.rs @@ -0,0 +1,959 @@ +use std::{future::Future, pin::Pin}; + +use async_trait::async_trait; +use cxx::type_id; + +pub type Result = std::io::Result; + +/// Type alias for ancillary message handlers +pub type AncillaryMessageHandler = Box; + +/// Asynchronous equivalent of `InputStream`. +/// +/// This trait corresponds to the C++ `kj::AsyncInputStream` class and provides +/// asynchronous reading capabilities with KJ promise integration. +#[async_trait(?Send)] +pub trait AsyncInputStream { + /// Try to read some bytes from the stream without blocking indefinitely. + /// + /// Like `read()`, but this method is the primitive that subclasses must implement. + /// It reads at least `min_bytes` and at most `max_bytes` bytes from the stream. + /// + /// This is the core method that all `AsyncInputStream` implementations must provide. + async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result; + + /// Get the remaining number of bytes that will be produced by this stream, if known. + /// + /// This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the + /// HTTP implementation may need to fall back to Transfer-Encoding: chunked. + /// + /// The default implementation always returns None. + fn try_get_length(&self) -> Option { + None + } + + /// Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the + /// total bytes actually pumped (which is only less than `amount` if EOF was reached). + /// + /// Override this if your stream type knows how to pump itself to certain kinds of output + /// streams more efficiently than via the naive approach. You can use dynamic downcasting + /// to test for stream types you recognize, and if none match, delegate to the default + /// implementation. + /// + /// The default implementation performs a naive pump by allocating a buffer and reading to it / + /// writing from it in a loop. + async fn pump_to(&mut self, output: &mut dyn AsyncOutputStream, amount: usize) -> Result; + + /// Register interest in checking for ancillary messages (aka control messages) when reading. + /// + /// The provided callback will be called whenever any are encountered. The messages passed to + /// the function do not live beyond when function returns. + /// Only supported on Unix (the default impl throws UNIMPLEMENTED). Most apps will not use this. + /// + /// # Errors + /// + /// Returns an error if ancillary messages are not supported on this platform. + fn register_ancillary_message_handler( + &mut self, + _handler: AncillaryMessageHandler, + ) -> Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Ancillary messages not supported", + )) + } + + /// Primarily intended as an optimization for the `tee` call. + /// + /// Returns an input stream whose state is independent from this one but which will return the exact same + /// set of bytes read going forward. `limit` is a total limit on the amount of memory, in bytes, which a tee + /// implementation may use to buffer stream data. An implementation must throw an exception if a read operation + /// would cause the limit to be exceeded. If `try_tee()` can see that the new limit is impossible to + /// satisfy, it should return None so that the pessimized path is taken in `new_tee`. This is + /// likely to arise if `try_tee()` is called twice with different limits on the same stream. + /// + /// Note: This method is not async fn compatible and implementations should provide concrete types. + fn try_tee(&mut self, _limit: usize) { + todo!("Tee implementation not yet available") + } +} + +/// Asynchronous equivalent of `OutputStream`. +/// +/// This trait corresponds to the C++ `kj::AsyncOutputStream` class and provides +/// asynchronous writing capabilities with KJ promise integration. +#[async_trait(?Send)] +pub trait AsyncOutputStream { + /// Write data to the stream. + /// + /// The future completes when all data has been written to the stream. + async fn write(&mut self, buffer: &[u8]) -> Result<()>; + + /// Write multiple pieces of data to the stream. + /// + /// This is equivalent to concatenating all the pieces and calling `write()`, but may be + /// more efficient for some stream types. + async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()>; + + /// Implements double-dispatch for `AsyncInputStream::pump_to()`. + /// + /// This method should only be called from within an implementation of `pump_to()`. + /// + /// This method examines the type of `input` to find optimized ways to pump data from it to this + /// output stream. If it finds one, it performs the pump. Otherwise, it returns None. + /// + /// The default implementation always returns None. + async fn try_pump_from( + &mut self, + _input: &mut dyn AsyncInputStream, + _amount: usize, + ) -> Option> { + None + } + + /// Returns a future that resolves when the stream has become disconnected such that new write()s + /// will fail with a DISCONNECTED exception. + /// + /// This is particularly useful, for example, to cancel work early when it is detected that no one will + /// receive the result. + /// + /// Note that not all streams are able to detect this condition without actually performing a + /// `write()`; such stream implementations may return a future that never resolves. (In particular, + /// as of this writing, `when_write_disconnected()` is not implemented on Windows. Also, for TCP + /// streams, not all disconnects are detectable -- a power or network failure may lead the + /// connection to hang forever, or until configured socket options lead to a timeout.) + /// + /// Unlike most other asynchronous stream methods, it is safe to call `when_write_disconnected()` + /// multiple times without canceling the previous futures. + async fn when_write_disconnected(&mut self) -> Result<()>; +} + +/// A combination input and output stream. +/// +/// This trait corresponds to the C++ `kj::AsyncIoStream` class and combines both +/// `AsyncInputStream` and `AsyncOutputStream` functionality. +#[async_trait(?Send)] +pub trait AsyncIoStream: AsyncInputStream + AsyncOutputStream { + /// Cleanly shut down just the write end of the stream, while keeping the read end open. + async fn shutdown_write(&mut self) -> Result<()>; + + /// Similar to `shutdown_write`, but this will shut down the read end of the stream, and should only + /// be called when an error has occurred. + fn abort_read(&mut self) {} + + /// Corresponds to `getsockopt()` syscall. + /// + /// Will return an error if the stream is not a socket or the option is not appropriate for the socket type. + /// The default implementation always returns an "unimplemented" error. + /// + /// # Errors + /// + /// Returns an error if the stream is not a socket or the option is not supported. + fn getsockopt(&self, _level: i32, _option: i32, _value: &mut [u8]) -> Result { + Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "getsockopt not supported", + )) + } + + /// Corresponds to `setsockopt()` syscall. + /// + /// Will return an error if the stream is not a socket or the option is not appropriate for the socket type. + /// The default implementation always returns an "unimplemented" error. + /// + /// # Errors + /// + /// Returns an error if the stream is not a socket or the option is not supported. + fn setsockopt(&mut self, _level: i32, _option: i32, _value: &[u8]) -> Result<()> { + Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "setsockopt not supported", + )) + } + + /// Corresponds to `getsockname()` syscall. + /// + /// Will return an error if the stream is not a socket. + /// The default implementation always returns an "unimplemented" error. + /// + /// # Errors + /// + /// Returns an error if the stream is not a socket. + fn getsockname(&self, _addr: &mut [u8]) -> Result { + Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "getsockname not supported", + )) + } + + /// Corresponds to `getpeername()` syscall. + /// + /// Will return an error if the stream is not a socket. + /// The default implementation always returns an "unimplemented" error. + /// + /// # Errors + /// + /// Returns an error if the stream is not a socket. + fn getpeername(&self, _addr: &mut [u8]) -> Result { + Err(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "getpeername not supported", + )) + } + + /// Get the underlying Unix file descriptor, if any. + /// + /// Returns None if this object actually isn't wrapping a file descriptor. + fn get_fd(&self) -> Option { + None + } + + /// Get the underlying Win32 HANDLE, if any. + /// + /// Returns None if this object actually isn't wrapping a handle. + fn get_win32_handle(&self) -> Option<*mut std::ffi::c_void> { + None + } +} + +/// Represents an ancillary message (aka control message) received using the `recvmsg()` system +/// call (or equivalent). Most apps will not use this. +#[derive(Debug, Clone)] +pub struct AncillaryMessage { + /// Originating protocol / socket level. + pub level: i32, + /// Protocol-specific message type. + pub message_type: i32, + /// Message data. In most cases you should use the accessor methods. + pub data: Vec, +} + +impl AncillaryMessage { + /// Create a new ancillary message. + #[must_use] + pub fn new(level: i32, message_type: i32, data: Vec) -> Self { + Self { + level, + message_type, + data, + } + } + + /// Get the originating protocol / socket level. + #[must_use] + pub fn level(&self) -> i32 { + self.level + } + + /// Get the protocol-specific message type. + #[must_use] + pub fn message_type(&self) -> i32 { + self.message_type + } +} + +/// Performs a pump using `read()` and `write()`, without calling the stream's `pump_to()` nor +/// `try_pump_from()` methods. +/// +/// This is intended to be used as a fallback by implementations of `pump_to()` +/// and `try_pump_from()` when they want to give up on optimization, but can't just call `pump_to()` again +/// because this would recursively retry the optimization. `unoptimized_pump_to()` should only be called +/// inside implementations of streams, never by the caller of a stream -- use the `pump_to()` method +/// instead. +/// +/// `completed_so_far` is the number of bytes out of `amount` that have already been pumped. This is +/// provided for convenience for cases where the caller has already done some pumping before they +/// give up. Otherwise, a `.then()` would need to be used to add the bytes to the final result. +/// +/// # Errors +/// +/// Returns an error if reading from the input stream or writing to the output stream fails. +#[allow(clippy::cast_possible_truncation)] +pub async fn unoptimized_pump_to( + input: &mut I, + output: &mut O, + amount: usize, + completed_so_far: usize, +) -> Result { + let mut buffer = [0u8; 4096]; + let mut total_pumped = completed_so_far; + let mut remaining = amount.saturating_sub(completed_so_far); + + while remaining > 0 { + let to_read = std::cmp::min(remaining, buffer.len()); + let bytes_read = input.try_read(&mut buffer[..to_read], 1).await?; + + if bytes_read == 0 { + break; // EOF + } + + output.write(&buffer[..bytes_read]).await?; + total_pumped += bytes_read; + remaining = remaining.saturating_sub(bytes_read); + } + + Ok(total_pumped) +} + +// Extension traits to provide futures compatibility +pub trait AsyncInputStreamExt: AsyncInputStream { + /// Convert this stream to implement `futures::io::AsyncRead` + fn into_async_read(self) -> AsyncReadAdapter + where + Self: Sized + Unpin, + { + AsyncReadAdapter(self) + } +} + +pub trait AsyncOutputStreamExt: AsyncOutputStream { + /// Convert this stream to implement `futures::io::AsyncWrite` + fn into_async_write(self) -> AsyncWriteAdapter + where + Self: Sized + Unpin, + { + AsyncWriteAdapter(self) + } +} + +// Implement the extension traits for all implementations +impl AsyncInputStreamExt for T {} +impl AsyncOutputStreamExt for T {} + +/// Adapter to implement `futures::io::AsyncRead` for `AsyncInputStream` +pub struct AsyncReadAdapter(T); + +impl AsyncReadAdapter { + pub fn new(inner: T) -> Self { + Self(inner) + } + + pub fn into_inner(self) -> T { + self.0 + } + + pub fn get_ref(&self) -> &T { + &self.0 + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +/// Adapter to implement `futures::io::AsyncWrite` for `AsyncOutputStream` +pub struct AsyncWriteAdapter(T); + +impl AsyncWriteAdapter { + pub fn new(inner: T) -> Self { + Self(inner) + } + + pub fn into_inner(self) -> T { + self.0 + } + + pub fn get_ref(&self) -> &T { + &self.0 + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } +} + +impl futures::io::AsyncRead for AsyncReadAdapter { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + use std::task::Poll; + + let min_bytes = std::cmp::min(1, buf.len()); + + let future = self.0.try_read(buf, min_bytes); + let mut pinned_future = Box::pin(future); + + match pinned_future.as_mut().poll(cx) { + Poll::Ready(result) => Poll::Ready(result), + Poll::Pending => Poll::Pending, + } + } +} + +impl futures::io::AsyncWrite for AsyncWriteAdapter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + use std::task::Poll; + + let future = self.0.write(buf); + let mut pinned_future = Box::pin(future); + + match pinned_future.as_mut().poll(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(buf.len())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // KJ streams don't have explicit flush, so we just return ready + std::task::Poll::Ready(Ok(())) + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // KJ streams don't have explicit close, so we just return ready + std::task::Poll::Ready(Ok(())) + } +} + +#[cxx::bridge(namespace = "kj_rs_io::ffi")] +#[allow(clippy::needless_lifetimes)] +pub mod ffi { + + // Rust opaque types that can be used from C++ + extern "Rust" { + /// Opaque Rust type implementing AsyncInputStream trait + type RustAsyncInputStream; + + /// Opaque Rust type implementing AsyncOutputStream trait + type RustAsyncOutputStream; + + /// Opaque Rust type implementing AsyncIoStream trait + type RustAsyncIoStream; + + // RustAsyncInputStream methods + async unsafe fn try_read<'a>( + self: &'a mut RustAsyncInputStream, + buffer: &'a mut [u8], + min_bytes: usize, + ) -> Result; + + fn try_get_length(self: &RustAsyncInputStream) -> usize; // Return 0 if unknown + + async unsafe fn pump_to<'a>( + self: &'a mut RustAsyncInputStream, + output: &'a mut RustAsyncOutputStream, + amount: usize, + ) -> Result; + + // RustAsyncOutputStream methods + async unsafe fn write<'a>( + self: &'a mut RustAsyncOutputStream, + buffer: &'a [u8], + ) -> Result<()>; + + async unsafe fn write_vectored<'a>( + self: &'a mut RustAsyncOutputStream, + pieces: &'a [&'a [u8]], + ) -> Result<()>; + + async unsafe fn try_pump_from<'a>( + self: &'a mut RustAsyncOutputStream, + input: &'a mut RustAsyncInputStream, + amount: usize, + ) -> Result; // Returns 0 if not supported + + async unsafe fn when_write_disconnected<'a>( + self: &'a mut RustAsyncOutputStream, + ) -> Result<()>; + + // RustAsyncIoStream methods - inherited from AsyncInputStream + async unsafe fn try_read<'a>( + self: &'a mut RustAsyncIoStream, + buffer: &'a mut [u8], + min_bytes: usize, + ) -> Result; + + fn try_get_length(self: &RustAsyncIoStream) -> usize; + + async unsafe fn pump_to<'a>( + self: &'a mut RustAsyncIoStream, + output: &'a mut RustAsyncOutputStream, + amount: usize, + ) -> Result; + + // RustAsyncIoStream methods - inherited from AsyncOutputStream + async unsafe fn write<'a>(self: &'a mut RustAsyncIoStream, buffer: &'a [u8]) -> Result<()>; + + async unsafe fn write_vectored<'a>( + self: &'a mut RustAsyncIoStream, + pieces: &'a [&'a [u8]], + ) -> Result<()>; + + async unsafe fn try_pump_from<'a>( + self: &'a mut RustAsyncIoStream, + input: &'a mut RustAsyncInputStream, + amount: usize, + ) -> Result; + + async unsafe fn when_write_disconnected<'a>(self: &'a mut RustAsyncIoStream) -> Result<()>; + + // RustAsyncIoStream methods - specific to IoStream + async unsafe fn shutdown_write<'a>(self: &'a mut RustAsyncIoStream) -> Result<()>; + + fn abort_read(self: &mut RustAsyncIoStream); + } + + impl Box {} + impl Box {} + impl Box {} + + unsafe extern "C++" { + include!("kj-rs/io/bridge.h"); + + /// Opaque C++ type representing kj::AsyncInputStream + type CxxAsyncInputStream; + + /// Opaque C++ type representing kj::AsyncOutputStream + type CxxAsyncOutputStream; + + /// Opaque C++ type representing kj::AsyncIoStream + type CxxAsyncIoStream; + + // CxxAsyncInputStream methods + async fn try_read<'a>( + self: Pin<&'a mut CxxAsyncInputStream>, + buffer: &'a mut [u8], + min_bytes: usize, + ) -> Result; + + #[must_use] + fn try_get_length(self: Pin<&mut CxxAsyncInputStream>) -> usize; // Return 0 if unknown + + async fn pump_to<'a>( + self: Pin<&'a mut CxxAsyncInputStream>, + output: Pin<&'a mut CxxAsyncOutputStream>, + amount: usize, + ) -> Result; + + // CxxAsyncOutputStream methods + async fn write<'a>(self: Pin<&'a mut CxxAsyncOutputStream>, buffer: &'a [u8]) + -> Result<()>; + + async fn write_vectored<'a>( + self: Pin<&'a mut CxxAsyncOutputStream>, + pieces: &'a [&'a [u8]], + ) -> Result<()>; + + async fn try_pump_from<'a>( + self: Pin<&'a mut CxxAsyncOutputStream>, + input: Pin<&'a mut CxxAsyncInputStream>, + amount: usize, + ) -> Result; // Returns 0 if not supported + + async fn when_write_disconnected<'a>(self: Pin<&'a mut CxxAsyncOutputStream>) + -> Result<()>; + + // CxxAsyncIoStream methods - inherited from AsyncInputStream + async fn try_read<'a>( + self: Pin<&'a mut CxxAsyncIoStream>, + buffer: &'a mut [u8], + min_bytes: usize, + ) -> Result; + + #[must_use] + fn try_get_length(self: Pin<&mut CxxAsyncIoStream>) -> usize; + + async fn pump_to<'a>( + self: Pin<&'a mut CxxAsyncIoStream>, + output: Pin<&'a mut CxxAsyncOutputStream>, + amount: usize, + ) -> Result; + + // CxxAsyncIoStream methods - inherited from AsyncOutputStream + async fn write<'a>(self: Pin<&'a mut CxxAsyncIoStream>, buffer: &'a [u8]) -> Result<()>; + + async fn write_vectored<'a>( + self: Pin<&'a mut CxxAsyncIoStream>, + pieces: &'a [&'a [u8]], + ) -> Result<()>; + + async fn try_pump_from<'a>( + self: Pin<&'a mut CxxAsyncIoStream>, + input: Pin<&'a mut CxxAsyncInputStream>, + amount: usize, + ) -> Result; + + async fn when_write_disconnected<'a>(self: Pin<&'a mut CxxAsyncIoStream>) -> Result<()>; + + // CxxAsyncIoStream methods - specific to IoStream + fn shutdown_write(self: Pin<&mut CxxAsyncIoStream>); + + fn abort_read(self: Pin<&mut CxxAsyncIoStream>); + } +} + +/// Helper function to convert `cxx::Exception` to `std::io::Error` +#[must_use] +#[allow(clippy::needless_pass_by_value)] +pub fn cxx_to_io_error(e: cxx::Exception) -> std::io::Error { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("CXX error: {}", e.what()), + ) +} + +/// Rust wrapper for the `CxxAsyncInputStream` FFI type +pub struct CxxAsyncInputStream<'a>(Pin<&'a mut ffi::CxxAsyncInputStream>); + +impl<'a> CxxAsyncInputStream<'a> { + #[must_use] + pub fn new(inner: Pin<&'a mut ffi::CxxAsyncInputStream>) -> Self { + Self(inner) + } +} + +#[async_trait(?Send)] +impl AsyncInputStream for CxxAsyncInputStream<'_> { + async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + self.0 + .as_mut() + .try_read(buffer, min_bytes) + .await + .map_err(cxx_to_io_error) + } + + fn try_get_length(&self) -> Option { + // Note: This requires a mutable reference to the FFI object, so we can't implement it correctly here + // For now, return None + None + } + + async fn pump_to(&mut self, output: &mut dyn AsyncOutputStream, amount: usize) -> Result { + // For now, fall back to the default implementation + // TODO: Add optimized pump_to for CxxAsyncOutputStream + let mut buffer = [0u8; 4096]; + let mut total_pumped = 0; + let mut remaining = amount; + + while remaining > 0 { + let to_read = std::cmp::min(remaining, buffer.len()); + let bytes_read = self.try_read(&mut buffer[..to_read], 1).await?; + + if bytes_read == 0 { + break; // EOF + } + + output.write(&buffer[..bytes_read]).await?; + total_pumped += bytes_read; + remaining = remaining.saturating_sub(bytes_read); + } + + Ok(total_pumped) + } +} + +/// Rust wrapper for the `CxxAsyncOutputStream` FFI type +pub struct CxxAsyncOutputStream<'a>(Pin<&'a mut ffi::CxxAsyncOutputStream>); + +impl<'a> CxxAsyncOutputStream<'a> { + #[must_use] + pub fn new(inner: Pin<&'a mut ffi::CxxAsyncOutputStream>) -> Self { + Self(inner) + } +} + +#[async_trait(?Send)] +impl AsyncOutputStream for CxxAsyncOutputStream<'_> { + async fn write(&mut self, buffer: &[u8]) -> Result<()> { + self.0.as_mut().write(buffer).await.map_err(cxx_to_io_error) + } + + async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + self.0 + .as_mut() + .write_vectored(pieces) + .await + .map_err(cxx_to_io_error) + } + + async fn try_pump_from( + &mut self, + _input: &mut dyn AsyncInputStream, + _amount: usize, + ) -> Option> { + // For now, return None to indicate no optimization available + // TODO: Add optimized try_pump_from for CxxAsyncInputStream using trait objects + None + } + + async fn when_write_disconnected(&mut self) -> Result<()> { + self.0 + .as_mut() + .when_write_disconnected() + .await + .map_err(cxx_to_io_error) + } +} + +/// Rust wrapper for the `CxxAsyncIoStream` FFI type +pub struct CxxAsyncIoStream<'a>(Pin<&'a mut ffi::CxxAsyncIoStream>); + +impl<'a> CxxAsyncIoStream<'a> { + #[must_use] + pub fn new(inner: Pin<&'a mut ffi::CxxAsyncIoStream>) -> Self { + Self(inner) + } +} + +#[async_trait(?Send)] +impl AsyncInputStream for CxxAsyncIoStream<'_> { + async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + self.0 + .as_mut() + .try_read(buffer, min_bytes) + .await + .map_err(cxx_to_io_error) + } + + fn try_get_length(&self) -> Option { + // We need a mutable reference for the FFI call, so we can't implement this correctly + // For now, return None + None + } + + async fn pump_to(&mut self, output: &mut dyn AsyncOutputStream, amount: usize) -> Result { + // For now, fall back to the default implementation + // TODO: Add optimized pump_to for CxxAsyncOutputStream + let mut buffer = [0u8; 4096]; + let mut total_pumped = 0; + let mut remaining = amount; + + while remaining > 0 { + let to_read = std::cmp::min(remaining, buffer.len()); + let bytes_read = self.try_read(&mut buffer[..to_read], 1).await?; + + if bytes_read == 0 { + break; // EOF + } + + output.write(&buffer[..bytes_read]).await?; + total_pumped += bytes_read; + remaining = remaining.saturating_sub(bytes_read); + } + + Ok(total_pumped) + } +} + +#[async_trait(?Send)] +impl AsyncOutputStream for CxxAsyncIoStream<'_> { + async fn write(&mut self, buffer: &[u8]) -> Result<()> { + self.0.as_mut().write(buffer).await.map_err(cxx_to_io_error) + } + + async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + self.0 + .as_mut() + .write_vectored(pieces) + .await + .map_err(cxx_to_io_error) + } + + async fn try_pump_from( + &mut self, + _input: &mut dyn AsyncInputStream, + _amount: usize, + ) -> Option> { + // For now, return None to indicate no optimization available + // TODO: Add optimized try_pump_from for CxxAsyncInputStream + None + } + + async fn when_write_disconnected(&mut self) -> Result<()> { + self.0 + .as_mut() + .when_write_disconnected() + .await + .map_err(cxx_to_io_error) + } +} + +#[async_trait(?Send)] +impl AsyncIoStream for CxxAsyncIoStream<'_> { + async fn shutdown_write(&mut self) -> Result<()> { + self.0.as_mut().shutdown_write(); + Ok(()) + } + + fn abort_read(&mut self) { + self.0.as_mut().abort_read(); + } +} + +// Rust opaque types for use from C++ + +/// Opaque Rust type that can hold any `AsyncInputStream` implementation +pub struct RustAsyncInputStream(Box); + +impl RustAsyncInputStream { + pub fn new(stream: T) -> Self { + Self(Box::new(stream)) + } + + // FFI method implementations + /// # Errors + /// + /// Returns an error if reading from the stream fails. + pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + self.0.try_read(buffer, min_bytes).await + } + + #[must_use] + pub fn try_get_length(&self) -> usize { + self.0.try_get_length().unwrap_or(0) + } + + /// # Errors + /// + /// Returns an error if pumping data fails. + pub async fn pump_to( + &mut self, + output: &mut RustAsyncOutputStream, + amount: usize, + ) -> Result { + self.0.pump_to(output.0.as_mut(), amount).await + } +} + +unsafe impl cxx::ExternType for RustAsyncInputStream { + type Id = type_id!("kj_rs_io::ffi::RustAsyncInputStream"); + type Kind = cxx::kind::Opaque; +} + +/// Opaque Rust type that can hold any `AsyncOutputStream` implementation +pub struct RustAsyncOutputStream(Box); + +impl RustAsyncOutputStream { + pub fn new(stream: T) -> Self { + Self(Box::new(stream)) + } + + // FFI method implementations + /// # Errors + /// + /// Returns an error if writing to the stream fails. + pub async fn write(&mut self, buffer: &[u8]) -> Result<()> { + self.0.write(buffer).await + } + + /// # Errors + /// + /// Returns an error if writing to the stream fails. + pub async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + self.0.write_vectored(pieces).await + } + + /// # Errors + /// + /// Returns an error if pumping data fails. + pub async fn try_pump_from( + &mut self, + input: &mut RustAsyncInputStream, + amount: usize, + ) -> Result { + match self.0.try_pump_from(input.0.as_mut(), amount).await { + Some(result) => result, + None => Ok(0), // Return 0 to indicate no optimization available + } + } + + /// # Errors + /// + /// Returns an error if checking disconnection status fails. + pub async fn when_write_disconnected(&mut self) -> Result<()> { + self.0.when_write_disconnected().await + } +} + +/// Opaque Rust type that can hold any `AsyncIoStream` implementation +pub struct RustAsyncIoStream(Box); + +impl RustAsyncIoStream { + pub fn new(stream: T) -> Self { + Self(Box::new(stream)) + } + + // FFI method implementations - AsyncInputStream part + /// # Errors + /// + /// Returns an error if reading from the stream fails. + pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + self.0.try_read(buffer, min_bytes).await + } + + #[must_use] + pub fn try_get_length(&self) -> usize { + self.0.try_get_length().unwrap_or(0) + } + + /// # Errors + /// + /// Returns an error if pumping data fails. + pub async fn pump_to( + &mut self, + output: &mut RustAsyncOutputStream, + amount: usize, + ) -> Result { + self.0.pump_to(output.0.as_mut(), amount).await + } + + // FFI method implementations - AsyncOutputStream part + /// # Errors + /// + /// Returns an error if writing to the stream fails. + pub async fn write(&mut self, buffer: &[u8]) -> Result<()> { + self.0.write(buffer).await + } + + /// # Errors + /// + /// Returns an error if writing to the stream fails. + pub async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + self.0.write_vectored(pieces).await + } + + /// # Errors + /// + /// Returns an error if pumping data fails. + pub async fn try_pump_from( + &mut self, + input: &mut RustAsyncInputStream, + amount: usize, + ) -> Result { + match self.0.try_pump_from(input.0.as_mut(), amount).await { + Some(result) => result, + None => Ok(0), // Return 0 to indicate no optimization available + } + } + + /// # Errors + /// + /// Returns an error if checking disconnection status fails. + pub async fn when_write_disconnected(&mut self) -> Result<()> { + self.0.when_write_disconnected().await + } + + // FFI method implementations - AsyncIoStream specific + /// # Errors + /// + /// Returns an error if shutting down the write end fails. + pub async fn shutdown_write(&mut self) -> Result<()> { + self.0.shutdown_write().await + } + + pub fn abort_read(&mut self) { + self.0.abort_read(); + } +} diff --git a/kj-rs/io/tests.c++ b/kj-rs/io/tests.c++ new file mode 100644 index 000000000..d97cf4244 --- /dev/null +++ b/kj-rs/io/tests.c++ @@ -0,0 +1,304 @@ +#include "kj-rs/io/tests.h" + +#include "kj-rs/convert.h" +#include "kj-rs/io/bridge.h" +#include "kj-rs/io/lib.rs.h" +#include "kj-rs/io/tests.rs.h" + +#include +#include +#include + +#include +#include + +using namespace kj_rs; + +namespace kj_rs_io_test { + +template +class RustAsyncInputStream: public kj::AsyncInputStream { + public: + explicit RustAsyncInputStream(T&& impl): impl(kj::mv(impl)) {} + virtual ~RustAsyncInputStream() = default; + + // kj::AsyncInputStream interface + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + return impl->try_read( + kj::arrayPtr(reinterpret_cast(buffer), maxBytes).as(), minBytes); + } + + kj::Maybe tryGetLength() override { + // todo + return kj::none; + } + + // kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + // return impl->pumpTo(output, amount); + // } + + private: + T impl; +}; + +template +class RustAsyncOutputStream: public kj::AsyncOutputStream { + public: + explicit RustAsyncOutputStream(T&& impl): impl(kj::mv(impl)) {} + virtual ~RustAsyncOutputStream() = default; + + // kj::AsyncOutputStream interface + kj::Promise write(kj::ArrayPtr buffer) override { + return impl->write(buffer); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + return impl->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return impl->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return impl->whenWriteDisconnected(); + } + + private: + T impl; +}; + +template +class RustAsyncIoStream: public kj::AsyncIoStream { + public: + explicit RustAsyncIoStream(T&& impl): impl(kj::mv(impl)) {} + virtual ~RustAsyncIoStream() = default; + + // kj::AsyncInputStream interface + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + return impl->tryRead(buffer, minBytes, maxBytes); + } + + kj::Maybe tryGetLength() override { + return impl->tryGetLength(); + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + return impl->pumpTo(output, amount); + } + + // kj::AsyncOutputStream interface + kj::Promise write(kj::ArrayPtr buffer) override { + return impl->write(buffer); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + return impl->write(pieces); + } + + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount) override { + return impl->tryPumpFrom(input, amount); + } + + kj::Promise whenWriteDisconnected() override { + return impl->whenWriteDisconnected(); + } + + // kj::AsyncIoStream interface + void shutdownWrite() override { + impl->shutdownWrite(); + } + + void abortRead() override { + impl->abortRead(); + } + + private: + T impl; +}; + +// Simple input stream that reads from array data +class ArrayInputStream: public kj::AsyncInputStream { + public: + ArrayInputStream(kj::ArrayPtr data): data(data) {} + virtual ~ArrayInputStream() = default; + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t toRead = std::min(maxBytes, data.size()); + + if (toRead == 0) { + return size_t(0); // EOF + } + + memcpy(buffer, data.begin(), toRead); + data = data.slice(toRead); + return toRead; + } + + kj::Maybe tryGetLength() override { + return data.size(); + } + + private: + kj::ArrayPtr data; +}; + +// Simple output stream that writes to a kj::Vector +class VectorOutputStream: public kj::AsyncOutputStream { + public: + VectorOutputStream() = default; + virtual ~VectorOutputStream() = default; + + kj::Promise write(kj::ArrayPtr buffer) override { + data.addAll(buffer); + return kj::READY_NOW; + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + for (auto piece: pieces) { + data.addAll(piece); + } + return kj::READY_NOW; + } + + kj::Promise whenWriteDisconnected() override { + return kj::NEVER_DONE; // Never disconnected + } + + const kj::Vector& getData() const { + return data; + } + + void clear() { + data.clear(); + } + + private: + kj::Vector data; +}; + +// C++ implementation of FNV-1a hash algorithm (matching Rust implementation) +kj::Promise computeStreamHash(kj::AsyncInputStream& stream) { + static constexpr uint64_t FNV_OFFSET_BASIS = 14695981039346656037ULL; + static constexpr uint64_t FNV_PRIME = 1099511628211ULL; + + uint64_t hash = FNV_OFFSET_BASIS; + auto buffer = kj::heapArray(4096); + + for (;;) { + size_t bytesRead = co_await stream.tryRead(buffer.begin(), 1, buffer.size()); + + if (bytesRead == 0) { + co_return hash; // EOF + } + + // FNV-1a hash algorithm + for (size_t i = 0; i < bytesRead; i++) { + hash ^= static_cast(buffer[i]); + hash *= FNV_PRIME; + } + } +} + +KJ_TEST("Read C++ ArrayInputStream in C++") { + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto testData = "Hello, World!"_kjb; + + auto stream = ArrayInputStream(testData); + auto hash = computeStreamHash(stream).wait(waitScope); + KJ_EXPECT(hash == 7993990320990026836); + + auto stream2 = ArrayInputStream(testData); + auto hash2 = computeStreamHash(stream2).wait(waitScope); + KJ_EXPECT(hash == hash2); +} + +KJ_TEST("Read C++ ArrayInputStream in Rust") { + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto testData = "Hello, World!"_kjb; + + auto stream = kj_rs_io::ffi::CxxAsyncInputStream(kj::heap(testData)); + auto hash = compute_stream_hash_ffi(stream).wait(waitScope); + + KJ_EXPECT(hash == 7993990320990026836); + + auto stream2 = kj_rs_io::ffi::CxxAsyncInputStream(kj::heap(testData)); + auto hash2 = compute_stream_hash_ffi(stream2).wait(waitScope); + + KJ_EXPECT(hash == hash2); +} + +KJ_TEST("Write to C++ OutputStream from Rust") { + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto vectorStream = kj::heap(); + auto* streamPtr = vectorStream.get(); + + auto stream = kj_rs_io::ffi::CxxAsyncOutputStream(kj::mv(vectorStream)); + + // Generate 100 bytes of pseudorandom data + generate_prng_ffi(stream, 100).wait(waitScope); + + // Check that data was written + const auto& data = streamPtr->getData(); + KJ_EXPECT(data.size() == 100); + + // Test that the data is deterministic by comparing with another stream + auto vectorStream2 = kj::heap(); + auto* streamPtr2 = vectorStream2.get(); + auto stream2 = kj_rs_io::ffi::CxxAsyncOutputStream(kj::mv(vectorStream2)); + + generate_prng_ffi(stream2, 100).wait(waitScope); + + const auto& data2 = streamPtr2->getData(); + KJ_EXPECT(data.size() == data2.size()); + + // Compare the data byte by byte + for (size_t i = 0; i < data.size(); i++) { + KJ_EXPECT(data[i] == data2[i], "Data should be deterministic", i, data[i], data2[i]); + } +} + +KJ_TEST("Write large data to C++ OutputStream from Rust") { + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto vectorStream = kj::heap(); + auto* streamPtr = vectorStream.get(); + + auto stream = kj_rs_io::ffi::CxxAsyncOutputStream(kj::mv(vectorStream)); + + // Generate 2048 * 2048 bytes (multiple chunks) + generate_prng_ffi(stream, 2048 * 2048).wait(waitScope); + + // Check that data was written + const auto& data = streamPtr->getData(); + KJ_EXPECT(data.size() == 2048 * 2048); + + // Basic check that the data varies (not all zeros) + size_t zeroCount = 0; + for (auto byte: data) { + if (byte == 0) zeroCount++; + } + KJ_EXPECT(zeroCount < data.size() / 10, "Data should vary, less than 10% zeros"); +} + +KJ_TEST("Read Rust InputStream from C++") { + kj::EventLoop loop; + kj::WaitScope waitScope(loop); + + auto testData = "Hello, World!"_kjb; + + auto stream = RustAsyncInputStream(create_rust_mock_input_stream(testData.as())); + auto hash = computeStreamHash(stream).wait(waitScope); + KJ_EXPECT(hash == 7993990320990026836); +} + +} // namespace kj_rs_io_test \ No newline at end of file diff --git a/kj-rs/io/tests.h b/kj-rs/io/tests.h new file mode 100644 index 000000000..fc15f51c5 --- /dev/null +++ b/kj-rs/io/tests.h @@ -0,0 +1,12 @@ +#pragma once + +#include "kj-rs/io/bridge.h" + +#include + +#include +#include + +#include + +namespace kj_rs_io_test {} // namespace kj_rs_io_test \ No newline at end of file diff --git a/kj-rs/io/tests.rs b/kj-rs/io/tests.rs new file mode 100644 index 000000000..2dc475dc4 --- /dev/null +++ b/kj-rs/io/tests.rs @@ -0,0 +1,815 @@ +//! Test utilities for the kj-rs io module +//! +//! This module provides mock implementations and test utilities for the async I/O stream +//! implementations, including support for both Rust and C++ tests via FFI. + +use async_trait::async_trait; +use futures::executor::block_on; +use std::{future::Future, pin::Pin}; + +use io::{AsyncInputStream, AsyncIoStream, AsyncOutputStream, Result, RustAsyncInputStream}; + +#[cfg(test)] +use io::unoptimized_pump_to; + +// Mock implementations for testing + +/// Mock input stream that provides predefined data +pub struct MockInputStream { + data: Vec, + position: usize, + read_error: bool, +} + +impl MockInputStream { + /// Create a new mock input stream with the given data + #[must_use] + pub fn new(data: Vec) -> Self { + Self { + data, + position: 0, + read_error: false, + } + } + + /// Configure the stream to return read errors + #[must_use] + pub fn with_error(mut self) -> Self { + self.read_error = true; + self + } + + /// Get the remaining bytes in the stream + #[must_use] + pub fn remaining(&self) -> usize { + self.data.len() - self.position + } + + /// Unsafe version of `try_read` that directly calls the `AsyncInputStream` implementation + /// + /// # Errors + /// + /// Returns an error if reading from the stream fails, such as when the stream is configured to return errors. + pub async fn try_read_unsafe(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + ::try_read(self, buffer, min_bytes).await + } +} + +#[async_trait(?Send)] +impl AsyncInputStream for MockInputStream { + async fn try_read(&mut self, buffer: &mut [u8], _min_bytes: usize) -> Result { + if self.read_error { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Mock read error", + )); + } + + let available = self.data.len() - self.position; + let to_read = std::cmp::min(buffer.len(), available); + + if to_read == 0 { + return Ok(0); // EOF + } + + buffer[..to_read].copy_from_slice(&self.data[self.position..self.position + to_read]); + self.position += to_read; + Ok(to_read) + } + + fn try_get_length(&self) -> Option { + Some(self.data.len() - self.position) + } + + async fn pump_to(&mut self, output: &mut dyn AsyncOutputStream, amount: usize) -> Result { + let mut buffer = [0u8; 4096]; + let mut total_pumped = 0; + let mut remaining = amount; + + while remaining > 0 { + let to_read = std::cmp::min(remaining, buffer.len()); + let bytes_read = self.try_read(&mut buffer[..to_read], 1).await?; + + if bytes_read == 0 { + break; // EOF + } + + output.write(&buffer[..bytes_read]).await?; + total_pumped += bytes_read; + remaining = remaining.saturating_sub(bytes_read); + } + + Ok(total_pumped) + } +} + +/// Mock output stream that captures written data +pub struct MockOutputStream { + data: Vec, + write_error: bool, + disconnected: bool, +} + +impl MockOutputStream { + /// Create a new mock output stream + #[must_use] + pub fn new() -> Self { + Self { + data: Vec::new(), + write_error: false, + disconnected: false, + } + } + + /// Configure the stream to return write errors + #[must_use] + pub fn with_error(mut self) -> Self { + self.write_error = true; + self + } + + /// Configure the stream as disconnected + #[must_use] + pub fn with_disconnected(mut self) -> Self { + self.disconnected = true; + self + } + + /// Get the data that has been written to the stream + #[must_use] + pub fn written_data(&self) -> &[u8] { + &self.data + } +} + +impl Default for MockOutputStream { + fn default() -> Self { + Self::new() + } +} + +#[async_trait(?Send)] +impl AsyncOutputStream for MockOutputStream { + async fn write(&mut self, buffer: &[u8]) -> Result<()> { + if self.write_error { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Mock write error", + )); + } + self.data.extend_from_slice(buffer); + Ok(()) + } + + async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + for piece in pieces { + self.write(piece).await?; + } + Ok(()) + } + + async fn when_write_disconnected(&mut self) -> Result<()> { + if self.disconnected { + Ok(()) + } else { + // Simulate never resolving for non-disconnected streams + std::future::pending().await + } + } +} + +/// Mock bidirectional I/O stream +pub struct MockIoStream { + input: MockInputStream, + output: MockOutputStream, + shutdown: bool, +} + +impl MockIoStream { + /// Create a new mock I/O stream with the given input data + #[must_use] + pub fn new(input_data: Vec) -> Self { + Self { + input: MockInputStream::new(input_data), + output: MockOutputStream::new(), + shutdown: false, + } + } + + /// Get the data that has been written to the output side + #[must_use] + pub fn written_data(&self) -> &[u8] { + self.output.written_data() + } + + /// Check if the stream has been shut down for writing + #[must_use] + pub fn is_shutdown(&self) -> bool { + self.shutdown + } +} + +#[async_trait(?Send)] +impl AsyncInputStream for MockIoStream { + async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + ::try_read(&mut self.input, buffer, min_bytes).await + } + + fn try_get_length(&self) -> Option { + self.input.try_get_length() + } + + async fn pump_to(&mut self, output: &mut dyn AsyncOutputStream, amount: usize) -> Result { + self.input.pump_to(output, amount).await + } +} + +#[async_trait(?Send)] +impl AsyncOutputStream for MockIoStream { + async fn write(&mut self, buffer: &[u8]) -> Result<()> { + self.output.write(buffer).await + } + + async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> { + self.output.write_vectored(pieces).await + } + + async fn when_write_disconnected(&mut self) -> Result<()> { + self.output.when_write_disconnected().await + } +} + +#[async_trait(?Send)] +impl AsyncIoStream for MockIoStream { + async fn shutdown_write(&mut self) -> Result<()> { + self.shutdown = true; + Ok(()) + } + + fn abort_read(&mut self) { + // For mock implementation, we just mark as shutdown + self.shutdown = true; + } +} + +/// Helper function to run async tests +pub fn run_async_test(test: F) -> Fut::Output +where + F: FnOnce() -> Fut, + Fut: Future, +{ + block_on(test()) +} + +/// Compute a simple FNV-1a hash of data from an `AsyncInputStream` +/// This is a non-cryptographic hash function suitable for testing +/// +/// # Errors +/// +/// Returns an error if reading from the stream fails. +pub async fn compute_stream_hash(stream: &mut T) -> Result { + const FNV_OFFSET_BASIS: u64 = 14_695_981_039_346_656_037; + const FNV_PRIME: u64 = 1_099_511_628_211; + + let mut hash = FNV_OFFSET_BASIS; + let mut buffer = [0u8; 4096]; + + loop { + #[allow(unused_variables)] + let buffer_len = buffer.len(); + let bytes_read = stream.try_read(&mut buffer, 1).await?; + if bytes_read == 0 { + break; // EOF + } + + // FNV-1a hash algorithm + for &byte in &buffer[..bytes_read] { + hash ^= u64::from(byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + } + + Ok(hash) +} + +/// Generate pseudorandom data to an `AsyncOutputStream` in 1024-byte chunks +/// +/// # Errors +/// +/// Returns an error if writing to the stream fails. +pub async fn generate_prng(stream: &mut T, size: u64) -> Result<()> { + const CHUNK_SIZE: usize = 1024; + let mut prng = 0u8; + let mut remaining = size; + let mut buffer = [0u8; CHUNK_SIZE]; + + while remaining > 0 { + #[allow(clippy::cast_possible_truncation)] + let chunk_size = std::cmp::min(remaining as usize, CHUNK_SIZE); + + // Fill buffer with pseudorandom data + for item in buffer.iter_mut().take(chunk_size) { + prng = prng.wrapping_mul(5).wrapping_add(1); + *item = prng; + } + + // Write the chunk + stream.write(&buffer[..chunk_size]).await?; + remaining -= chunk_size as u64; + } + + Ok(()) +} + +// FFI bridge to expose functionality to C++ +#[cxx::bridge(namespace = "kj_rs_io_test")] +pub mod ffi { + #[namespace = "kj_rs_io::ffi"] + unsafe extern "C++" { + include!("kj-rs/io/bridge.h"); + include!("kj-rs/io/lib.rs.h"); + + type CxxAsyncInputStream = io::ffi::CxxAsyncInputStream; + type CxxAsyncOutputStream = io::ffi::CxxAsyncOutputStream; + type CxxAsyncIoStream = io::ffi::CxxAsyncIoStream; + } + + extern "Rust" { + #[namespace = "kj_rs_io::ffi"] + type RustAsyncInputStream = io::RustAsyncInputStream; + + /// Compute hash of data from a `CxxAsyncInputStream` + async unsafe fn compute_stream_hash_ffi<'a>( + stream: Pin<&'a mut CxxAsyncInputStream>, + ) -> Result; + + /// Generate pseudorandom data to a `CxxAsyncOutputStream` + async unsafe fn generate_prng_ffi<'a>( + stream: Pin<&'a mut CxxAsyncOutputStream>, + size: u64, + ) -> Result<()>; + + /// Create a Rust MockInputStream with given data + #[allow(clippy::unnecessary_box_returns)] + fn create_rust_mock_input_stream(data: &[u8]) -> Box; + + // /// Create a Rust MockInputStream that returns read errors + // fn create_rust_mock_input_stream_with_error() -> Box; + + // /// Create a Rust MockIoStream with given input data + // fn create_rust_mock_io_stream(input_data: &[u8]) -> Box; + + // /// Compute hash of data from a `RustAsyncInputStream` + // async unsafe fn compute_stream_hash_rust_ffi<'a>( + // stream: &'a mut RustAsyncInputStream, + // ) -> Result; + + // /// Compute hash of data from the input side of a `RustAsyncIoStream` + // async unsafe fn compute_stream_hash_iostream_ffi<'a>( + // stream: &'a mut RustAsyncIoStream, + // ) -> Result; + } +} + +/// Compute hash of data from a `CxxAsyncInputStream` +/// +/// # Safety +/// +/// The caller must ensure that the stream pointer is valid and properly aligned. +/// +/// # Errors +/// +/// Returns an error if reading from the stream fails. +#[allow(clippy::needless_lifetimes)] +pub async unsafe fn compute_stream_hash_ffi<'a>( + stream: Pin<&'a mut ffi::CxxAsyncInputStream>, +) -> Result { + let mut wrapper = io::CxxAsyncInputStream::new(stream); + compute_stream_hash(&mut wrapper).await +} + +/// Generate pseudorandom data to a `CxxAsyncOutputStream` +/// +/// # Safety +/// +/// The caller must ensure that the stream pointer is valid and properly aligned. +/// +/// # Errors +/// +/// Returns an error if writing to the stream fails. +#[allow(clippy::needless_lifetimes)] +pub async unsafe fn generate_prng_ffi<'a>( + stream: Pin<&'a mut ffi::CxxAsyncOutputStream>, + size: u64, +) -> Result<()> { + let mut wrapper = io::CxxAsyncOutputStream::new(stream); + generate_prng(&mut wrapper, size).await +} + +/// Create a Rust `MockInputStream` with given data +#[must_use] +pub fn create_rust_mock_input_stream(data: &[u8]) -> Box { + Box::new(RustAsyncInputStream::new(MockInputStream::new( + data.to_vec(), + ))) +} + +/// Create a Rust `MockOutputStream` +#[must_use] +pub fn create_rust_mock_output_stream() -> Box { + let mock_stream = MockOutputStream::new(); + Box::new(io::RustAsyncOutputStream::new(mock_stream)) +} + +/// Create a Rust `MockOutputStream` that returns write errors +#[must_use] +pub fn create_rust_mock_output_stream_with_error() -> Box { + let mock_stream = MockOutputStream::new().with_error(); + Box::new(io::RustAsyncOutputStream::new(mock_stream)) +} + +/// Create a Rust `MockIoStream` with given input data +#[must_use] +pub fn create_rust_mock_io_stream(input_data: &[u8]) -> Box { + let mock_stream = MockIoStream::new(input_data.to_vec()); + Box::new(io::RustAsyncIoStream::new(mock_stream)) +} + +/// Create a Rust `MockInputStream` that returns read errors +#[must_use] +pub fn create_rust_mock_input_stream_with_error() -> Box { + let mock_stream = MockInputStream::new(Vec::new()).with_error(); + Box::new(io::RustAsyncInputStream::new(mock_stream)) +} + +/// Get the data written to a `MockOutputStream` (for testing) +/// +/// # Panics +/// +/// This function will panic if the stream is not actually a `MockOutputStream`. +/// This is intended for testing only. +#[must_use] +pub fn get_mock_output_stream_data(_stream: &io::RustAsyncOutputStream) -> Vec { + // Note: This is a bit of a hack since we can't easily downcast through the FFI boundary. + // For real tests, we'd need a better way to extract test data. + // For now, this serves as a placeholder for the interface. + Vec::new() +} + +/// Compute hash of data from a `RustAsyncInputStream` +/// +/// # Safety +/// +/// The caller must ensure that the stream pointer is valid and properly aligned. +/// +/// # Errors +/// +/// Returns an error if reading from the stream fails. +pub async unsafe fn compute_stream_hash_rust_ffi( + stream: &mut io::RustAsyncInputStream, +) -> Result { + const FNV_OFFSET_BASIS: u64 = 14_695_981_039_346_656_037; + const FNV_PRIME: u64 = 1_099_511_628_211; + + let mut hash = FNV_OFFSET_BASIS; + let mut buffer = [0u8; 4096]; + + loop { + let bytes_read = stream.try_read(&mut buffer, 1).await?; + if bytes_read == 0 { + break; // EOF + } + + // FNV-1a hash algorithm + for &byte in &buffer[..bytes_read] { + hash ^= u64::from(byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + } + + Ok(hash) +} + +/// Compute hash of data from the input side of a `RustAsyncIoStream` +/// +/// # Safety +/// +/// The caller must ensure that the stream pointer is valid and properly aligned. +/// +/// # Errors +/// +/// Returns an error if reading from the stream fails. +pub async unsafe fn compute_stream_hash_iostream_ffi( + stream: &mut io::RustAsyncIoStream, +) -> Result { + const FNV_OFFSET_BASIS: u64 = 14_695_981_039_346_656_037; + const FNV_PRIME: u64 = 1_099_511_628_211; + + let mut hash = FNV_OFFSET_BASIS; + let mut buffer = [0u8; 4096]; + + loop { + let bytes_read = stream.try_read(&mut buffer, 1).await?; + if bytes_read == 0 { + break; // EOF + } + + // FNV-1a hash algorithm + for &byte in &buffer[..bytes_read] { + hash ^= u64::from(byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + } + + Ok(hash) +} + +#[cfg(test)] +mod tests { + use super::*; + + // Tests for AsyncInputStream trait + + #[test] + fn test_mock_input_stream_basic_read() { + run_async_test(|| async { + let mut stream = MockInputStream::new(b"Hello, World!".to_vec()); + let mut buffer = [0u8; 5]; + + let bytes_read = stream.try_read(&mut buffer, 1).await.unwrap(); + assert_eq!(bytes_read, 5); + assert_eq!(&buffer, b"Hello"); + assert_eq!(stream.remaining(), 8); + }); + } + + #[test] + fn test_mock_input_stream_try_get_length() { + let stream = MockInputStream::new(b"Hello".to_vec()); + assert_eq!(stream.try_get_length(), Some(5)); + } + + // Tests for AsyncOutputStream trait + + #[test] + fn test_mock_output_stream_basic_write() { + run_async_test(|| async { + let mut stream = MockOutputStream::new(); + + stream.write(b"Hello").await.unwrap(); + stream.write(b", World!").await.unwrap(); + + assert_eq!(stream.written_data(), b"Hello, World!"); + }); + } + + #[test] + fn test_mock_output_stream_write_vectored() { + run_async_test(|| async { + let mut stream = MockOutputStream::new(); + let pieces = [b"Hello".as_slice(), b", ".as_slice(), b"World!".as_slice()]; + + stream.write_vectored(&pieces).await.unwrap(); + + assert_eq!(stream.written_data(), b"Hello, World!"); + }); + } + + #[test] + fn test_mock_output_stream_write_error() { + run_async_test(|| async { + let mut stream = MockOutputStream::new().with_error(); + + let result = stream.write(b"test").await; + assert!(result.is_err()); + assert_eq!(stream.written_data().len(), 0); + }); + } + + #[test] + fn test_mock_output_stream_when_write_disconnected() { + run_async_test(|| async { + let mut stream = MockOutputStream::new().with_disconnected(); + + // This should complete immediately for disconnected stream + let result = stream.when_write_disconnected().await; + assert!(result.is_ok()); + }); + } + + // Tests for AsyncIoStream trait + + #[test] + fn test_mock_io_stream_bidirectional() { + run_async_test(|| async { + let mut stream = MockIoStream::new(b"input data".to_vec()); + + // Test reading + let mut buffer = [0u8; 5]; + let bytes_read = stream.try_read(&mut buffer, 1).await.unwrap(); + assert_eq!(bytes_read, 5); + assert_eq!(&buffer, b"input"); + + // Test writing + stream.write(b"output").await.unwrap(); + assert_eq!(stream.written_data(), b"output"); + }); + } + + #[test] + fn test_mock_io_stream_shutdown() { + run_async_test(|| async { + let mut stream = MockIoStream::new(b"test".to_vec()); + + assert!(!stream.is_shutdown()); + stream.shutdown_write().await.unwrap(); + assert!(stream.is_shutdown()); + }); + } + + #[test] + fn test_mock_io_stream_abort_read() { + let mut stream = MockIoStream::new(b"test".to_vec()); + + assert!(!stream.is_shutdown()); + stream.abort_read(); + assert!(stream.is_shutdown()); + } + + // Tests for utility functions + + #[test] + fn test_compute_stream_hash() { + run_async_test(|| async { + let mut stream = MockInputStream::new(b"Hello, World!".to_vec()); + let hash = compute_stream_hash(&mut stream).await.unwrap(); + + // Hash should be deterministic + assert!(hash > 0); + + // Test with same data again + let mut stream2 = MockInputStream::new(b"Hello, World!".to_vec()); + let hash2 = compute_stream_hash(&mut stream2).await.unwrap(); + assert_eq!(hash, hash2); + }); + } + + #[test] + fn test_compute_stream_hash_different_data() { + run_async_test(|| async { + let mut stream1 = MockInputStream::new(b"Hello, World!".to_vec()); + let mut stream2 = MockInputStream::new(b"Hello, Rust!".to_vec()); + + let hash1 = compute_stream_hash(&mut stream1).await.unwrap(); + let hash2 = compute_stream_hash(&mut stream2).await.unwrap(); + + // Different data should produce different hashes + assert_ne!(hash1, hash2); + }); + } + + #[test] + fn test_compute_stream_hash_empty() { + const FNV_OFFSET_BASIS: u64 = 14_695_981_039_346_656_037; + + run_async_test(|| async { + let mut stream = MockInputStream::new(Vec::new()); + let hash = compute_stream_hash(&mut stream).await.unwrap(); + + // Empty data should produce the FNV offset basis + assert_eq!(hash, FNV_OFFSET_BASIS); + }); + } + + #[test] + fn test_unoptimized_pump_to() { + run_async_test(|| async { + let mut input = MockInputStream::new(b"pump test data".to_vec()); + let mut output = MockOutputStream::new(); + + let bytes_pumped = unoptimized_pump_to(&mut input, &mut output, 1000, 0) + .await + .unwrap(); + + assert_eq!(bytes_pumped, 14); + assert_eq!(output.written_data(), b"pump test data"); + assert_eq!(input.remaining(), 0); + }); + } + + #[test] + fn test_unoptimized_pump_to_partial() { + run_async_test(|| async { + let mut input = MockInputStream::new(b"pump test data".to_vec()); + let mut output = MockOutputStream::new(); + + // Only pump 4 bytes + let bytes_pumped = unoptimized_pump_to(&mut input, &mut output, 4, 0) + .await + .unwrap(); + + assert_eq!(bytes_pumped, 4); + assert_eq!(output.written_data(), b"pump"); + assert_eq!(input.remaining(), 10); + }); + } + + #[test] + fn test_unoptimized_pump_to_with_completed() { + run_async_test(|| async { + let mut input = MockInputStream::new(b"test".to_vec()); + let mut output = MockOutputStream::new(); + + // Start with 2 bytes already completed + let bytes_pumped = unoptimized_pump_to(&mut input, &mut output, 6, 2) + .await + .unwrap(); + + assert_eq!(bytes_pumped, 6); // 4 new + 2 already completed + assert_eq!(output.written_data(), b"test"); + }); + } + + #[test] + fn test_generate_prng() { + run_async_test(|| async { + let mut output = MockOutputStream::new(); + + // Generate 100 bytes of pseudorandom data + generate_prng(&mut output, 100).await.unwrap(); + + let data = output.written_data(); + assert_eq!(data.len(), 100); + + // Test that the data is deterministic + let mut output2 = MockOutputStream::new(); + generate_prng(&mut output2, 100).await.unwrap(); + assert_eq!(data, output2.written_data()); + }); + } + + #[test] + fn test_generate_prng_large() { + run_async_test(|| async { + let mut output = MockOutputStream::new(); + + // Generate 2048 bytes (multiple chunks) + generate_prng(&mut output, 2048).await.unwrap(); + + let data = output.written_data(); + assert_eq!(data.len(), 2048); + + // Check that the data varies (basic test that generator is working) + #[allow(clippy::naive_bytecount)] + let zero_count = data.iter().filter(|&&b| b == 0).count(); + assert!(zero_count < data.len() / 10); // Less than 10% zeros + }); + } + + #[test] + fn test_generate_prng_partial_chunk() { + run_async_test(|| async { + let mut output = MockOutputStream::new(); + + // Generate 1500 bytes (1 full chunk + partial chunk) + generate_prng(&mut output, 1500).await.unwrap(); + + let data = output.written_data(); + assert_eq!(data.len(), 1500); + }); + } + + // Integration tests that would work with actual FFI implementations + // Note: These are placeholders since we don't have actual C++ stream implementations available in tests + + #[test] + fn test_trait_interfaces_exist() { + // Test that our trait types exist and have the right interfaces + fn _check_input_stream_interface(_stream: T) {} + fn _check_output_stream_interface(_stream: T) {} + fn _check_io_stream_interface(_stream: T) {} + + // This test exists mainly to ensure the traits compile correctly + // The actual FFI wrapper types would be tested with C++ integration tests + } + + #[cfg(test)] + mod extension_trait_tests { + use super::*; + use io::{AsyncInputStreamExt, AsyncOutputStreamExt}; + + #[test] + fn test_async_read_adapter() { + let stream = MockInputStream::new(b"test".to_vec()); + let _adapter = stream.into_async_read(); + // Test that the adapter can be created + } + + #[test] + fn test_async_write_adapter() { + let stream = MockOutputStream::new(); + let _adapter = stream.into_async_write(); + // Test that the adapter can be created + } + } +} diff --git a/kj-rs/tests/BUILD.bazel b/kj-rs/tests/BUILD.bazel index 7636d884d..28ebe9c45 100644 --- a/kj-rs/tests/BUILD.bazel +++ b/kj-rs/tests/BUILD.bazel @@ -21,6 +21,7 @@ rust_library( # TODO(now): Why isn't :cxx transitive? "@workerd-cxx//:cxx", "//kj-rs", + "@crates.io//:futures", ], ) @@ -104,3 +105,18 @@ cc_test( "@capnp-cpp//src/kj:kj-test", ], ) + +cc_test( + name = "async-stream-test", + size = "medium", + srcs = [ + "async-stream-test.c++", + ], + deps = [ + ":tests", + ":bridge", + ":test-promises", + "@capnp-cpp//src/kj:kj-test", + ], + linkstatic = True, +) diff --git a/kj-rs/tests/async-stream-test.c++ b/kj-rs/tests/async-stream-test.c++ new file mode 100644 index 000000000..7528fff92 --- /dev/null +++ b/kj-rs/tests/async-stream-test.c++ @@ -0,0 +1,94 @@ +#include "kj-rs-demo/lib.rs.h" + +#include +#include +#include +#include + +#include +#include + +namespace kj_rs_test { + +class ZeroInputStream: public kj::AsyncInputStream { + public: + ZeroInputStream(size_t len): rem(len) {} + virtual ~ZeroInputStream() = default; + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + auto n = std::min(maxBytes, rem); + memset(buffer, 0, n); + rem -= n; + return n; + } + + private: + size_t rem; +}; + +template +class RustAsyncInputStream final: public kj::AsyncInputStream { + public: + RustAsyncInputStream(::rust::Box impl): impl(kj::mv(impl)) {} + virtual ~RustAsyncInputStream() = default; + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + auto slice = ::rust::Slice(static_cast(buffer), maxBytes); + return impl->try_read(slice, maxBytes); + } + + private: + ::rust::Box impl; +}; + +template +size_t readFullStream(kj::AsyncInputStream& stream, size_t expectedLen) { + kj::EventLoop loop; + kj::Maybe maybeException; + kj::WaitScope waitScope(loop); + + return [&]() -> kj::Promise { + auto buffer = kj::heapArray(bufferSize); + size_t len = 0; + + while (true) { + size_t n = co_await stream.tryRead(buffer.begin(), bufferSize, bufferSize); + if (n == 0) { + break; + } + len += n; + } + co_return len; + }().wait(waitScope); +} + +KJ_TEST("C++ ZeroInputStream") { + constexpr auto size = 1024; + ZeroInputStream stream(size); + KJ_ASSERT(readFullStream<127>(stream, size) == size); +} + +KJ_TEST("Rust ZeroInputStream") { + constexpr auto size = 1024; + RustAsyncInputStream stream(kj_rs_demo::new_zero_stream(size)); + KJ_ASSERT(readFullStream<127>(stream, size) == size); +} + +#ifdef NDEBUG +// ~1sec +constexpr auto benchmarkSize = 1024 * 1024 * 1024 * 10l; +#else +constexpr auto benchmarkSize = 1024 * 1024; +#endif + +KJ_TEST("Benchmark C++ ZeroInputStream") { + ZeroInputStream stream(benchmarkSize); + KJ_ASSERT(readFullStream<1024 + 1>(stream, benchmarkSize) == benchmarkSize); +} + +KJ_TEST("Benchmark Rust ZeroInputStream") { + RustAsyncInputStream stream(kj_rs_demo::new_zero_stream(benchmarkSize)); + KJ_ASSERT(readFullStream<1024 + 1>(stream, benchmarkSize) == benchmarkSize); +} + +} // namespace kj_rs_test diff --git a/kj-rs/tests/async_stream.rs b/kj-rs/tests/async_stream.rs new file mode 100644 index 000000000..c2831faa1 --- /dev/null +++ b/kj-rs/tests/async_stream.rs @@ -0,0 +1,84 @@ +type Result = std::io::Result; + +/// Async stream of zeros of a given size +pub struct ZeroStream(usize); + +impl ZeroStream { + pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result { + let mut n = 0; + while n < min_bytes { + let k = futures::AsyncReadExt::read(self, &mut buffer[n..]).await?; + if k == 0 { + break; + } + n += k; + } + Ok(n) + } +} + +#[allow(clippy::unnecessary_box_returns)] +pub fn new_zero_stream(size: usize) -> Box { + Box::new(ZeroStream(size)) +} + +impl futures::io::AsyncRead for ZeroStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + let n = std::cmp::min(this.0, buf.len()); + this.0 -= n; + buf[..n].fill(0); + std::task::Poll::Ready(Ok(n)) + } +} + +#[cfg(test)] +mod tests { + use futures::executor::LocalPool; + + use super::*; + + pub fn run_local(future: Fut) -> Fut::Output + where + Fut: Future, + { + let mut pool = LocalPool::new(); + pool.run_until(future) + } + + #[test] + fn read_shorter_than_length() { + let mut stream = ZeroStream(5); + let mut buffer = [0; 3]; + assert_eq!(run_local(stream.try_read(&mut buffer, 3)).unwrap(), 3); + assert_eq!(stream.0, 2); + } + + #[test] + fn read_longer_than_length() { + let mut stream = ZeroStream(3); + let mut buffer = [0; 5]; + assert_eq!(run_local(stream.try_read(&mut buffer, 5)).unwrap(), 3); + assert_eq!(stream.0, 0); + } + + #[test] + fn try_read_shorter_than_length() { + let mut stream = ZeroStream(5); + let mut buffer = [0; 3]; + assert_eq!(run_local(stream.try_read(&mut buffer, 3)).unwrap(), 3); + assert_eq!(stream.0, 2); + } + + #[test] + fn try_read_longer_than_length() { + let mut stream = ZeroStream(3); + let mut buffer = [0; 5]; + assert_eq!(run_local(stream.try_read(&mut buffer, 5)).unwrap(), 3); + assert_eq!(stream.0, 0); + } +} diff --git a/kj-rs/tests/awaitables-cc-test.c++ b/kj-rs/tests/awaitables-cc-test.c++ index 274d8d093..4fe4577bf 100644 --- a/kj-rs/tests/awaitables-cc-test.c++ +++ b/kj-rs/tests/awaitables-cc-test.c++ @@ -1,6 +1,7 @@ // TODO(now): Make this a library, drive test from Rust. // TODO(now): Move as many cases as possible into kj-rs. +#include "kj-rs-demo/lib.rs.h" #include "kj-rs-demo/test-promises.h" #include "kj-rs/awaiter.h" #include "kj-rs/future.h" diff --git a/kj-rs/tests/lib.rs b/kj-rs/tests/lib.rs index aaba6f18f..4b826f8b7 100644 --- a/kj-rs/tests/lib.rs +++ b/kj-rs/tests/lib.rs @@ -6,6 +6,7 @@ #![allow(clippy::should_panic_without_expect)] #![allow(clippy::missing_panics_doc)] +mod async_stream; mod test_futures; mod test_own; @@ -21,6 +22,9 @@ use kj_rs::repr::Own; type Result = std::io::Result; type Error = std::io::Error; +use async_stream::{ZeroStream, new_zero_stream}; + +#[allow(clippy::unnecessary_box_returns)] #[cxx::bridge(namespace = "kj_rs_demo")] mod ffi { struct Shared { @@ -120,6 +124,19 @@ mod ffi { async unsafe fn lifetime_arg_void<'a>(buf: &'a [u8]); async unsafe fn lifetime_arg_result<'a>(buf: &'a [u8]) -> Result<()>; } + + // see `async_stream` + extern "Rust" { + type ZeroStream; + + fn new_zero_stream(size: usize) -> Box; + + async unsafe fn try_read<'a>( + self: &'a mut ZeroStream, + buffer: &'a mut [u8], + min_bytes: usize, + ) -> Result; + } } pub fn modify_own_return(mut own: Own) -> Own { diff --git a/kj-rs/tests/test-promises.c++ b/kj-rs/tests/test-promises.c++ index 0bc610098..e1814ffad 100644 --- a/kj-rs/tests/test-promises.c++ +++ b/kj-rs/tests/test-promises.c++ @@ -1,3 +1,4 @@ +#include #include #include diff --git a/kj-rs/tests/test-promises.h b/kj-rs/tests/test-promises.h index b92442a70..09dcc3843 100644 --- a/kj-rs/tests/test-promises.h +++ b/kj-rs/tests/test-promises.h @@ -1,11 +1,13 @@ #pragma once -#include "kj-rs-demo/lib.rs.h" - #include +#include + namespace kj_rs_demo { +struct Shared; + kj::Promise new_ready_promise_void(); kj::Promise new_pending_promise_void(); kj::Promise new_coroutine_promise_void(); diff --git a/kj-rs/tests/test_futures.rs b/kj-rs/tests/test_futures.rs index b7b5a9628..9040ab072 100644 --- a/kj-rs/tests/test_futures.rs +++ b/kj-rs/tests/test_futures.rs @@ -149,10 +149,7 @@ pub async fn new_layered_ready_future_void() -> Result<()> { } // From example at https://doc.rust-lang.org/std/future/fn.poll_fn.html#capturing-a-pinned-state -async fn naive_select( - a: impl Future, - b: impl Future, -) -> T { +async fn naive_select(a: impl Future, b: impl Future) -> T { let (mut a, mut b) = (pin!(a), pin!(b)); future::poll_fn(move |cx| { if let Poll::Ready(r) = a.as_mut().poll(cx) { diff --git a/macro/src/expand.rs b/macro/src/expand.rs index 7a3d2428e..400290651 100644 --- a/macro/src/expand.rs +++ b/macro/src/expand.rs @@ -90,7 +90,7 @@ fn expand(ffi: Module, doc: Doc, attrs: OtherAttrs, apis: &[Api], types: &Types) hidden.extend(expand_rust_type_layout(ety, types)); } Api::RustFunction(efn) => { - if efn.asyncness.is_some() { + if efn.sig.asyncness.is_some() && &efn.name.cxx == "read" { // todo!("expand_rust_function_shim\n{}", expand_rust_function_shim(efn, types).to_string()); } hidden.extend(expand_rust_function_shim(efn, types)); diff --git a/third-party/Cargo.lock b/third-party/Cargo.lock index 75f25ec87..fc1eb1546 100644 --- a/third-party/Cargo.lock +++ b/third-party/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "cc" version = "1.2.20" @@ -70,6 +81,7 @@ dependencies = [ name = "direct-cargo-bazel-deps" version = "0.0.1" dependencies = [ + "async-trait", "cc", "clap", "codespan-reporting", diff --git a/third-party/bazel/BUILD.async-trait-0.1.88.bazel b/third-party/bazel/BUILD.async-trait-0.1.88.bazel new file mode 100644 index 000000000..d39756fe4 --- /dev/null +++ b/third-party/bazel/BUILD.async-trait-0.1.88.bazel @@ -0,0 +1,97 @@ +############################################################################### +# @generated +# DO NOT MODIFY: This file is auto-generated by a crate_universe tool. To +# regenerate this file, run the following: +# +# bazel run @@//third-party:vendor +############################################################################### + +load("@rules_rust//cargo:defs.bzl", "cargo_toml_env_vars") +load("@rules_rust//rust:defs.bzl", "rust_proc_macro") + +package(default_visibility = ["//visibility:public"]) + +cargo_toml_env_vars( + name = "cargo_toml_env_vars", + src = "Cargo.toml", +) + +rust_proc_macro( + name = "async_trait", + srcs = glob( + include = ["**/*.rs"], + allow_empty = True, + ), + compile_data = glob( + include = ["**"], + allow_empty = True, + exclude = [ + "**/* *", + ".tmp_git_root/**/*", + "BUILD", + "BUILD.bazel", + "WORKSPACE", + "WORKSPACE.bazel", + ], + ), + crate_root = "src/lib.rs", + edition = "2021", + rustc_env_files = [ + ":cargo_toml_env_vars", + ], + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-bazel", + "crate-name=async-trait", + "manual", + "noclippy", + "norustfmt", + ], + target_compatible_with = select({ + "@rules_rust//rust/platform:aarch64-apple-darwin": [], + "@rules_rust//rust/platform:aarch64-apple-ios": [], + "@rules_rust//rust/platform:aarch64-apple-ios-sim": [], + "@rules_rust//rust/platform:aarch64-linux-android": [], + "@rules_rust//rust/platform:aarch64-pc-windows-msvc": [], + "@rules_rust//rust/platform:aarch64-unknown-fuchsia": [], + "@rules_rust//rust/platform:aarch64-unknown-linux-gnu": [], + "@rules_rust//rust/platform:aarch64-unknown-nixos-gnu": [], + "@rules_rust//rust/platform:aarch64-unknown-nto-qnx710": [], + "@rules_rust//rust/platform:aarch64-unknown-uefi": [], + "@rules_rust//rust/platform:arm-unknown-linux-gnueabi": [], + "@rules_rust//rust/platform:armv7-linux-androideabi": [], + "@rules_rust//rust/platform:armv7-unknown-linux-gnueabi": [], + "@rules_rust//rust/platform:i686-apple-darwin": [], + "@rules_rust//rust/platform:i686-linux-android": [], + "@rules_rust//rust/platform:i686-pc-windows-msvc": [], + "@rules_rust//rust/platform:i686-unknown-freebsd": [], + "@rules_rust//rust/platform:i686-unknown-linux-gnu": [], + "@rules_rust//rust/platform:powerpc-unknown-linux-gnu": [], + "@rules_rust//rust/platform:riscv32imc-unknown-none-elf": [], + "@rules_rust//rust/platform:riscv64gc-unknown-none-elf": [], + "@rules_rust//rust/platform:s390x-unknown-linux-gnu": [], + "@rules_rust//rust/platform:thumbv7em-none-eabi": [], + "@rules_rust//rust/platform:thumbv8m.main-none-eabi": [], + "@rules_rust//rust/platform:wasm32-unknown-unknown": [], + "@rules_rust//rust/platform:wasm32-wasip1": [], + "@rules_rust//rust/platform:x86_64-apple-darwin": [], + "@rules_rust//rust/platform:x86_64-apple-ios": [], + "@rules_rust//rust/platform:x86_64-linux-android": [], + "@rules_rust//rust/platform:x86_64-pc-windows-msvc": [], + "@rules_rust//rust/platform:x86_64-unknown-freebsd": [], + "@rules_rust//rust/platform:x86_64-unknown-fuchsia": [], + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu": [], + "@rules_rust//rust/platform:x86_64-unknown-nixos-gnu": [], + "@rules_rust//rust/platform:x86_64-unknown-none": [], + "@rules_rust//rust/platform:x86_64-unknown-uefi": [], + "//conditions:default": ["@platforms//:incompatible"], + }), + version = "0.1.88", + deps = [ + "@vendor__proc-macro2-1.0.94//:proc_macro2", + "@vendor__quote-1.0.40//:quote", + "@vendor__syn-2.0.100//:syn", + ], +) diff --git a/third-party/bazel/BUILD.bazel b/third-party/bazel/BUILD.bazel index 89754c6b0..f8681ace8 100644 --- a/third-party/bazel/BUILD.bazel +++ b/third-party/bazel/BUILD.bazel @@ -31,6 +31,18 @@ filegroup( ) # Workspace Member Dependencies +alias( + name = "async-trait-0.1.88", + actual = "@vendor__async-trait-0.1.88//:async_trait", + tags = ["manual"], +) + +alias( + name = "async-trait", + actual = "@vendor__async-trait-0.1.88//:async_trait", + tags = ["manual"], +) + alias( name = "cc-1.2.20", actual = "@vendor__cc-1.2.20//:cc", diff --git a/third-party/bazel/BUILD.syn-2.0.100.bazel b/third-party/bazel/BUILD.syn-2.0.100.bazel index eb574e69a..d1ae15805 100644 --- a/third-party/bazel/BUILD.syn-2.0.100.bazel +++ b/third-party/bazel/BUILD.syn-2.0.100.bazel @@ -42,7 +42,51 @@ rust_library( "parsing", "printing", "proc-macro", - ], + ] + select({ + "@rules_rust//rust/platform:aarch64-apple-darwin": [ + "visit-mut", # aarch64-apple-darwin + ], + "@rules_rust//rust/platform:aarch64-pc-windows-msvc": [ + "visit-mut", # aarch64-pc-windows-msvc + ], + "@rules_rust//rust/platform:aarch64-unknown-linux-gnu": [ + "visit-mut", # aarch64-unknown-linux-gnu + ], + "@rules_rust//rust/platform:aarch64-unknown-nixos-gnu": [ + "visit-mut", # aarch64-unknown-nixos-gnu + ], + "@rules_rust//rust/platform:arm-unknown-linux-gnueabi": [ + "visit-mut", # arm-unknown-linux-gnueabi + ], + "@rules_rust//rust/platform:i686-pc-windows-msvc": [ + "visit-mut", # i686-pc-windows-msvc + ], + "@rules_rust//rust/platform:i686-unknown-linux-gnu": [ + "visit-mut", # i686-unknown-linux-gnu + ], + "@rules_rust//rust/platform:powerpc-unknown-linux-gnu": [ + "visit-mut", # powerpc-unknown-linux-gnu + ], + "@rules_rust//rust/platform:s390x-unknown-linux-gnu": [ + "visit-mut", # s390x-unknown-linux-gnu + ], + "@rules_rust//rust/platform:x86_64-apple-darwin": [ + "visit-mut", # x86_64-apple-darwin + ], + "@rules_rust//rust/platform:x86_64-pc-windows-msvc": [ + "visit-mut", # x86_64-pc-windows-msvc + ], + "@rules_rust//rust/platform:x86_64-unknown-freebsd": [ + "visit-mut", # x86_64-unknown-freebsd + ], + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu": [ + "visit-mut", # x86_64-unknown-linux-gnu + ], + "@rules_rust//rust/platform:x86_64-unknown-nixos-gnu": [ + "visit-mut", # x86_64-unknown-nixos-gnu + ], + "//conditions:default": [], + }), crate_root = "src/lib.rs", edition = "2021", rustc_env_files = [ diff --git a/third-party/bazel/defs.bzl b/third-party/bazel/defs.bzl index dad491f96..76afbfeec 100644 --- a/third-party/bazel/defs.bzl +++ b/third-party/bazel/defs.bzl @@ -329,6 +329,7 @@ _NORMAL_DEV_ALIASES = { _PROC_MACRO_DEPENDENCIES = { "": { _COMMON_CONDITION: { + "async-trait": Label("@vendor//:async-trait-0.1.88"), "rustversion": Label("@vendor//:rustversion-1.0.20"), }, }, @@ -436,6 +437,16 @@ def crate_repositories(): build_file = Label("//third-party/bazel:BUILD.anstyle-1.0.10.bazel"), ) + maybe( + http_archive, + name = "vendor__async-trait-0.1.88", + sha256 = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5", + type = "tar.gz", + urls = ["https://static.crates.io/crates/async-trait/0.1.88/download"], + strip_prefix = "async-trait-0.1.88", + build_file = Label("//third-party/bazel:BUILD.async-trait-0.1.88.bazel"), + ) + maybe( http_archive, name = "vendor__cc-1.2.20", @@ -877,6 +888,7 @@ def crate_repositories(): ) return [ + struct(repo = "vendor__async-trait-0.1.88", is_dev_dep = False), struct(repo = "vendor__cc-1.2.20", is_dev_dep = False), struct(repo = "vendor__clap-4.5.33", is_dev_dep = False), struct(repo = "vendor__codespan-reporting-0.12.0", is_dev_dep = False), diff --git a/third-party/cargo.bzl b/third-party/cargo.bzl index e4415f83c..059f15b61 100644 --- a/third-party/cargo.bzl +++ b/third-party/cargo.bzl @@ -2,6 +2,7 @@ load("@rules_rust//crate_universe:defs.bzl", "crate") PACKAGES = { + "async-trait": crate.spec(version = "0"), "cc": crate.spec(version = "1"), "clap": crate.spec(default_features = False, features = ["derive", "std", "help"], version = "4"), "codespan-reporting": crate.spec(version = "0"),