Skip to content

Commit 9dd64e6

Browse files
committed
Support streams for blocking functions
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 0edfdb2 commit 9dd64e6

File tree

10 files changed

+249
-75
lines changed

10 files changed

+249
-75
lines changed

src/handler.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,16 @@ pub trait HandlerTrait<Request, Response, Streams> {
155155

156156
pub struct BlockingHandlerMarker<M>(std::marker::PhantomData<M>);
157157

158-
struct BlockingHandlerSystem<Request, Response> {
159-
system: BoxedSystem<BlockingHandler<Request>, Response>,
158+
struct BlockingHandlerSystem<Request, Response, Streams: StreamPack> {
159+
system: BoxedSystem<BlockingHandler<Request, Streams>, Response>,
160160
initialized: bool,
161161
}
162162

163-
impl<Request, Response> HandlerTrait<Request, Response, ()> for BlockingHandlerSystem<Request, Response>
163+
impl<Request, Response, Streams> HandlerTrait<Request, Response, ()> for BlockingHandlerSystem<Request, Response, Streams>
164164
where
165165
Request: 'static + Send + Sync,
166166
Response: 'static + Send + Sync,
167+
Streams: StreamPack,
167168
{
168169
fn handle(&mut self, mut input: HandleRequest) -> Result<(), OperationError> {
169170
let Input { session, data: request } = input.get_request()?;
@@ -172,9 +173,13 @@ where
172173
self.system.initialize(&mut input.world);
173174
}
174175

175-
let response = self.system.run(BlockingHandler { request }, &mut input.world);
176+
let streams = Streams::make_buffer(input.source, input.world);
177+
178+
let response = self.system.run(BlockingHandler { request, streams: streams.clone() }, input.world);
176179
self.system.apply_deferred(&mut input.world);
177180

181+
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
182+
178183
input.give_response(session, response)
179184
}
180185
}
@@ -273,20 +278,23 @@ where
273278
}
274279
}
275280

276-
impl<Request, Response, F> AsHandler<BlockingCallbackMarker<(Request, Response)>> for F
281+
impl<Request, Response, Streams, F> AsHandler<BlockingCallbackMarker<(Request, Response, Streams)>> for F
277282
where
278-
F: FnMut(BlockingHandler<Request>) -> Response + 'static + Send,
283+
F: FnMut(BlockingHandler<Request, Streams>) -> Response + 'static + Send,
279284
Request: 'static + Send + Sync,
280285
Response: 'static + Send + Sync,
286+
Streams: StreamPack,
281287
{
282288
type Request = Request;
283289
type Response = Response;
284-
type Streams = ();
290+
type Streams = Streams;
285291

286292
fn as_handler(mut self) -> Handler<Self::Request, Self::Response, Self::Streams> {
287293
let callback = move |mut input: HandleRequest| {
288294
let Input { session, data: request } = input.get_request::<Self::Request>()?;
289-
let response = (self)(BlockingHandler { request });
295+
let streams = Streams::make_buffer(input.source, input.world);
296+
let response = (self)(BlockingHandler { request, streams: streams.clone() });
297+
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
290298
input.give_response(session, response)
291299
};
292300
Handler::new(CallbackHandler { callback })
@@ -335,7 +343,7 @@ where
335343
}
336344
}
337345

