Skip to content

Commit 07ac2f0

Browse files
rpc-binario streaming
1 parent b782394 commit 07ac2f0

File tree

11 files changed

+144
-38
lines changed

11 files changed

+144
-38
lines changed

crates/binario/src/lib.rs

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
11
//! rspc-binario: Binario support for rspc
2+
//!
3+
//! TODO:
4+
//! - Support for streaming the result. Right now we encode into a buffer.
5+
//! - `BinarioDeserializeError` should end up as a `ProcedureError::Deserialize` not `ProcedureError::Resolver`
6+
//! - Binario needs impl for `()` for procedures with no input.
7+
//! - Client integration
8+
//! - Cleanup HTTP endpoint on `example-binario`. Maybe don't use HTTP cause Axum's model doesn't mesh with Binario?
9+
//! - Maybe actix-web example to show portability. Might be interesting with the fact that Binario depends on `tokio::AsyncRead`.
10+
//!
211
#![forbid(unsafe_code)]
312
#![cfg_attr(docsrs, feature(doc_cfg))]
413
#![doc(
@@ -9,7 +18,7 @@
918
use std::{error, fmt, marker::PhantomData, pin::Pin};
1019

1120
use binario::{encode, Decode, Encode};
12-
use futures_util::{stream, Stream};
21+
use futures_util::{stream, Stream, StreamExt};
1322
use rspc::{
1423
middleware::Middleware, DynInput, ProcedureError, ProcedureStream, ResolverInput,
1524
ResolverOutput,
@@ -46,23 +55,65 @@ impl<T: Decode + Type + Send + 'static> ResolverInput for TypedBinarioInput<T> {
4655
}
4756
}
4857

49-
// TODO: Streaming instead of this
58+
// TODO: This should probs be a stream not a buffer.
59+
// Binario currently only supports `impl AsyncRead` not `impl Stream`
5060
pub struct BinarioOutput(pub Vec<u8>);
51-
pub struct TypedBinarioOutput<T>(pub T);
61+
pub struct TypedBinarioOutput<T, M>(pub T, pub PhantomData<fn() -> M>);
5262

53-
impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError>
54-
for TypedBinarioOutput<T>
63+
pub(crate) mod sealed {
64+
use super::*;
65+
66+
pub trait ValidBinarioOutput<M>: Send + 'static {
67+
type T: Encode + Send + Sync + 'static;
68+
fn data_type(types: &mut TypeCollection) -> DataType;
69+
fn into_stream(
70+
self,
71+
) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static;
72+
}
73+
pub enum ValueMarker {}
74+
impl<T: Encode + Type + Send + Sync + 'static> ValidBinarioOutput<ValueMarker> for T {
75+
type T = T;
76+
fn data_type(types: &mut TypeCollection) -> DataType {
77+
T::inline(types, Generics::Definition)
78+
}
79+
fn into_stream(
80+
self,
81+
) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static {
82+
stream::once(async move { Ok(self) })
83+
}
84+
}
85+
pub enum StreamMarker {}
86+
impl<S: Stream + Send + Sync + 'static> ValidBinarioOutput<StreamMarker> for rspc::Stream<S>
87+
where
88+
S::Item: Encode + Type + Send + Sync + 'static,
89+
{
90+
type T = S::Item;
91+
92+
fn data_type(types: &mut TypeCollection) -> DataType {
93+
S::Item::inline(types, Generics::Definition)
94+
}
95+
fn into_stream(
96+
self,
97+
) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static {
98+
self.0.map(|v| Ok(v))
99+
}
100+
}
101+
}
102+
103+
impl<TError, M: 'static, T: sealed::ValidBinarioOutput<M>> ResolverOutput<TError>
104+
for TypedBinarioOutput<T, M>
55105
{
56106
type T = BinarioOutput;
57107

58108
fn data_type(types: &mut TypeCollection) -> DataType {
59-
T::inline(types, Generics::Definition)
109+
T::data_type(types)
60110
}
61111

62112
fn into_stream(self) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static {
63-
stream::once(async move {
113+
// TODO: Encoding into a buffer is not how Binario is intended to work but it's how rspc needs it.
114+
self.0.into_stream().then(|v| async move {
64115
let mut buf = Vec::new();
65-
encode(&self.0, &mut buf).await.unwrap(); // TODO: Error handling
116+
encode(&v?, &mut buf).await.unwrap(); // TODO: Error handling
66117
Ok(BinarioOutput(buf))
67118
})
68119
}
@@ -74,11 +125,11 @@ impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError>
74125
}
75126
}
76127

77-
pub fn binario<TError, TCtx, TInput, TResult>() -> Middleware<
128+
pub fn binario<TError, TCtx, TInput, TResult, M>() -> Middleware<
78129
TError,
79130
TCtx,
80131
TypedBinarioInput<TInput>,
81-
TypedBinarioOutput<TResult>,
132+
TypedBinarioOutput<TResult, M>,
82133
TCtx,
83134
TInput,
84135
TResult,
@@ -87,7 +138,7 @@ where
87138
TError: From<DeserializeError> + Send + 'static,
88139
TCtx: Send + 'static,
89140
TInput: Decode + Send + 'static,
90-
TResult: Encode + Send + Sync + 'static,
141+
TResult: sealed::ValidBinarioOutput<M>,
91142
{
92143
Middleware::new(
93144
move |ctx: TCtx, input: TypedBinarioInput<TInput>, next| async move {
@@ -97,7 +148,9 @@ where
97148
}
98149
.map_err(DeserializeError)?;
99150

100-
next.exec(ctx, input).await.map(TypedBinarioOutput)
151+
next.exec(ctx, input)
152+
.await
153+
.map(|v| TypedBinarioOutput(v, PhantomData))
101154
},
102155
)
103156
}

