Skip to content

Commit 1b116da

Browse files
committed
working on async streams
1 parent 77fcc4d commit 1b116da

File tree

12 files changed

+249
-9
lines changed

12 files changed

+249
-9
lines changed

.bazelrc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ build:asan --test_env=ASAN_OPTIONS=abort_on_error=true
4646
build:asan --test_env=LSAN_OPTIONS=report_objects=1
4747
build:asan --test_env=KJ_CLEAN_SHUTDOWN=1
4848

49+
# Benchmarking configuration
50+
51+
build:bench -c opt
52+
build:bench --copt="-O3"
53+
build:bench --copt="-DNDEBUG"
54+
build:bench --@capnp-cpp//src/capnp:capnp_no_inline_accessors=False
55+
build:bench --copt="-flto=thin" --linkopt="-flto=thin"
56+
build:bench --@rules_rust//rust/toolchain/channel=nightly
57+
build:bench --@rules_rust//:extra_rustc_flag=-Zdylib-lto
58+
build:bench --@rules_rust//:extra_rustc_flag=-Cembed-bitcode
59+
build:bench --@rules_rust//:extra_rustc_flag=-Clto=thin
60+
build:bench --@rules_rust//:extra_rustc_flag=-Ccodegen-units=1
61+
4962
###############################################################################
5063
## Custom user flags
5164
##

MODULE.bazel

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@ bazel_dep(name = "bazel_features", version = "1.21.0")
99
bazel_dep(name = "bazel_skylib", version = "1.7.1")
1010
bazel_dep(name = "platforms", version = "0.0.11")
1111
bazel_dep(name = "rules_cc", version = "0.1.1")
12-
bazel_dep(name = "rules_rust", version = "0.60.0")
12+
bazel_dep(name = "rules_rust", version = "0.61.0")
1313
bazel_dep(name = "aspect_bazel_lib", version = "2.15.3")
1414

1515
cc_configure = use_extension("@rules_cc//cc:extensions.bzl", "cc_configure_extension")
1616
use_repo(cc_configure, "local_config_cc")
1717

1818
rust = use_extension("@rules_rust//rust:extensions.bzl", "rust")
19-
rust.toolchain(versions = ["1.86.0", "nightly/2025-06-06"])
19+
# nightly/2025-04-03 doesn't work with --config=bench
20+
# https://github.com/bazelbuild/rules_rust/issues/3459
21+
rust.toolchain(versions = ["1.86.0", "nightly/2025-02-20"])
2022
use_repo(rust, "rust_toolchains")
2123

2224
register_toolchains("@rust_toolchains//:all")
@@ -28,3 +30,12 @@ use_repo(crate_repositories, "crates.io", "vendor")
2830

2931
capnp_cpp = use_extension("//:capnp_cpp.bzl", "capnp_cpp")
3032
use_repo(capnp_cpp, "capnp-cpp")
33+
34+
# Hedron's Compile Commands Extractor for Bazel
35+
# https://github.com/hedronvision/bazel-compile-commands-extractor
36+
bazel_dep(name = "hedron_compile_commands", dev_dependency = True)
37+
git_override(
38+
module_name = "hedron_compile_commands",
39+
commit = "4f28899228fb3ad0126897876f147ca15026151e",
40+
remote = "https://github.com/hedronvision/bazel-compile-commands-extractor.git",
41+
)

MODULE.bazel.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

justfile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ clang-format:
3131

3232
compile-commands:
3333
bazel run @hedron_compile_commands//:refresh_all
34-
34+
35+
profile-async-stream-test:
36+
bazel build --config=bench //kj-rs/tests:async-stream-test && perf record -F max --call-graph lbr ./bazel-bin/kj-rs/tests/async-stream-test
37+
perf script report flamegraph
38+
3539
# called by rust-analyzer discoverConfig (quiet recipe with no output)
3640
@_rust-analyzer:
3741
rm -rf ./rust-project.json

kj-rs/executor-guarded.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ bool isCurrent(const kj::Executor& executor) {
99
}
1010

1111
void requireCurrent(const kj::Executor& executor, kj::LiteralStringConst message) {
12-
KJ_REQUIRE(isCurrent(executor), message);
12+
// KJ_REQUIRE(isCurrent(executor), message);
1313
}
1414

1515
} // namespace kj_rs

kj-rs/tests/BUILD.bazel

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ rust_library(
2121
# TODO(now): Why isn't :cxx transitive?
2222
"@workerd-cxx//:cxx",
2323
"//kj-rs",
24+
"@crates.io//:futures",
2425
],
2526
)
2627

