Skip to content

Commit 322c8dc

Browse files
rspc-binario prototype
1 parent 6081d0e commit 322c8dc

File tree

12 files changed

+254
-56
lines changed

12 files changed

+254
-56
lines changed

crates/binario/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ edition = "2021"
66
publish = false # TODO: Crate metadata & publish
77

88
[dependencies]
9+
binario = "0.0.2"
10+
futures.workspace = true # TODO: Drop this or drop down to `futures-util`
911
rspc = { path = "../../rspc" }
10-
# rspc-procedure = { path = "../procedure" }
1112
specta = { workspace = true }
13+
tokio = "1.42.0"
1214

1315
# /bin/sh RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
1416
[package.metadata."docs.rs"]

crates/binario/client.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// TODO: This is not stable, just a demonstration of how it could work.
2+
3+
(async () => {
4+
const resp = await fetch(
5+
"http://localhost:4000/rspc/binario?procedure=binario",
6+
{
7+
method: "POST",
8+
headers: {
9+
"Content-Type": "text/x-binario",
10+
},
11+
// { name: "Oscar" }
12+
body: new Uint8Array([5, 0, 0, 0, 0, 0, 0, 0, 79, 115, 99, 97, 114]),
13+
},
14+
);
15+
16+
console.log(resp.status, await resp.clone().arrayBuffer());
17+
})();

crates/binario/src/lib.rs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,92 @@
22
#![forbid(unsafe_code)]
33
#![cfg_attr(docsrs, feature(doc_cfg))]
44
#![doc(
5-
html_logo_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png",
6-
html_favicon_url = "https://github.com/oscartbeaumont/rspc/raw/main/docs/public/logo.png"
5+
html_logo_url = "https://github.com/specta-rs/rspc/blob/main/.github/logo.png?raw=true",
6+
html_favicon_url = "https://github.com/specta-rs/rspc/blob/main/.github/logo.png?raw=true"
77
)]
8+
9+
use std::pin::Pin;
10+
11+
use binario::{encode, Decode, Encode};
12+
use futures::{executor::block_on, Stream};
13+
use rspc::{
14+
middleware::Middleware, DynInput, ProcedureError, ProcedureStream, ResolverInput,
15+
ResolverOutput,
16+
};
17+
use specta::{datatype::DataType, Generics, Type, TypeCollection};
18+
use tokio::io::AsyncRead;
19+
20+
enum Repr {
21+
Bytes(Vec<u8>),
22+
Stream(Pin<Box<dyn AsyncRead + Send>>),
23+
}
24+
25+
pub struct BinarioInput(Repr);
26+
27+
impl BinarioInput {
28+
pub fn from_bytes(bytes: Vec<u8>) -> Self {
29+
Self(Repr::Bytes(bytes))
30+
}
31+
32+
pub fn from_stream(stream: impl AsyncRead + Send + 'static) -> Self {
33+
Self(Repr::Stream(Box::pin(stream)))
34+
}
35+
}
36+
37+
pub struct Binario<T>(pub T);
38+
39+
impl<T: Decode + Type + Send + 'static> ResolverInput for Binario<T> {
40+
fn data_type(types: &mut TypeCollection) -> DataType {
41+
T::inline(types, Generics::Definition)
42+
}
43+
44+
fn from_input(input: DynInput) -> Result<Self, ProcedureError> {
45+
let stream: BinarioInput = input.value()?;
46+
47+
// TODO: `block_on` bad
48+
match stream.0 {
49+
Repr::Bytes(bytes) => block_on(binario::decode::<T, _>(bytes.as_slice())),
50+
Repr::Stream(stream) => block_on(binario::decode::<T, _>(stream)),
51+
}
52+
.map_err(|err| panic!("{err:?}")) // TODO: Error handling
53+
.map(Self)
54+
}
55+
}
56+
57+
// TODO: Streaming instead of this
58+
pub struct BinarioOutput(pub Vec<u8>);
59+
60+
impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError> for Binario<T> {
61+
type T = BinarioOutput;
62+
63+
fn data_type(types: &mut TypeCollection) -> DataType {
64+
T::inline(types, Generics::Definition)
65+
}
66+
67+
fn into_stream(self) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static {
68+
futures::stream::once(async move {
69+
let mut buf = Vec::new();
70+
encode(&self.0, &mut buf).await.unwrap(); // TODO: Error handling
71+
Ok(BinarioOutput(buf))
72+
})
73+
}
74+
75+
fn into_procedure_stream(
76+
stream: impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static,
77+
) -> ProcedureStream {
78+
ProcedureStream::from_stream_value(stream)
79+
}
80+
}
81+
82+
pub fn binario<TError, TCtx, TInput, TResult>(
83+
) -> Middleware<TError, TCtx, Binario<TInput>, Binario<TResult>, TCtx, TInput, TResult>
84+
where
85+
TError: Send + 'static,
86+
TCtx: Send + 'static,
87+
TInput: Decode + Send + 'static,
88+
TResult: Encode + Send + Sync + 'static,
89+
{
90+
Middleware::new(move |ctx: TCtx, input: Binario<TInput>, next| async move {
91+
next.exec(ctx, input.0).await.map(Binario)
92+
})
93+
}

