Skip to content

Commit 73b9de4

Browse files
committed
rust wrappers
1 parent 6ff6b51 commit 73b9de4

File tree

1 file changed

+208
-7
lines changed

1 file changed

+208
-7
lines changed

kj-rs/io/lib.rs

Lines changed: 208 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -461,16 +461,105 @@ impl<T: AsyncOutputStream + Unpin> futures::io::AsyncWrite for AsyncWriteAdapter
461461
#[allow(clippy::needless_lifetimes)]
462462
pub mod ffi {
463463

464+
// Rust opaque types that can be used from C++
465+
extern "Rust" {
466+
/// Opaque Rust type implementing AsyncInputStream trait
467+
type RustAsyncInputStream;
468+
469+
/// Opaque Rust type implementing AsyncOutputStream trait
470+
type RustAsyncOutputStream;
471+
472+
/// Opaque Rust type implementing AsyncIoStream trait
473+
type RustAsyncIoStream;
474+
475+
// RustAsyncInputStream methods
476+
async unsafe fn try_read<'a>(
477+
self: &'a mut RustAsyncInputStream,
478+
buffer: &'a mut [u8],
479+
min_bytes: usize,
480+
) -> Result<usize>;
481+
482+
fn try_get_length(self: &RustAsyncInputStream) -> u64; // Return 0 if unknown
483+
484+
async unsafe fn pump_to<'a>(
485+
self: &'a mut RustAsyncInputStream,
486+
output: &'a mut RustAsyncOutputStream,
487+
amount: u64,
488+
) -> Result<u64>;
489+
490+
// RustAsyncOutputStream methods
491+
async unsafe fn write<'a>(
492+
self: &'a mut RustAsyncOutputStream,
493+
buffer: &'a [u8]
494+
) -> Result<()>;
495+
496+
async unsafe fn write_vectored<'a>(
497+
self: &'a mut RustAsyncOutputStream,
498+
pieces: &'a [&'a [u8]],
499+
) -> Result<()>;
500+
501+
async unsafe fn try_pump_from<'a>(
502+
self: &'a mut RustAsyncOutputStream,
503+
input: &'a mut RustAsyncInputStream,
504+
amount: u64,
505+
) -> Result<u64>; // Returns 0 if not supported
506+
507+
async unsafe fn when_write_disconnected<'a>(
508+
self: &'a mut RustAsyncOutputStream
509+
) -> Result<()>;
510+
511+
// RustAsyncIoStream methods - inherited from AsyncInputStream
512+
async unsafe fn try_read<'a>(
513+
self: &'a mut RustAsyncIoStream,
514+
buffer: &'a mut [u8],
515+
min_bytes: usize,
516+
) -> Result<usize>;
517+
518+
fn try_get_length(self: &RustAsyncIoStream) -> u64;
519+
520+
async unsafe fn pump_to<'a>(
521+
self: &'a mut RustAsyncIoStream,
522+
output: &'a mut RustAsyncOutputStream,
523+
amount: u64,
524+
) -> Result<u64>;
525+
526+
// RustAsyncIoStream methods - inherited from AsyncOutputStream
527+
async unsafe fn write<'a>(
528+
self: &'a mut RustAsyncIoStream,
529+
buffer: &'a [u8]
530+
) -> Result<()>;
531+
532+
async unsafe fn write_vectored<'a>(
533+
self: &'a mut RustAsyncIoStream,
534+
pieces: &'a [&'a [u8]],
535+
) -> Result<()>;
536+
537+
async unsafe fn try_pump_from<'a>(
538+
self: &'a mut RustAsyncIoStream,
539+
input: &'a mut RustAsyncInputStream,
540+
amount: u64,
541+
) -> Result<u64>;
542+
543+
async unsafe fn when_write_disconnected<'a>(
544+
self: &'a mut RustAsyncIoStream
545+
) -> Result<()>;
546+
547+
// RustAsyncIoStream methods - specific to IoStream
548+
async unsafe fn shutdown_write<'a>(self: &'a mut RustAsyncIoStream) -> Result<()>;
549+
550+
fn abort_read(self: &mut RustAsyncIoStream);
551+
}
552+
464553
unsafe extern "C++" {
465554
include!("kj-rs/io/bridge.h");
466555

467-
/// Opaque C++ type representing CxxAsyncInputStream
556+
/// Opaque C++ type representing kj::AsyncInputStream
468557
type CxxAsyncInputStream;
469558

470-
/// Opaque C++ type representing CxxAsyncOutputStream
559+
/// Opaque C++ type representing kj::AsyncOutputStream
471560
type CxxAsyncOutputStream;
472561

473-
/// Opaque C++ type representing CxxAsyncIoStream
562+
/// Opaque C++ type representing kj::AsyncIoStream
474563
type CxxAsyncIoStream;
475564

476565
// CxxAsyncInputStream methods
@@ -544,12 +633,8 @@ pub mod ffi {
544633

545634
fn abort_read(self: Pin<&mut CxxAsyncIoStream>);
546635
}
547-
548-
impl UniquePtr<CxxAsyncInputStream> {}
549636
}
550637