crates/procedure/src/dyn_input.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ enum Repr<'a, 'de> {
2121
}
2222

2323
impl<'a, 'de> DynInput<'a, 'de> {
24+
// TODO: Explain invariant on `Option` + enforce it
2425
pub fn new_value<T: Send + 'static>(value: &'a mut Option<T>) -> Self {
2526
Self {
2627
inner: Repr::Value(value),

examples/binario/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ tower-http = { version = "0.6.2", default-features = false, features = [
1515
] }
1616
futures = "0.3"
1717
form_urlencoded = "1.2.1"
18-
axum-extra = { version = "0.9.6", features = ["async-read-body"] }
1918
tokio-util = { version = "0.7.13", features = ["compat"] }
2019
binario = "0.0.2"
2120
pin-project = "1.1.7"

examples/binario/client.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,26 @@
2121
5, 0, 0, 0, 0, 0, 0, 0, 79, 115, 99, 97, 114,
2222
]);
2323
if (!isEqualBytes(result, expected))
24-
throw new Error("Result doesn't match expected value");
24+
throw new Error(`Result doesn't match expected value. Got ${result}`);
2525

2626
console.log("Success!", result);
27+
28+
const resp2 = await fetch(
29+
"http://localhost:4000/rspc/binario?procedure=streaming",
30+
{
31+
method: "POST",
32+
headers: {
33+
"Content-Type": "text/x-binario",
34+
},
35+
// { name: "Oscar" }
36+
body: new Uint8Array([5, 0, 0, 0, 0, 0, 0, 0, 79, 115, 99, 97, 114]),
37+
},
38+
);
39+
if (!resp2.ok) throw new Error(`Failed to fetch ${resp2.status}`);
40+
if (resp2.headers.get("content-type") !== "text/x-binario")
41+
throw new Error("Invalid content type");
42+
43+
console.log(await resp2.arrayBuffer());
2744
})();
2845

2946
function isEqualBytes(bytes1: Uint8Array, bytes2: Uint8Array): boolean {

examples/binario/src/main.rs

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,29 @@ impl<TErr> BaseProcedure<TErr> {
4242
}
4343
}
4444

45+
#[derive(Debug, Clone, binario::Encode, binario::Decode, Type)]
46+
pub struct Input {
47+
name: String,
48+
}
49+
4550
pub fn mount() -> rspc::Router<()> {
46-
rspc::Router::new().procedure("binario", {
47-
#[derive(Debug, binario::Encode, binario::Decode, Type)]
48-
pub struct Input {
49-
name: String,
50-
}
51-
52-
<BaseProcedure>::builder()
53-
.with(rspc_binario::binario())
54-
.query(|_, input: Input| async move { Ok(input) })
55-
})
51+
rspc::Router::new()
52+
.procedure("binario", {
53+
<BaseProcedure>::builder()
54+
.with(rspc_binario::binario())
55+
.query(|_, input: Input| async move { Ok(input) })
56+
})
57+
.procedure("streaming", {
58+
<BaseProcedure>::builder()
59+
.with(rspc_binario::binario())
60+
.query(|_, input: Input| async move {
61+
Ok(rspc::Stream(futures::stream::iter([
62+
input.clone(),
63+
input.clone(),
64+
input,
65+
])))
66+
})
67+
})
5668
}
5769

