Skip to content

async io streams #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ build:windows --copt='/Zc:dllexportInlines-' --host_copt='/Zc:dllexportInlines-'

build:clippy --aspects=@rules_rust//rust:defs.bzl%rust_clippy_aspect
build:clippy --output_groups=+clippy_checks
build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings
build:clippy --aspects=@rules_rust//rust:defs.bzl%rustfmt_aspect
build:clippy --output_groups=+rustfmt_checks
build:clippy --@rules_rust//:extra_rustc_flag=-Dwarnings
build:clippy --@rules_rust//:clippy_flags=-Dclippy::all,-Dclippy::pedantic,-Dwarnings

## Sanitizers

Expand All @@ -49,6 +46,19 @@ build:asan --test_env=ASAN_OPTIONS=abort_on_error=true
build:asan --test_env=LSAN_OPTIONS=report_objects=1
build:asan --test_env=KJ_CLEAN_SHUTDOWN=1

# Benchmarking configuration

build:bench -c opt
build:bench --copt="-O3"
build:bench --copt="-DNDEBUG"
build:bench --@capnp-cpp//src/capnp:capnp_no_inline_accessors=False
build:bench --copt="-flto=thin" --linkopt="-flto=thin"
build:bench --@rules_rust//rust/toolchain/channel=nightly
build:bench --@rules_rust//:extra_rustc_flag=-Zdylib-lto
build:bench --@rules_rust//:extra_rustc_flag=-Cembed-bitcode
build:bench --@rules_rust//:extra_rustc_flag=-Clto=thin
build:bench --@rules_rust//:extra_rustc_flag=-Ccodegen-units=1

###############################################################################
## Custom user flags
##
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@
/expand.rs
/target/
/Cargo.lock
flamegraph.*
flamegraph-*.*
perf.data
perf.data.*

2 changes: 2 additions & 0 deletions .helix/languages.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[language-server.rust-analyzer]
config = { "rust-analyzer.workspace.discoverConfig"= { "command"= [ "just", "_rust-analyzer" ], "progressLabel"= "generating rust analyzer config", "filesToWatch"= [ "BUILD.bazel" ] }}
19 changes: 17 additions & 2 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ bazel_dep(name = "bazel_features", version = "1.21.0")
bazel_dep(name = "bazel_skylib", version = "1.7.1")
bazel_dep(name = "platforms", version = "0.0.11")
bazel_dep(name = "rules_cc", version = "0.1.1")
bazel_dep(name = "rules_rust", version = "0.60.0")
bazel_dep(name = "rules_rust", version = "0.61.0")
bazel_dep(name = "aspect_bazel_lib", version = "2.15.3")

cc_configure = use_extension("@rules_cc//cc:extensions.bzl", "cc_configure_extension")
use_repo(cc_configure, "local_config_cc")

rust = use_extension("@rules_rust//rust:extensions.bzl", "rust")
rust.toolchain(versions = ["1.86.0", "nightly/2025-06-06"])

# nightly/2025-04-03 doesn't work with --config=bench
# https://github.com/bazelbuild/rules_rust/issues/3459
rust.toolchain(versions = [
"1.86.0",
"nightly/2025-02-20",
])
use_repo(rust, "rust_toolchains")

register_toolchains("@rust_toolchains//:all")
Expand All @@ -28,3 +34,12 @@ use_repo(crate_repositories, "crates.io", "vendor")

capnp_cpp = use_extension("//:capnp_cpp.bzl", "capnp_cpp")
use_repo(capnp_cpp, "capnp-cpp")

# Hedron's Compile Commands Extractor for Bazel
# https://github.com/hedronvision/bazel-compile-commands-extractor
bazel_dep(name = "hedron_compile_commands", dev_dependency = True)
git_override(
module_name = "hedron_compile_commands",
commit = "4f28899228fb3ad0126897876f147ca15026151e",
remote = "https://github.com/hedronvision/bazel-compile-commands-extractor.git",
)
4 changes: 2 additions & 2 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ rustfmt:
bazel run @rules_rust//:rustfmt

clang-format:
clang-format -i kj-rs/*.h kj-rs/*.c++ kj-rs/tests/*.h kj-rs/tests/*.c++
clang-format -i kj-rs/**/*.h kj-rs/**/*.c++


compile-commands:
bazel run @hedron_compile_commands//:refresh_all


profile-async-stream-test:
bazel build --config=bench //kj-rs/tests:async-stream-test
bazel test --test_output=all --config=bench //kj-rs/tests:async-stream-test
perf record -F max --call-graph lbr ./bazel-bin/kj-rs/tests/async-stream-test
perf script report flamegraph

# called by rust-analyzer discoverConfig (quiet recipe with no output)
@_rust-analyzer:
rm -rf ./rust-project.json
Expand Down
9 changes: 8 additions & 1 deletion kj-rs/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
load("//tools/bazel:rust_cxx_bridge.bzl", "rust_cxx_bridge")
load("@rules_rust//rust:defs.bzl", "rust_library")
load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")