551-
// Rust wrapper types that implement the AsyncInputStream, AsyncOutputStream, and AsyncIoStream traits
552-
553638
/// Helper function to convert `cxx::Exception` to `std::io::Error`
554639
#[must_use]
555640
#[allow(clippy::needless_pass_by_value)]
@@ -725,3 +810,119 @@ impl<'a> AsyncIoStream for CxxAsyncIoStream<'a> {
725810
self.inner.as_mut().abort_read();
726811
}
727812
}
813+
814+
// Rust opaque types for use from C++
815+
816+
/// Opaque Rust type that can hold any AsyncInputStream implementation
817+
pub struct RustAsyncInputStream {
818+
inner: Box<dyn AsyncInputStream + Send>,
819+
}
820+
821+
impl RustAsyncInputStream {
822+
pub fn new<T: AsyncInputStream + Send + 'static>(stream: T) -> Self {
823+
Self {
824+
inner: Box::new(stream),
825+
}
826+
}
827+
828+
// FFI method implementations
829+
pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result<usize> {
830+
self.inner.try_read(buffer, min_bytes).await
831+
}
832+
833+
pub fn try_get_length(&self) -> u64 {
834+
self.inner.try_get_length().unwrap_or(0)
835+
}
836+
837+
pub async fn pump_to(&mut self, output: &mut RustAsyncOutputStream, amount: u64) -> Result<u64> {
838+
self.inner.pump_to(&mut *output.inner, amount).await
839+
}
840+
}
841+
842+
/// Opaque Rust type that can hold any AsyncOutputStream implementation
843+
pub struct RustAsyncOutputStream {
844+
inner: Box<dyn AsyncOutputStream + Send>,
845+
}
846+
847+
impl RustAsyncOutputStream {
848+
pub fn new<T: AsyncOutputStream + Send + 'static>(stream: T) -> Self {
849+
Self {
850+
inner: Box::new(stream),
851+
}
852+
}
853+
854+
// FFI method implementations
855+
pub async fn write(&mut self, buffer: &[u8]) -> Result<()> {
856+
self.inner.write(buffer).await
857+
}
858+
859+
pub async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> {
860+
self.inner.write_vectored(pieces).await
861+
}
862+
863+
pub async fn try_pump_from(&mut self, input: &mut RustAsyncInputStream, amount: u64) -> Result<u64> {
864+
match self.inner.try_pump_from(&mut *input.inner, amount).await {
865+
Some(result) => result,
866+
None => Ok(0), // Return 0 to indicate no optimization available
867+
}
868+
}
869+
870+
pub async fn when_write_disconnected(&mut self) -> Result<()> {
871+
self.inner.when_write_disconnected().await
872+
}
873+
}
874+
875+
/// Opaque Rust type that can hold any AsyncIoStream implementation
876+
pub struct RustAsyncIoStream {
877+
inner: Box<dyn AsyncIoStream + Send>,
878+
}
879+
880+
impl RustAsyncIoStream {
881+
pub fn new<T: AsyncIoStream + Send + 'static>(stream: T) -> Self {
882+
Self {
883+
inner: Box::new(stream),
884+
}
885+
}
886+
887+
// FFI method implementations - AsyncInputStream part
888+
pub async fn try_read(&mut self, buffer: &mut [u8], min_bytes: usize) -> Result<usize> {
889+
self.inner.try_read(buffer, min_bytes).await
890+
}
891+
892+
pub fn try_get_length(&self) -> u64 {
893+
self.inner.try_get_length().unwrap_or(0)
894+
}
895+
896+
pub async fn pump_to(&mut self, output: &mut RustAsyncOutputStream, amount: u64) -> Result<u64> {
897+
self.inner.pump_to(&mut *output.inner, amount).await
898+
}
899+
900+
// FFI method implementations - AsyncOutputStream part
901+
pub async fn write(&mut self, buffer: &[u8]) -> Result<()> {
902+
self.inner.write(buffer).await
903+
}
904+
905+
pub async fn write_vectored(&mut self, pieces: &[&[u8]]) -> Result<()> {
906+
self.inner.write_vectored(pieces).await
907+
}
908+
909+
pub async fn try_pump_from(&mut self, input: &mut RustAsyncInputStream, amount: u64) -> Result<u64> {
910+
match self.inner.try_pump_from(&mut *input.inner, amount).await {
911+
Some(result) => result,
912+
None => Ok(0), // Return 0 to indicate no optimization available
913+
}
914+
}
915+
916+
pub async fn when_write_disconnected(&mut self) -> Result<()> {
917+
self.inner.when_write_disconnected().await
918+
}
919+
920+
// FFI method implementations - AsyncIoStream specific
921+
pub async fn shutdown_write(&mut self) -> Result<()> {
922+
self.inner.shutdown_write().await
923+
}
924+
925+
pub fn abort_read(&mut self) {
926+
self.inner.abort_read();
927+
}
928+
}

0 commit comments

Comments
 (0)