5870
#[tokio::main]
@@ -104,21 +116,37 @@ pub fn rspc_binario_handler(procedures: Procedures<()>) -> axum::Router {
104116
ctx.clone(),
105117
rspc_binario::BinarioInput::from_stream(
106118
body.into_data_stream()
107-
.map_err(|err| todo!()) // TODO: Error handling
119+
.map_err(|_err| todo!()) // TODO: Error handling
108120
.into_async_read()
109121
.compat(),
110122
),
111123
);
112124
let mut headers = HeaderMap::new();
113125
headers.insert(header::CONTENT_TYPE, "text/x-binario".parse().unwrap());
114126

127+
let mut first = true;
115128
(
116129
headers,
117-
Body::from_stream(stream.map(|v| match v {
118-
Ok(v) => Ok(Ok::<_, Infallible>(
119-
v.as_value::<BinarioOutput>().unwrap().0,
120-
)),
121-
Err(err) => todo!("{err:?}"),
130+
Body::from_stream(stream.map(move |v| {
131+
let buf = match v {
132+
Ok(v) => Ok(Ok::<_, Infallible>(
133+
v.as_value::<BinarioOutput>().unwrap().0,
134+
)),
135+
Err(err) => todo!("{err:?}"),
136+
};
137+
138+
if first {
139+
first = false;
140+
buf
141+
} else {
142+
buf.map(|v| {
143+
v.map(|mut v| {
144+
let mut buf = vec!['\n' as u8, '\n' as u8];
145+
buf.append(&mut v);
146+
buf
147+
})
148+
})
149+
}
122150
})),
123151
)
124152
}

examples/bindings.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export type Procedures = {
1111
newstuffser: { kind: "query", input: any, output: any, error: any },
1212
sfmPost: { kind: "query", input: any, output: any, error: any },
1313
sfmPostEdit: { kind: "query", input: any, output: any, error: any },
14+
streamInStreamInStreamInStream: { kind: "query", input: any, output: any, error: any },
1415
validator: { kind: "query", input: any, output: any, error: any },
1516
withoutBaseProcedure: { kind: "query", input: any, output: any, error: any },
1617
}

examples/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ async-stream = "0.3.6"
2020
thiserror = "2.0.9"
2121
validator = { version = "0.19.0", features = ["derive"] }
2222
anyhow = "1.0.95"
23+
futures.workspace = true

examples/core/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ pub fn mount() -> Router<Ctx> {
197197
.procedure("me", {
198198
<BaseProcedure>::builder().query(|ctx, _: ()| async move { Ok(ctx.zer.session()?) })
199199
})
200+
.procedure("streamInStreamInStreamInStream", {
201+
// You would never actually do this but it's just checking how the system behaves
202+
<BaseProcedure>::builder().query(|_, _: ()| async move {
203+
Ok(rspc::Stream(rspc::Stream(rspc::Stream(
204+
futures::stream::once(async move { Ok(42) }),
205+
))))
206+
})
207+
})
200208

201209
// .procedure("fileupload", {
202210
// <BaseProcedure>::builder().query(|_, _: File| async { Ok(env!("CARGO_PKG_VERSION")) })

rspc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//!
77
//! Checkout the official docs at <https://rspc.dev>. This documentation is generally written **for authors of middleware and adapter**.
88
//!
9-
#![forbid(unsafe_code)]
9+
// #![forbid(unsafe_code)] // TODO
1010
#![cfg_attr(docsrs, feature(doc_cfg))]
1111
#![doc(
1212
html_logo_url = "https://github.com/specta-rs/rspc/raw/main/.github/logo.png",

rspc/src/procedure/resolver_output.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ where
8181
TErr: Error,
8282
S: Stream<Item = Result<T, TErr>> + Send + 'static,
8383
T: ResolverOutput<TErr>,
84-
// Should prevent nesting `Stream`s
85-
T::T: Serialize + Send + Sync + 'static,
8684
{
8785
type T = T::T;
8886

@@ -100,6 +98,6 @@ where
10098
fn into_procedure_stream(
10199
stream: impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static,
102100
) -> ProcedureStream {
103-
ProcedureStream::from_stream(stream)
101+
T::into_procedure_stream(stream)
104102
}
105103
}

0 commit comments

Comments
 (0)