Skip to content

Commit b782394

Browse files
example-binario + remove block_on decoding input
1 parent 322c8dc commit b782394

File tree

18 files changed

+274
-212
lines changed

18 files changed

+274
-212
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ members = [
1111
"./examples/tauri/src-tauri",
1212
"./examples/legacy",
1313
"./examples/legacy-compat",
14+
"./examples/binario",
1415
]
1516

1617
[workspace.dependencies]

crates/binario/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ publish = false # TODO: Crate metadata & publish
77

88
[dependencies]
99
binario = "0.0.2"
10-
futures.workspace = true # TODO: Drop this or drop down to `futures-util`
10+
futures-util.workspace = true
1111
rspc = { path = "../../rspc" }
1212
specta = { workspace = true }
1313
tokio = "1.42.0"

crates/binario/client.ts

Lines changed: 0 additions & 17 deletions
This file was deleted.

crates/binario/src/lib.rs

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
html_favicon_url = "https://github.com/specta-rs/rspc/blob/main/.github/logo.png?raw=true"
77
)]
88

9-
use std::pin::Pin;
9+
use std::{error, fmt, marker::PhantomData, pin::Pin};
1010

1111
use binario::{encode, Decode, Encode};
12-
use futures::{executor::block_on, Stream};
12+
use futures_util::{stream, Stream};
1313
use rspc::{
1414
middleware::Middleware, DynInput, ProcedureError, ProcedureStream, ResolverInput,
1515
ResolverOutput,
@@ -34,38 +34,33 @@ impl BinarioInput {
3434
}
3535
}
3636

37-
pub struct Binario<T>(pub T);
37+
pub struct TypedBinarioInput<T>(pub BinarioInput, pub PhantomData<T>);
3838

39-
impl<T: Decode + Type + Send + 'static> ResolverInput for Binario<T> {
39+
impl<T: Decode + Type + Send + 'static> ResolverInput for TypedBinarioInput<T> {
4040
fn data_type(types: &mut TypeCollection) -> DataType {
4141
T::inline(types, Generics::Definition)
4242
}
4343

4444
fn from_input(input: DynInput) -> Result<Self, ProcedureError> {
45-
let stream: BinarioInput = input.value()?;
46-
47-
// TODO: `block_on` bad
48-
match stream.0 {
49-
Repr::Bytes(bytes) => block_on(binario::decode::<T, _>(bytes.as_slice())),
50-
Repr::Stream(stream) => block_on(binario::decode::<T, _>(stream)),
51-
}
52-
.map_err(|err| panic!("{err:?}")) // TODO: Error handling
53-
.map(Self)
45+
Ok(Self(input.value()?, PhantomData))
5446
}
5547
}
5648

5749
// TODO: Streaming instead of this
5850
pub struct BinarioOutput(pub Vec<u8>);
51+
pub struct TypedBinarioOutput<T>(pub T);
5952