338-
fn peel_blocking<Request>(In(BlockingHandler { request }): In<BlockingHandler<Request>>) -> Request {
346+
fn peel_blocking<Request>(In(BlockingHandler { request, .. }): In<BlockingHandler<Request>>) -> Request {
339347
request
340348
}
341349

@@ -348,7 +356,7 @@ where
348356
type Request = Request;
349357
type Response = Response;
350358
fn into_blocking_handler(mut self) -> Handler<Self::Request, Self::Response, ()> {
351-
let f = move |BlockingHandler { request }| {
359+
let f = move |BlockingHandler { request, .. }| {
352360
(self)(request)
353361
};
354362

src/impulse/map.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,25 @@ use crate::{
3434
/// reusable, whereas [`crate::OperateBlockingMap`] is used in workflows which
3535
/// need to be reusable, so it can only support FnMut.
3636
#[derive(Bundle)]
37-
pub(crate) struct ImpulseBlockingMap<F, Request, Response>
37+
pub(crate) struct ImpulseBlockingMap<F, Request, Response, Streams>
3838
where
3939
F: 'static + Send + Sync,
4040
Request: 'static + Send + Sync,
4141
Response: 'static + Send + Sync,
42+
Streams: StreamPack,
4243
{
4344
f: BlockingMapOnceStorage<F>,
4445
target: SingleTargetStorage,
4546
#[bundle(ignore)]
46-
_ignore: std::marker::PhantomData<(Request, Response)>,
47+
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
4748
}
4849

49-
impl<F, Request, Response> ImpulseBlockingMap<F, Request, Response>
50+
impl<F, Request, Response, Streams> ImpulseBlockingMap<F, Request, Response, Streams>
5051
where
5152
F: 'static + Send + Sync,
5253
Request: 'static + Send + Sync,
5354
Response: 'static + Send + Sync,
55+
Streams: StreamPack,
5456
{
5557
pub(crate) fn new(target: Entity, f: F) -> Self {
5658
Self {
@@ -66,11 +68,12 @@ struct BlockingMapOnceStorage<F> {
6668
f: F,
6769
}
6870

69-
impl<F, Request, Response> Impulsive for ImpulseBlockingMap<F, Request, Response>
71+
impl<F, Request, Response, Streams> Impulsive for ImpulseBlockingMap<F, Request, Response, Streams>
7072
where
7173
Request: 'static + Send + Sync,
7274
Response: 'static + Send + Sync,
73-
F: CallBlockingMapOnce<Request, Response> + 'static + Send + Sync,
75+
Streams: StreamPack,
76+
F: CallBlockingMapOnce<Request, Response, Streams> + 'static + Send + Sync,
7477
{
7578
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
7679
world.entity_mut(source).insert((
@@ -83,12 +86,15 @@ where
8386
fn execute(
8487
OperationRequest { source, world, roster }: OperationRequest,
8588
) -> OperationResult {
89+
let streams = Streams::make_buffer(source, world);
8690
let mut source_mut = world.get_entity_mut(source).or_broken()?;
8791
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
8892
let Input { session, data: request } = source_mut.take_input::<Request>()?;
8993
let f = source_mut.take::<BlockingMapOnceStorage<F>>().or_broken()?.f;
9094

91-
let response = f.call(BlockingMap { request });
95+
let response = f.call(BlockingMap { request, streams: streams.clone() });
96+
97+
Streams::process_buffer(streams, source, session, world, roster)?;
9298

9399
world.get_entity_mut(target).or_broken()?.give_input(session, response, roster)?;
94100
Ok(())

src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,14 @@ use bevy::prelude::{Entity, In};
125125
/// }
126126
/// ```
127127
#[non_exhaustive]
128-
pub struct BlockingService<Request> {
128+
pub struct BlockingService<Request, Streams: StreamPack = ()> {
129129
pub request: Request,
130130
pub provider: Entity,
131+
pub streams: Streams::Buffer,
131132
}
132133

133134
/// Use this to reduce bracket noise when you need `In<BlockingService<R>>`.
134-
pub type InBlockingService<Request> = In<BlockingService<Request>>;
135+
pub type InBlockingService<Request, Streams = ()> = In<BlockingService<Request, Streams>>;
135136

136137
/// Use AsyncService to indicate that your system is an async service and to
137138
/// specify its input request type. Being async means it must return a
@@ -153,8 +154,9 @@ pub type InAsyncService<Request, Streams = ()> = In<AsyncService<Request, Stream
153154
/// blocking [`Handler`]. Handlers are different from services because they are
154155
/// not associated with any entity.
155156
#[non_exhaustive]
156-
pub struct BlockingHandler<Request> {
157+
pub struct BlockingHandler<Request, Streams: StreamPack = ()> {
157158
pub request: Request,
159+
pub streams: Streams::Buffer,
158160
}
159161

160162
/// Use AsyncHandler to indicate that your system or function is meant to define
@@ -172,8 +174,9 @@ pub struct AsyncHandler<Request, Streams: StreamPack = ()> {
172174
/// and it can only be used once, making it suitable for functions that only
173175
/// implement [`FnOnce`].
174176
#[non_exhaustive]
175-
pub struct BlockingMap<Request> {
177+
pub struct BlockingMap<Request, Streams: StreamPack = ()> {
176178
pub request: Request,
179+
pub streams: Streams::Buffer,
177180
}
178181

179182
/// Use AsyncMap to indicate that your function is meant to define an async

src/map.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,18 @@ pub trait AsMap<M> {
3535
}
3636

3737
/// A trait that all different ways of defining a Blocking Map must funnel into.
38-
pub(crate) trait CallBlockingMap<Request, Response> {
39-
fn call(&mut self, input: BlockingMap<Request>) -> Response;
38+
pub(crate) trait CallBlockingMap<Request, Response, Streams: StreamPack> {
39+
fn call(&mut self, input: BlockingMap<Request, Streams>) -> Response;
4040
}
4141

42-
impl<F, Request, Response> CallBlockingMap<Request, Response> for MapDef<F>
42+
impl<F, Request, Response, Streams> CallBlockingMap<Request, Response, Streams> for MapDef<F>
4343
where
44-
F: FnMut(BlockingMap<Request>) -> Response + 'static + Send + Sync,
44+
F: FnMut(BlockingMap<Request, Streams>) -> Response + 'static + Send + Sync,
4545
Request: 'static + Send + Sync,
4646
Response: 'static + Send + Sync,
47+
Streams: StreamPack,
4748
{
48-
fn call(&mut self, request: BlockingMap<Request>) -> Response {
49+
fn call(&mut self, request: BlockingMap<Request, Streams>) -> Response {
4950
(self.0)(request)
5051
}
5152
}
@@ -55,44 +56,46 @@ where
5556
///
5657
/// Maps cannot contain Bevy Systems; they can only contain objects that
5758
/// implement [`FnMut`].
58-
pub struct BlockingMapDef<Def, Request, Response> {
59+
pub struct BlockingMapDef<Def, Request, Response, Streams> {
5960
def: Def,
60-
_ignore: std::marker::PhantomData<(Request, Response)>,
61+
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
6162
}
6263

63-
impl<Def, Request, Response> ProvideOnce for BlockingMapDef<Def, Request, Response>
64+
impl<Def, Request, Response, Streams> ProvideOnce for BlockingMapDef<Def, Request, Response, Streams>
6465
where
65-
Def: CallBlockingMap<Request, Response> + 'static + Send + Sync,
66+
Def: CallBlockingMap<Request, Response, Streams> + 'static + Send + Sync,
6667
Request: 'static + Send + Sync,
6768
Response: 'static + Send + Sync,
69+
Streams: StreamPack,
6870
{
6971
type Request = Request;
7072
type Response = Response;
71-
type Streams = ();
73+
type Streams = Streams;
7274

7375
fn connect(self, source: Entity, target: Entity, commands: &mut Commands) {
7476
commands.add(AddOperation::new(source, OperateBlockingMap::new(target, self.def)));
7577
}
7678
}
7779

78-
impl<Def, Request, Response> Provider for BlockingMapDef<Def, Request, Response>
80+
impl<Def, Request, Response, Streams> Provider for BlockingMapDef<Def, Request, Response, Streams>
7981
where
80-
Def: CallBlockingMap<Request, Response> + 'static + Send + Sync,
82+
Def: CallBlockingMap<Request, Response, Streams> + 'static + Send + Sync,
8183
Request: 'static + Send + Sync,
8284
Response: 'static + Send + Sync,
85+
Streams: StreamPack,
8386
{
8487

8588
}
8689

8790
pub struct BlockingMapMarker;
8891

89-
impl<F, Request, Response> AsMap<(Request, Response, BlockingMapMarker)> for F
92+
impl<F, Request, Response, Streams> AsMap<(Request, Response, Streams, BlockingMapMarker)> for F
9093
where
9194
F: FnMut(BlockingMap<Request>) -> Response + 'static + Send + Sync,
9295
Request: 'static + Send + Sync,
9396
Response: 'static + Send + Sync,
9497
{
95-
type MapType = BlockingMapDef<MapDef<F>, Request, Response>;
98+
type MapType = BlockingMapDef<MapDef<F>, Request, Response, Streams>;
9699
fn as_map(self) -> Self::MapType {
97100
BlockingMapDef { def: MapDef(self), _ignore: Default::default() }
98101
}
@@ -104,25 +107,26 @@ pub trait IntoBlockingMap<M> {
104107
fn into_blocking_map(self) -> Self::MapType;
105108
}
106109

107-
impl<F, Request, Response> IntoBlockingMap<(Request, Response)> for F
110+
impl<F, Request, Response, Streams> IntoBlockingMap<(Request, Response, Streams)> for F
108111
where
109112
F: FnMut(Request) -> Response + 'static + Send + Sync,
110113
Request: 'static + Send + Sync,
111114
Response: 'static + Send + Sync,
115+
Streams: StreamPack,
112116
{
113-
type MapType = BlockingMapDef<BlockingMapAdapter<F>, Request, Response>;
117+
type MapType = BlockingMapDef<BlockingMapAdapter<F>, Request, Response, Streams>;
114118
fn into_blocking_map(self) -> Self::MapType {
115119
BlockingMapDef { def: BlockingMapAdapter(self), _ignore: Default::default() }
116120
}
117121
}
118122

119123
pub struct BlockingMapAdapter<F>(F);
120124

121-
impl<F, Request, Response> CallBlockingMap<Request, Response> for BlockingMapAdapter<F>
125+
impl<F, Request, Response> CallBlockingMap<Request, Response, ()> for BlockingMapAdapter<F>
122126
where
123127
F: FnMut(Request) -> Response,
124128
{
125-
fn call(&mut self, BlockingMap{ request }: BlockingMap<Request>) -> Response {
129+
fn call(&mut self, BlockingMap{ request, .. }: BlockingMap<Request, ()>) -> Response {
126130
(self.0)(request)
127131
}
128132
}

src/map_once.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,18 @@ pub trait AsMapOnce<M> {
3535
fn as_map_once(self) -> Self::MapType;
3636
}
3737

38-
pub(crate) trait CallBlockingMapOnce<Request, Response> {
39-
fn call(self, input: BlockingMap<Request>) -> Response;
38+
pub(crate) trait CallBlockingMapOnce<Request, Response, Streams: StreamPack> {
39+
fn call(self, input: BlockingMap<Request, Streams>) -> Response;
4040
}
4141

42-
impl<F, Request, Response> CallBlockingMapOnce<Request, Response> for MapOnceDef<F>
42+
impl<F, Request, Response, Streams> CallBlockingMapOnce<Request, Response, Streams> for MapOnceDef<F>
4343
where
44-
F: FnOnce(BlockingMap<Request>) -> Response + 'static + Send + Sync,
44+
F: FnOnce(BlockingMap<Request, Streams>) -> Response + 'static + Send + Sync,
4545
Request: 'static + Send + Sync,
4646
Response: 'static + Send + Sync,
47+
Streams: StreamPack,
4748
{
48-
fn call(self, input: BlockingMap<Request>) -> Response {
49+
fn call(self, input: BlockingMap<Request, Streams>) -> Response {
4950
(self.0)(input)
5051
}
5152
}
@@ -54,16 +55,17 @@ where
5455
///
5556
/// Maps cannot contain Bevy Systems; they can only contain objects that
5657
/// implement [`FnOnce`].
57-
pub struct BlockingMapOnceDef<Def, Request, Response> {
58+
pub struct BlockingMapOnceDef<Def, Request, Response, Streams> {
5859
def: Def,
59-
_ignore: std::marker::PhantomData<(Request, Response)>,
60+
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
6061
}
6162

62-
impl<Def, Request, Response> ProvideOnce for BlockingMapOnceDef<Def, Request, Response>
63+
impl<Def, Request, Response, Streams> ProvideOnce for BlockingMapOnceDef<Def, Request, Response, Streams>
6364
where
64-
Def: CallBlockingMapOnce<Request, Response> + 'static + Send + Sync,
65+
Def: CallBlockingMapOnce<Request, Response, Streams> + 'static + Send + Sync,
6566
Request: 'static + Send + Sync,
6667
Response: 'static + Send + Sync,
68+
Streams: StreamPack,
6769
{
6870
type Request = Request;
6971
type Response = Response;
@@ -74,13 +76,14 @@ where
7476
}
7577
}
7678

77-
impl<F, Request, Response> AsMapOnce<(Request, Response, BlockingMapMarker)> for F
79+
impl<F, Request, Response, Streams> AsMapOnce<(Request, Response, Streams, BlockingMapMarker)> for F
7880
where
79-
F: FnOnce(BlockingMap<Request>) -> Response + 'static + Send + Sync,
81+
F: FnOnce(BlockingMap<Request, Streams>) -> Response + 'static + Send + Sync,
8082
Request: 'static + Send + Sync,
8183
Response: 'static + Send + Sync,
84+
Streams: StreamPack,
8285
{
83-
type MapType = BlockingMapOnceDef<MapOnceDef<F>, Request, Response>;
86+
type MapType = BlockingMapOnceDef<MapOnceDef<F>, Request, Response, Streams>;
8487
fn as_map_once(self) -> Self::MapType {
8588
BlockingMapOnceDef { def: MapOnceDef(self), _ignore: Default::default() }
8689
}
@@ -98,19 +101,19 @@ where
98101
Request: 'static + Send + Sync,
99102
Response: 'static + Send + Sync,
100103
{
101-
type MapType = BlockingMapOnceDef<BlockingMapOnceAdapter<F>, Request, Response>;
104+
type MapType = BlockingMapOnceDef<BlockingMapOnceAdapter<F>, Request, Response, ()>;
102105
fn into_blocking_map_once(self) -> Self::MapType {
103106
BlockingMapOnceDef { def: BlockingMapOnceAdapter(self), _ignore: Default::default() }
104107
}
105108
}
106109

107110
pub struct BlockingMapOnceAdapter<F>(F);
108111

109-
impl<F, Request, Response> CallBlockingMapOnce<Request, Response> for BlockingMapOnceAdapter<F>
112+
impl<F, Request, Response> CallBlockingMapOnce<Request, Response, ()> for BlockingMapOnceAdapter<F>
110113
where
111114
F: FnOnce(Request) -> Response,
112115
{
113-
fn call(self, BlockingMap { request }: BlockingMap<Request>) -> Response {
116+
fn call(self, BlockingMap { request, .. }: BlockingMap<Request, ()>) -> Response {
114117
(self.0)(request)
115118
}
116119
}

0 commit comments

Comments
 (0)