crates/procedure/src/dyn_output.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ enum Repr<'a> {
2121
// TODO: `Debug`, etc traits
2222

2323
impl<'a> DynOutput<'a> {
24-
pub fn new_value<T: Send + 'static>(value: &'a mut Option<T>) -> Self {
24+
// TODO: We depend on the type of `T` can we either force it so this can be public?
25+
pub(crate) fn new_value<T: Send + 'static>(value: &'a mut T) -> Self {
2526
Self {
2627
inner: Repr::Value(value),
2728
type_name: type_name::<T>(),
@@ -48,10 +49,10 @@ impl<'a> DynOutput<'a> {
4849
match self.inner {
4950
Repr::Serialize(_) => None,
5051
Repr::Value(v) => v
51-
.downcast_mut::<Option<Result<_, ProcedureError>>>()?
52+
.downcast_mut::<Option<Result<T, ProcedureError>>>()?
5253
.take()
5354
.expect("unreachable")
54-
.expect("unreachable"),
55+
.ok(),
5556
}
5657
}
5758
}

crates/procedure/src/stream.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -469,25 +469,20 @@ impl ProcedureStream {
469469

470470
/// TODO
471471
// TODO: Should error be `String` type?
472-
pub fn map<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String> + Unpin, T>(
472+
pub fn map<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String>, T>(
473473
self,
474474
map: F,
475475
) -> ProcedureStreamMap<F, T> {
476476
ProcedureStreamMap { stream: self, map }
477477
}
478478
}
479479

480-
pub struct ProcedureStreamMap<
481-
F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String> + Unpin,
482-
T,
483-
> {
480+
pub struct ProcedureStreamMap<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String>, T> {
484481
stream: ProcedureStream,
485482
map: F,
486483
}
487484

488-
impl<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String> + Unpin, T>
489-
ProcedureStreamMap<F, T>
490-
{
485+
impl<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String>, T> ProcedureStreamMap<F, T> {
491486
/// Start streaming data.
492487
/// Refer to `Self::require_manual_stream` for more information.
493488
pub fn stream(&mut self) {
@@ -509,6 +504,7 @@ impl<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String> + Unpin, T
509504
}
510505
}
511506

507+
// TODO: Drop `Unpin` requirement
512508
impl<F: FnMut(Result<DynOutput, ProcedureError>) -> Result<T, String> + Unpin, T> Stream
513509
for ProcedureStreamMap<F, T>
514510
{

crates/zer/src/lib.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::{
1616

1717
use cookie::Cookie;
1818
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
19+
use rspc::{ProcedureError, ResolverError};
1920
use serde::{de::DeserializeOwned, ser::SerializeStruct, Serialize};
2021
use specta::Type;
2122

@@ -68,7 +69,7 @@ impl<S: Serialize + DeserializeOwned> Zer<S> {
6869
cookie_name: impl Into<Cow<'static, str>>,
6970
secret: &[u8],
7071
cookie: Option<impl AsRef<[u8]>>,
71-
) -> (Self, ZerResponse) {
72+
) -> Result<(Self, ZerResponse), ProcedureError> {
7273
let mut cookies = vec![];
7374
if let Some(cookie) = cookie {
7475
// TODO: Error handling
@@ -79,7 +80,13 @@ impl<S: Serialize + DeserializeOwned> Zer<S> {
7980

8081
let resp_cookies = ResponseCookie::default();
8182

82-
(
83+
// TODO: Being the `ResolverError` makes this not typesafe. We probally need a separate variant.
84+
// return Err(ProcedureError::Resolver(ResolverError::new(
85+
// "zer says bruh",
86+
// None::<std::io::Error>,
87+
// )));
88+
89+
Ok((
8390
Self {
8491
cookie_name: cookie_name.into(),
8592
key: EncodingKey::from_secret(secret),
@@ -91,7 +98,12 @@ impl<S: Serialize + DeserializeOwned> Zer<S> {
9198
ZerResponse {
9299
cookies: resp_cookies,
93100
},
94-
)
101+
))
102+
}
103+
104+
// TODO: Allow one which errors when accessing any of the methods?
105+
pub fn noop() -> Self {
106+
todo!();
95107
}
96108

97109
pub fn session(&self) -> Result<S, UnauthorizedError> {

examples/axum/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,9 @@ serde_json = "1.0.134"
2121
rspc-http = { version = "0.2.1", path = "../../integrations/http" }
2222
streamunordered = "0.5.4"
2323
rspc-zer = { version = "0.0.0", path = "../../crates/zer" }
24+
serde.workspace = true
25+
binario = "0.0.2"
26+
rspc-binario = { version = "0.0.0", path = "../../crates/binario" }
27+
form_urlencoded = "1.2.1"
28+
axum-extra = { version = "0.9.6", features = ["async-read-body"] }
29+
tokio-util = { version = "0.7.13", features = ["compat"] }

examples/axum/src/main.rs

Lines changed: 78 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use axum::{
55
routing::{get, on, post, MethodFilter, MethodRouter},
66
Json,
77
};
8+
use axum_extra::body::AsyncReadBody;
89
use example_core::{mount, Ctx};
9-
use futures::{stream::FuturesUnordered, Stream, StreamExt};
10+
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryStreamExt};
1011
use rspc::{DynOutput, ProcedureError, ProcedureStream, ProcedureStreamMap, Procedures, State};
12+
use rspc_binario::BinarioOutput;
1113
use rspc_invalidation::Invalidator;
1214
use serde_json::{de::SliceRead, value::RawValue, Value};
1315
use std::{
@@ -18,6 +20,7 @@ use std::{
1820
task::{Context, Poll},
1921
};
2022
use streamunordered::{StreamUnordered, StreamYield};
23+
use tokio_util::compat::FuturesAsyncReadCompatExt;
2124
use tower_http::cors::{Any, CorsLayer};
2225

2326
#[tokio::main]
@@ -112,8 +115,64 @@ async fn main() {
112115

113116
pub fn rspc_handler(procedures: Procedures<Ctx>) -> axum::Router {
114117
let mut r = axum::Router::new();
118+
119+
r = r.route(
120+
"/binario",
121+
// This endpoint lacks batching, SFM's, etc but that's fine.
122+
post({
123+
let procedures = procedures.clone();
124+
125+
move |parts: Parts, body: Body| async move {
126+
let invalidator = rspc_invalidation::Invalidator::default();
127+
let (zer, _zer_response) = rspc_zer::Zer::from_request(
128+
"session",
129+
"some_secret".as_ref(),
130+
parts.headers.get("cookie"),
131+
)
132+
.unwrap(); // TODO: Error handling
133+
let ctx = Ctx {
134+
invalidator: invalidator.clone(),
135+
zer,
136+
};
137+
138+
// if parts.headers.get("Content-Type") != Some(&"text/x-binario".parse().unwrap()) {
139+
// // TODO: Error handling
140+
// }
141+
142+
let mut params = form_urlencoded::parse(parts.uri.query().unwrap_or("").as_bytes());
143+
let procedure_name = params
144+
.find(|(key, _)| key == "procedure")
145+
.map(|(_, value)| value)
146+
.unwrap(); // TODO: Error handling
147+
148+
let procedure = procedures.get(&procedure_name).unwrap(); // TODO: Error handling
149+
150+
let stream = procedure.exec_with_value(
151+
ctx.clone(),
152+
rspc_binario::BinarioInput::from_stream(
153+
body.into_data_stream()
154+
.map_err(|err| todo!()) // TODO: Error handling
155+
.into_async_read()
156+
.compat(),
157+
),
158+
);
159+
let mut headers = HeaderMap::new();
160+
headers.insert(header::CONTENT_TYPE, "text/x-binario".parse().unwrap());
161+
162+
(
163+
headers,
164+
Body::from_stream(stream.map(|v| match v {
165+
Ok(v) => Ok(Ok::<_, Infallible>(
166+
v.as_value::<BinarioOutput>().unwrap().0,
167+
)),
168+
Err(err) => todo!("{err:?}"),
169+
})),
170+
)
171+
}
172+
}),
173+
);
174+
115175
// TODO: Support file upload and download
116-
// TODO: `rspc_zer` how worky?
117176

118177
// for (key, procedure) in procedures.clone() {
119178
// r = r.route(
@@ -202,7 +261,8 @@ pub fn rspc_handler(procedures: Procedures<Ctx>) -> axum::Router {
202261
"session",
203262
"some_secret".as_ref(),
204263
parts.headers.get("cookie"),
205-
);
264+
)
265+
.unwrap(); // TODO: Error handling
206266
let ctx = Ctx {
207267
invalidator: invalidator.clone(),
208268
zer,
@@ -229,27 +289,25 @@ pub fn rspc_handler(procedures: Procedures<Ctx>) -> axum::Router {
229289
while let Some(field) = multipart.next_field().await.unwrap() {
230290
let name = field.name().unwrap().to_string(); // TODO: Error handling
231291

232-
// field.headers()
233-
234-
// TODO: Don't use `serde_json::Value`
235-
let input: Value = match field.content_type() {
236-
// TODO:
237-
// Some("application/json") => {
238-
// // TODO: Error handling
239-
// serde_json::from_slice(field.bytes().await.unwrap().as_ref()).unwrap()
240-
// }
241-
// Some(_) => todo!(),
242-
// None => todo!(),
243-
_ => serde_json::from_slice(field.bytes().await.unwrap().as_ref()).unwrap(),
244-
};
245-
246292
let procedure = procedures.get(&*name).unwrap();
247-
// println!("{:?} {:?} {:?}", name, input, procedure);
248293

294+
// TODO: Error handling
249295
spawn(
250296
&mut runtime,
251-
procedure.exec_with_deserializer(ctx.clone(), input),
252-
);
297+
match field.content_type() {
298+
// Some("text/x-binario") => procedure.exec_with_value(
299+
// ctx.clone(),
300+
// // TODO: Stream decoding is pretty rough with multipart so we omit it for now.
301+
// rspc_binario::BinarioStream(field.bytes().await.unwrap().to_vec()),
302+
// ),
303+
_ => procedure.exec_with_deserializer(
304+
ctx.clone(),
305+
// TODO: Don't use `serde_json::Value`
306+
serde_json::from_slice::<Value>(field.bytes().await.unwrap().as_ref())
307+
.unwrap(),
308+
),
309+
},
310+
)
253311
}
254312

255313
// TODO: Move onto `Prototype`???

0 commit comments

Comments
 (0)