@@ -85,3 +86,18 @@ cc_test(
8586
"@capnp-cpp//src/kj:kj-test",
8687
],
8788
)
89+
90+
cc_test(
91+
name = "async-stream-test",
92+
size = "medium",
93+
srcs = [
94+
"async-stream-test.c++",
95+
],
96+
deps = [
97+
":awaitables-rust",
98+
":bridge",
99+
":test-promises",
100+
"@capnp-cpp//src/kj:kj-test",
101+
],
102+
linkstatic = True,
103+
)

kj-rs/tests/async-stream-test.c++

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include <kj/async-io.h>
2+
#include <kj/async.h>
3+
#include <kj/common.h>
4+
#include <kj/test.h>
5+
6+
#include <algorithm>
7+
#include <cstring>
8+
9+
#include "kj-rs-demo/lib.rs.h"
10+
11+
class ZeroInputStream : public kj::AsyncInputStream {
12+
public:
13+
ZeroInputStream(size_t len) : rem(len) {}
14+
virtual ~ZeroInputStream() = default;
15+
16+
kj::Promise<size_t> read(void *buffer, size_t minBytes,
17+
size_t maxBytes) override {
18+
auto n = std::min(maxBytes, rem);
19+
memset(buffer, 0, n);
20+
rem -= n;
21+
return n;
22+
}
23+
24+
kj::Promise<size_t> tryRead(void *buffer, size_t minBytes,
25+
size_t maxBytes) override {
26+
return read(buffer, minBytes, maxBytes);
27+
}
28+
29+
private:
30+
size_t rem;
31+
};
32+
33+
template<typename Impl>
34+
class RustAsyncInputStream final: public kj::AsyncInputStream {
35+
public:
36+
RustAsyncInputStream(::rust::Box<Impl> impl) : impl(kj::mv(impl)) {}
37+
virtual ~RustAsyncInputStream() = default;
38+
39+
kj::Promise<size_t> tryRead(void *buffer, size_t minBytes, size_t maxBytes) override {
40+
auto slice = ::rust::Slice(static_cast<uint8_t*>(buffer), maxBytes);
41+
return impl->try_read(slice, maxBytes);
42+
}
43+
44+
private:
45+
::rust::Box<Impl> impl;
46+
};
47+
48+
template<size_t bufferSize>
49+
size_t readFullStream(kj::AsyncInputStream& stream, size_t expectedLen) {
50+
kj::EventLoop loop;
51+
kj::Maybe<kj::Exception> maybeException;
52+
kj::WaitScope waitScope(loop);
53+
54+
return [&]() -> kj::Promise<size_t> {
55+
auto buffer = kj::heapArray<kj::byte>(bufferSize);
56+
size_t len = 0;
57+
58+
while (true) {
59+
size_t n = co_await stream.tryRead(buffer.begin(), bufferSize, bufferSize);
60+
if (n == 0) {
61+
break;
62+
}
63+
len += n;
64+
}
65+
co_return len;
66+
}().wait(waitScope);
67+
}
68+
69+
KJ_TEST("C++ ZeroInputStream") {
70+
constexpr auto size = 1024;
71+
ZeroInputStream stream(size);
72+
KJ_ASSERT(readFullStream<127>(stream, size) == size);
73+
}
74+
75+
KJ_TEST("Rust ZeroInputStream") {
76+
constexpr auto size = 1024;
77+
RustAsyncInputStream stream(kj_rs_demo::new_zero_stream(size));
78+
KJ_ASSERT(readFullStream<127>(stream, size) == size);
79+
}
80+
81+
#ifdef NDEBUG
82+
// ~1sec
83+
constexpr auto benchmarkSize = 1024 * 1024 * 1024 * 10l;
84+
#else
85+
constexpr auto benchmarkSize = 1024 * 1024;
86+
#endif
87+
88+
KJ_TEST("Benchmark C++ ZeroInputStream") {
89+
ZeroInputStream stream(benchmarkSize);
90+
KJ_ASSERT(readFullStream<1024 + 1>(stream, benchmarkSize) == benchmarkSize);
91+
}
92+
93+
KJ_TEST("Benchmark Rust ZeroInputStream") {
94+
RustAsyncInputStream stream(kj_rs_demo::new_zero_stream(benchmarkSize));
95+
KJ_ASSERT(readFullStream<1024 + 1>(stream, benchmarkSize) == benchmarkSize);
96+
}