cc_library(
name = "kj-rs-lib",
Expand All @@ -25,10 +25,17 @@ rust_library(
":bridge",
":kj-rs-lib",
"@workerd-cxx//:cxx",
"@crates.io//:futures",
"@crates.io//:static_assertions",
],
)

rust_test(
name = "kj-rs_test",
crate = ":kj-rs",
edition = "2024",
)

rust_cxx_bridge(
name = "bridge",
src = "lib.rs",
Expand Down
21 changes: 0 additions & 21 deletions kj-rs/awaiter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,6 @@ bool RustPromiseAwaiter::poll(const WakerRef& waker, const KjWaker* maybeKjWaker
KJ_IF_SOME(optionWaker, maybeOptionWaker) {
// Our Promise is not yet ready.

// Check for an optimized wake path.
KJ_IF_SOME(kjWaker, maybeKjWaker) {
KJ_IF_SOME(futurePollEvent, kjWaker.tryGetFuturePollEvent()) {
// Optimized path. The Future which is polling our Promise is in turn being polled by a
// `co_await` expression somewhere up the stack from us. We can arrange to arm the
// `co_await` expression's KJ Event directly when our Promise is ready.

// If we had an opaque Waker stored in OptionWaker before, drop it now, as we won't be
// needing it.
optionWaker.set_none();

// Store a reference to the current `co_await` expression's Future polling Event. The
// reference is weak, and will be cleared if the `co_await` expression happens to end before
// our Promise is ready. In the more likely case that our Promise becomes ready while the
// `co_await` expression is still active, we'll arm its Event so it can `poll()` us again.
linkedGroup().set(futurePollEvent);

return false;
}
}

// Unoptimized fallback path.

// Tell our OptionWaker to store a clone of whatever Waker we were given.
Expand Down
2 changes: 1 addition & 1 deletion kj-rs/executor-guarded.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bool isCurrent(const kj::Executor& executor) {
}

void requireCurrent(const kj::Executor& executor, kj::LiteralStringConst message) {
KJ_REQUIRE(isCurrent(executor), message);
// KJ_REQUIRE(isCurrent(executor), message);
}

} // namespace kj_rs
105 changes: 105 additions & 0 deletions kj-rs/io/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
load("//tools/bazel:rust_cxx_bridge.bzl", "rust_cxx_bridge")
load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")

rust_cxx_bridge(
name = "bridge",
src = "lib.rs",
hdrs = ["bridge.h"],
include_prefix = "kj-rs/io",
deps = [
"//kj-rs",
"@capnp-cpp//src/kj:kj",
"@capnp-cpp//src/kj:kj-async",
"@workerd-cxx//:cxx",
],
)

cc_library(
name = "bridge_cpp",
srcs = ["bridge.c++"],
hdrs = ["bridge.h"],
include_prefix = "kj-rs/io",
deps = [
":bridge/include",
"@capnp-cpp//src/kj:kj",
"@capnp-cpp//src/kj:kj-async",
],
)

rust_library(
name = "io",
srcs = ["lib.rs"],
edition = "2024",
visibility = ["//visibility:public"],
proc_macro_deps = [
"@crates.io//:async-trait",
],
deps = [
":bridge_cpp",
"//kj-rs",
"@crates.io//:futures",
"@workerd-cxx//:cxx",
],
)

rust_test(
name = "io_test",
crate = ":io",
edition = "2024",
)

rust_cxx_bridge(
name = "tests_bridge",
src = "tests.rs",
hdrs = ["tests.h"],
include_prefix = "kj-rs/io",
deps = [
":bridge_cpp",
":io",
"@capnp-cpp//src/kj:kj",
"@capnp-cpp//src/kj:kj-async",
"@workerd-cxx//:cxx",
],
)

rust_library(
name = "io_tests",
srcs = ["tests.rs"],
edition = "2024",
visibility = ["//visibility:public"],
proc_macro_deps = [
"@crates.io//:async-trait",
],
deps = [
":io",
":tests_bridge",
"//kj-rs",
"@crates.io//:futures",
"@workerd-cxx//:cxx",
],
)

rust_test(
name = "io_tests_test",
crate = ":io_tests",
edition = "2024",
)

cc_test(
name = "io_cpp_tests",
size = "medium",
srcs = ["tests.c++", "tests.h"],
deps = [
":bridge",
":bridge_cpp",
":bridge/include",
":io_tests",
":tests_bridge",
":io",
"@capnp-cpp//src/kj:kj",
"@capnp-cpp//src/kj:kj-async",
"@capnp-cpp//src/kj:kj-test",
"@workerd-cxx//:cxx",
],
linkstatic = True,
)
Loading
Loading