60-
impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError> for Binario<T> {
53+
impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError>
54+
for TypedBinarioOutput<T>
55+
{
6156
type T = BinarioOutput;
6257

6358
fn data_type(types: &mut TypeCollection) -> DataType {
6459
T::inline(types, Generics::Definition)
6560
}
6661

6762
fn into_stream(self) -> impl Stream<Item = Result<Self::T, ProcedureError>> + Send + 'static {
68-
futures::stream::once(async move {
63+
stream::once(async move {
6964
let mut buf = Vec::new();
7065
encode(&self.0, &mut buf).await.unwrap(); // TODO: Error handling
7166
Ok(BinarioOutput(buf))
@@ -79,15 +74,46 @@ impl<TError, T: Encode + Type + Send + Sync + 'static> ResolverOutput<TError> fo
7974
}
8075
}
8176

82-
pub fn binario<TError, TCtx, TInput, TResult>(
83-
) -> Middleware<TError, TCtx, Binario<TInput>, Binario<TResult>, TCtx, TInput, TResult>
77+
pub fn binario<TError, TCtx, TInput, TResult>() -> Middleware<
78+
TError,
79+
TCtx,
80+
TypedBinarioInput<TInput>,
81+
TypedBinarioOutput<TResult>,
82+
TCtx,
83+
TInput,
84+
TResult,
85+
>
8486
where
85-
TError: Send + 'static,
87+
TError: From<DeserializeError> + Send + 'static,
8688
TCtx: Send + 'static,
8789
TInput: Decode + Send + 'static,
8890
TResult: Encode + Send + Sync + 'static,
8991
{
90-
Middleware::new(move |ctx: TCtx, input: Binario<TInput>, next| async move {
91-
next.exec(ctx, input.0).await.map(Binario)
92-
})
92+
Middleware::new(
93+
move |ctx: TCtx, input: TypedBinarioInput<TInput>, next| async move {
94+
let input = match input.0 .0 {
95+
Repr::Bytes(bytes) => binario::decode::<TInput, _>(bytes.as_slice()).await,
96+
Repr::Stream(stream) => binario::decode::<TInput, _>(stream).await,
97+
}
98+
.map_err(DeserializeError)?;
99+
100+
next.exec(ctx, input).await.map(TypedBinarioOutput)
101+
},
102+
)
103+
}
104+
105+
pub struct DeserializeError(pub std::io::Error);
106+
107+
impl fmt::Debug for DeserializeError {
108+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109+
self.0.fmt(f)
110+
}
93111
}
112+
113+
impl fmt::Display for DeserializeError {
114+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115+
self.0.fmt(f)
116+
}
117+
}
118+
119+
impl error::Error for DeserializeError {}

crates/procedure/src/error.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use serde::{
55
Serialize, Serializer,
66
};
77

8-
use crate::LegacyErrorInterop;
9-
108
// TODO: Discuss the stability guanrantees of the error handling system. Variant is fixed, message is not.
119

1210
/// TODO
@@ -104,12 +102,6 @@ impl Serialize for ProcedureError {
104102
S: Serializer,
105103
{
106104
if let ProcedureError::Resolver(err) = self {
107-
// if let Some(err) = err.error() {
108-
// if let Some(v) = err.downcast_ref::<LegacyErrorInterop>() {
109-
// return v.0.serialize(serializer);
110-
// }
111-
// }
112-
113105
return err.value().serialize(serializer);
114106
}
115107

examples/axum/Cargo.toml

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,18 @@ publish = false
77
[dependencies]
88
rspc = { path = "../../rspc", features = ["typescript", "rust"] }
99
rspc-axum = { path = "../../integrations/axum", features = ["ws"] }
10+
rspc-devtools = { version = "0.0.0", path = "../../crates/devtools" }
11+
rspc-invalidation = { version = "0.0.0", path = "../../crates/invalidation" }
12+
rspc-http = { version = "0.2.1", path = "../../integrations/http" }
13+
rspc-zer = { version = "0.0.0", path = "../../crates/zer" }
1014
example-core = { path = "../core" }
15+
1116
tokio = { version = "1.42.0", features = ["full"] }
1217
axum = { version = "0.7.9", features = ["multipart"] }
1318
tower-http = { version = "0.6.2", default-features = false, features = [
1419
"cors",
1520
] }
16-
rspc-devtools = { version = "0.0.0", path = "../../crates/devtools" }
17-
rspc-invalidation = { version = "0.0.0", path = "../../crates/invalidation" }
18-
19-
futures = "0.3" # TODO
21+
futures = "0.3"
2022
serde_json = "1.0.134"
21-
rspc-http = { version = "0.2.1", path = "../../integrations/http" }
2223
streamunordered = "0.5.4"
23-
rspc-zer = { version = "0.0.0", path = "../../crates/zer" }
2424
serde.workspace = true
25-
binario = "0.0.2"
26-
rspc-binario = { version = "0.0.0", path = "../../crates/binario" }
27-
form_urlencoded = "1.2.1"
28-
axum-extra = { version = "0.9.6", features = ["async-read-body"] }
29-
tokio-util = { version = "0.7.13", features = ["compat"] }

examples/axum/src/main.rs

Lines changed: 13 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,22 @@
11
use axum::{
22
body::Body,
3-
extract::{Multipart, Request},
4-
http::{header, request::Parts, HeaderMap, HeaderName, StatusCode},
5-
routing::{get, on, post, MethodFilter, MethodRouter},
6-
Json,
3+
extract::Multipart,
4+
http::{header, request::Parts, HeaderMap},
5+
routing::{get, post},
76
};
8-
use axum_extra::body::AsyncReadBody;
97
use example_core::{mount, Ctx};
10-
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryStreamExt};
11-
use rspc::{DynOutput, ProcedureError, ProcedureStream, ProcedureStreamMap, Procedures, State};
12-
use rspc_binario::BinarioOutput;
8+
use futures::{Stream, StreamExt};
9+
use rspc::{DynOutput, ProcedureError, ProcedureStream, ProcedureStreamMap, Procedures};
1310
use rspc_invalidation::Invalidator;
14-
use serde_json::{de::SliceRead, value::RawValue, Value};
11+
use serde_json::Value;
1512
use std::{
1613
convert::Infallible,
17-
future::{poll_fn, Future},
14+
future::poll_fn,
1815
path::PathBuf,
19-
pin::{pin, Pin},
16+
pin::Pin,
2017
task::{Context, Poll},
2118
};
2219
use streamunordered::{StreamUnordered, StreamYield};
23-
use tokio_util::compat::FuturesAsyncReadCompatExt;
2420
use tower_http::cors::{Any, CorsLayer};
2521

2622
#[tokio::main]
@@ -56,39 +52,7 @@ async fn main() {
5652
.allow_origin(Any);
5753

5854
let app = axum::Router::new()
59-
.route("/", get(|| async { "Hello 'rspc'!" }))
60-
// .route(
61-
// "/upload",
62-
// post(|mut multipart: Multipart| async move {
63-
// println!("{:?}", multipart);
64-
// while let Some(field) = multipart.next_field().await.unwrap() {
65-
// println!(
66-
// "{:?} {:?} {:?}",
67-
// field.name().map(|v| v.to_string()),
68-
// field.content_type().map(|v| v.to_string()),
69-
// field.collect::<Vec<_>>().await
70-
// );
71-
// }
72-
// "Done!"
73-
// }),
74-
// )
75-
.route(
76-
"/rspc/custom",
77-
post(|| async move {
78-
// println!("{:?}", multipart);
79-
80-
// while let Some(field) = multipart.next_field().await.unwrap() {
81-
// println!(
82-
// "{:?} {:?} {:?}",
83-
// field.name().map(|v| v.to_string()),
84-
// field.content_type().map(|v| v.to_string()),
85-
// field.collect::<Vec<_>>().await
86-
// );
87-
// }
88-
89-
todo!();
90-
}),
91-
)
55+
.route("/", get(|| async { "rspc 🤝 Axum!" }))
9256
// .nest(
9357
// "/rspc",
9458
// rspc_axum::endpoint(procedures, |parts: Parts| {
@@ -114,63 +78,7 @@ async fn main() {
11478
}
11579

11680
pub fn rspc_handler(procedures: Procedures<Ctx>) -> axum::Router {
117-
let mut r = axum::Router::new();
118-
119-
r = r.route(
120-
"/binario",
121-
// This endpoint lacks batching, SFM's, etc but that's fine.
122-
post({
123-
let procedures = procedures.clone();
124-
125-
move |parts: Parts, body: Body| async move {
126-
let invalidator = rspc_invalidation::Invalidator::default();
127-
let (zer, _zer_response) = rspc_zer::Zer::from_request(
128-
"session",
129-
"some_secret".as_ref(),
130-
parts.headers.get("cookie"),
131-
)
132-
.unwrap(); // TODO: Error handling
133-
let ctx = Ctx {
134-
invalidator: invalidator.clone(),
135-
zer,
136-
};
137-
138-
// if parts.headers.get("Content-Type") != Some(&"text/x-binario".parse().unwrap()) {
139-
// // TODO: Error handling
140-
// }
141-
142-
let mut params = form_urlencoded::parse(parts.uri.query().unwrap_or("").as_bytes());
143-
let procedure_name = params
144-
.find(|(key, _)| key == "procedure")
145-
.map(|(_, value)| value)
146-
.unwrap(); // TODO: Error handling
147-
148-
let procedure = procedures.get(&procedure_name).unwrap(); // TODO: Error handling
149-
150-
let stream = procedure.exec_with_value(
151-
ctx.clone(),
152-
rspc_binario::BinarioInput::from_stream(
153-
body.into_data_stream()
154-
.map_err(|err| todo!()) // TODO: Error handling
155-
.into_async_read()
156-
.compat(),
157-
),
158-
);
159-
let mut headers = HeaderMap::new();
160-
headers.insert(header::CONTENT_TYPE, "text/x-binario".parse().unwrap());
161-
162-
(
163-
headers,
164-
Body::from_stream(stream.map(|v| match v {
165-
Ok(v) => Ok(Ok::<_, Infallible>(
166-
v.as_value::<BinarioOutput>().unwrap().0,
167-
)),
168-
Err(err) => todo!("{err:?}"),
169-
})),
170-
)
171-
}
172-
}),
173-
);
81+
let r = axum::Router::new();
17482

17583
// TODO: Support file upload and download
17684

@@ -426,9 +334,9 @@ impl<TCtx: Unpin + Clone + 'static, E: 'static> Stream for Prototype<TCtx, E> {
426334
}
427335
}
428336

429-
fn encode_msg(a: (), b: (), c: ()) {
430-
todo!();
431-
}
337+
// fn encode_msg(a: (), b: (), c: ()) {
338+
// todo!();
339+
// }
432340

433341
// TODO: support `GET`
434342
// r = r.route(

examples/binario/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "example-binario"
3+
version = "0.0.0"
4+
edition = "2021"
5+
publish = false
6+
7+
[dependencies]
8+
rspc = { path = "../../rspc", features = ["typescript", "rust"] }
9+
rspc-binario = { version = "0.0.0", path = "../../crates/binario" }
10+
specta = { workspace = true, features = ["derive"] }
11+
tokio = { version = "1.42.0", features = ["full"] }
12+
axum = { version = "0.7.9", features = ["multipart"] }
13+
tower-http = { version = "0.6.2", default-features = false, features = [
14+
"cors",
15+
] }
16+
futures = "0.3"
17+
form_urlencoded = "1.2.1"
18+
axum-extra = { version = "0.9.6", features = ["async-read-body"] }
19+
tokio-util = { version = "0.7.13", features = ["compat"] }
20+
binario = "0.0.2"
21+
pin-project = "1.1.7"

0 commit comments

Comments
 (0)