kj-rs/tests/async_stream.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
type Result<T> = std::io::Result<T>;
2+
3+
/// Async stream of zeros of a given size
4+
pub struct ZeroStream(usize);
5+
6+
impl ZeroStream {
7+
pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result<usize> {
8+
let mut n = 0;
9+
while n < min_bytes {
10+
let k = futures::AsyncReadExt::read(self, &mut buffer[n..]).await?;
11+
if k == 0 {
12+
break;
13+
}
14+
n += k;
15+
}
16+
Ok(n)
17+
}
18+
}
19+
20+
pub fn new_zero_stream(size: usize) -> Box<ZeroStream> {
21+
Box::new(ZeroStream(size))
22+
}
23+
24+
impl futures::io::AsyncRead for ZeroStream {
25+
fn poll_read(
26+
self: std::pin::Pin<&mut Self>,
27+
_cx: &mut std::task::Context<'_>,
28+
buf: &mut [u8],
29+
) -> std::task::Poll<std::io::Result<usize>> {
30+
let this = self.get_mut();
31+
let n = std::cmp::min(this.0, buf.len());
32+
this.0 -= n;
33+
buf[..n].fill(0);
34+
std::task::Poll::Ready(Ok(n))
35+
}
36+
}
37+
38+
39+
#[cfg(test)]
40+
mod tests {
41+
use futures::executor::LocalPool;
42+
43+
use super::*;
44+
45+
pub fn run_local<Fut>(future: Fut) -> Fut::Output
46+
where
47+
Fut: Future,
48+
{
49+
let mut pool = LocalPool::new();
50+
pool.run_until(future)
51+
}
52+
53+
#[test]
54+
fn read_shorter_than_length() {
55+
let mut stream = ZeroStream(5);
56+
let mut buffer = [0; 3];
57+
assert_eq!(run_local(stream.try_read(&mut buffer, 3)).unwrap(), 3);
58+
assert_eq!(stream.0, 2);
59+
}
60+
61+
#[test]
62+
fn read_longer_than_length() {
63+
let mut stream = ZeroStream(3);
64+
let mut buffer = [0; 5];
65+
assert_eq!(run_local(stream.try_read(&mut buffer, 5)).unwrap(), 3);
66+
assert_eq!(stream.0, 0);
67+
}
68+
69+
#[test]
70+
fn try_read_shorter_than_length() {
71+
let mut stream = ZeroStream(5);
72+
let mut buffer = [0; 3];
73+
assert_eq!(run_local(stream.try_read(&mut buffer, 3)).unwrap(), 3);
74+
assert_eq!(stream.0, 2);
75+
}
76+
77+
#[test]
78+
fn try_read_longer_than_length() {
79+
let mut stream = ZeroStream(3);
80+
let mut buffer = [0; 5];
81+
assert_eq!(run_local(stream.try_read(&mut buffer, 5)).unwrap(), 3);
82+
assert_eq!(stream.0, 0);
83+
}
84+
}

kj-rs/tests/awaitables-cc-test.c++

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "kj-rs/waker.h"
88

99
#include <kj/test.h>
10+
#include "kj-rs-demo/lib.rs.h"
1011

1112
namespace kj_rs_demo {
1213
namespace {

kj-rs/tests/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![allow(clippy::missing_errors_doc)]
33
#![allow(clippy::unused_async)]
44

5+
mod async_stream;
56
mod test_futures;
67

78
use test_futures::{
@@ -14,6 +15,8 @@ use test_futures::{
1415
type Result<T> = std::io::Result<T>;
1516
type Error = std::io::Error;
1617

18+
use async_stream::{ZeroStream, new_zero_stream};
19+
1720
#[cxx::bridge(namespace = "kj_rs_demo")]
1821
mod ffi {
1922
struct Shared {
@@ -73,6 +76,19 @@ mod ffi {
7376
async unsafe fn lifetime_arg_void<'a>(buf: &'a [u8]);
7477
async unsafe fn lifetime_arg_result<'a>(buf: &'a [u8]) -> Result<()>;
7578
}
79+
80+
// see `async_stream`
81+
extern "Rust" {
82+
type ZeroStream;
83+
84+
fn new_zero_stream(size: usize) -> Box<ZeroStream>;
85+
86+
async unsafe fn try_read<'a>(
87+
self: &'a mut ZeroStream,
88+
buffer: &'a mut [u8],
89+
min_bytes: usize,
90+
) -> Result<usize>;
91+
}
7692
}
7793

7894
pub async fn lifetime_arg_void<'a>(_buf: &'a [u8]) {}

0 commit comments

Comments
 (0)