Skip to content

Commit 7d18d8a

Browse files
committed
Add tests and fixes for streams
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 018a756 commit 7d18d8a

File tree

13 files changed

+171
-31
lines changed

13 files changed

+171
-31
lines changed

src/callback.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ struct BlockingCallbackSystem<Request, Response, Streams: StreamPack> {
179179
initialized: bool,
180180
}
181181

182-
impl<Request, Response, Streams> CallbackTrait<Request, Response, ()> for BlockingCallbackSystem<Request, Response, Streams>
182+
impl<Request, Response, Streams> CallbackTrait<Request, Response, Streams> for BlockingCallbackSystem<Request, Response, Streams>
183183
where
184184
Request: 'static + Send + Sync,
185185
Response: 'static + Send + Sync,
@@ -263,15 +263,16 @@ pub trait AsCallback<M> {
263263
fn as_callback(self) -> Callback<Self::Request, Self::Response, Self::Streams>;
264264
}
265265

266-
impl<Request, Response, M, Sys> AsCallback<BlockingCallbackMarker<(Request, Response, M)>> for Sys
266+
impl<Request, Response, Streams, M, Sys> AsCallback<BlockingCallbackMarker<(Request, Response, Streams, M)>> for Sys
267267
where
268-
Sys: IntoSystem<BlockingCallback<Request>, Response, M>,
268+
Sys: IntoSystem<BlockingCallback<Request, Streams>, Response, M>,
269269
Request: 'static + Send + Sync,
270270
Response: 'static + Send + Sync,
271+
Streams: StreamPack,
271272
{
272273
type Request = Request;
273274
type Response = Response;
274-
type Streams = ();
275+
type Streams = Streams;
275276

276277
fn as_callback(self) -> Callback<Self::Request, Self::Response, Self::Streams> {
277278
Callback::new(BlockingCallbackSystem {

src/channel.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ use crate::{
3131

3232
#[derive(Clone)]
3333
pub struct Channel<Streams: StreamPack = ()> {
34+
/// Stream channels that will let you send stream information. This will
35+
/// usually be a [`StreamChannel`] or a (possibly nested) tuple of
36+
/// `StreamChannel`s, whichever matches the [`StreamPack`] description.
37+
pub streams: Streams::Channel,
3438
inner: Arc<InnerChannel>,
35-
streams: Streams::Channel,
3639
_ignore: std::marker::PhantomData<Streams>,
3740
}
3841

@@ -67,13 +70,6 @@ impl<Streams: StreamPack> Channel<Streams> {
6770

6871
promise
6972
}
70-
71-
/// Get stream channels that will let you send stream information. This will
72-
/// usually be one [`StreamChannel`] or a (possibly nested) tuple of
73-
/// `StreamChannel`s, whichever matches the [`StreamPack`] description.
74-
pub fn streams(&self) -> &Streams::Channel {
75-
&self.streams
76-
}
7773
}
7874

7975
#[derive(Clone)]

src/flush.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ fn collect_from_channels(
125125
(item)(world, roster);
126126
}
127127

128+
roster.process_deferals();
129+
128130
world.get_resource_or_insert_with(|| ServiceLifecycleChannel::new());
129131
world.resource_scope::<ServiceLifecycleChannel, ()>(|world, lifecycles| {
130132
// Clean up the dangling requests of any services that have been despawned.

src/impulse/taken.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ impl<T: 'static + Send + Sync> Impulsive for TakenStream<T> {
9292
let Input { data, .. } = source_mut.take_input::<T>()?;
9393
let stream = source_mut.get::<TakenStream<T>>().or_broken()?;
9494
stream.sender.send(data).ok();
95-
source_mut.despawn_recursive();
9695
Ok(())
9796
}
9897
}

src/input.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,26 @@ impl<T: 'static + Send + Sync> InputBundle<T> {
8484
}
8585

8686
pub trait ManageInput {
87+
/// Give an input to this node. The node will be queued up to immediately
88+
/// process the input.
8789
fn give_input<T: 'static + Send + Sync>(
8890
&mut self,
8991
session: Entity,
9092
data: T,
9193
roster: &mut OperationRoster,
9294
) -> Result<(), OperationError>;
9395

96+
/// Same as [`Self::give_input`], but the wakeup for this node will be
97+
/// deferred until after the [`ChannelQueue`](crate::ChannelQueue) is flushed.
98+
/// This is used for async output to ensure that all async operations are
99+
/// finished being processed before the final output gets processed.
100+
fn defer_input<T: 'static + Send + Sync>(
101+
&mut self,
102+
session: Entity,
103+
data: T,
104+
roster: &mut OperationRoster,
105+
) -> Result<(), OperationError>;
106+
94107
/// Give an input to this node without flagging it in the roster. This
95108
/// should not generally be used. It's only for special cases where we know
96109
/// the node will be manually run after giving this input. It's marked
@@ -181,6 +194,17 @@ impl<'w> ManageInput for EntityMut<'w> {
181194
Ok(())
182195
}
183196

197+
fn defer_input<T: 'static + Send + Sync>(
198+
&mut self,
199+
session: Entity,
200+
data: T,
201+
roster: &mut OperationRoster,
202+
) -> Result<(), OperationError> {
203+
unsafe { self.sneak_input(session, data)?; }
204+
roster.defer(self.id());
205+
Ok(())
206+
}
207+
184208
unsafe fn sneak_input<T: 'static + Send + Sync>(
185209
&mut self,
186210
session: Entity,

src/lib.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ pub struct BlockingService<Request, Streams: StreamPack = ()> {
128128
}
129129

130130
/// Use this to reduce bracket noise when you need `In<BlockingService<R>>`.
131-
pub type InBlockingService<Request, Streams = ()> = In<BlockingService<Request, Streams>>;
131+
pub type BlockingServiceInput<Request, Streams = ()> = In<BlockingService<Request, Streams>>;
132132

133133
/// Use AsyncService to indicate that your system is an async [`Service`]. Being
134134
/// async means it must return a [`Future<Output=Response>`](std::future::Future)
@@ -152,8 +152,8 @@ pub struct AsyncService<Request, Streams: StreamPack = ()> {
152152
pub session: Entity,
153153
}
154154

155-
/// Use this to reduce backet noise when you need `In<AsyncService<R, S>>`.
156-
pub type InAsyncService<Request, Streams = ()> = In<AsyncService<Request, Streams>>;
155+
/// Use this to reduce backet noise when you need `In<`[`AsyncService<R, S>`]`>`.
156+
pub type AsyncServiceInput<Request, Streams = ()> = In<AsyncService<Request, Streams>>;
157157

158158
/// Use BlockingCallback to indicate that your system is meant to define a
159159
/// blocking [`Callback`]. Callbacks are different from services because they are
@@ -174,6 +174,9 @@ pub struct BlockingCallback<Request, Streams: StreamPack = ()> {
174174
pub session: Entity,
175175
}
176176

177+
/// Use this to reduce bracket noise when you need `In<`[`BlockingCallback<R, S>`]`>`.
178+
pub type BlockingCallbackInput<Request, Streams = ()> = In<BlockingCallback<Request, Streams>>;
179+
177180
/// Use AsyncCallback to indicate that your system or function is meant to define
178181
/// an async [`Callback`]. An async callback is not associated with any entity,
179182
/// and it must return a [`Future<Output=Response>`](std::future::Future) that
@@ -192,6 +195,9 @@ pub struct AsyncCallback<Request, Streams: StreamPack = ()> {
192195
pub session: Entity,
193196
}
194197

198+
/// Use this to reduce bracket noise when you need `In<`[`AsyncCallback<R, S>`]`>`.
199+
pub type AsyncCallbackInput<Request, Streams = ()> = In<AsyncCallback<Request, Streams>>;
200+
195201
/// Use `BlockingMap`` to indicate that your function is a blocking map. A map
196202
/// is not associated with any entity, and it cannot be a Bevy System. These
197203
/// restrictions allow them to be processed more efficiently.
@@ -207,6 +213,9 @@ pub struct BlockingMap<Request, Streams: StreamPack = ()> {
207213
pub session: Entity,
208214
}
209215

216+
/// Use this to reduce the bracket noise when you need `In<`[`BlockingMap<R, S>`]`>`.
217+
pub type BlockingMapInput<Request, Streams = ()> = In<BlockingMap<Request, Streams>>;
218+
210219
/// Use AsyncMap to indicate that your function is an async map. A Map is not
211220
/// associated with any entity, and it cannot be a Bevy System. These
212221
/// restrictions allow them to be processed more efficiently.
@@ -226,3 +235,6 @@ pub struct AsyncMap<Request, Streams: StreamPack = ()> {
226235
/// The unique session ID for the workflow
227236
pub session: Entity,
228237
}
238+
239+
/// Use this to reduce bracket noise when you need `In<`[`AsyncMap<R, S>`]`>`.
240+
pub type AsyncMapInput<Request, Streams = ()> = In<AsyncMap<Request, Streams>>;

src/operation.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,10 @@ pub struct OperationRoster {
185185
/// Tasks that should be awoken. If the task is already despawned, then
186186
/// it should not be considered an error.
187187
pub(crate) awake: VecDeque<Entity>,
188+
/// Operation sources that should be triggered after the next ChannelQueue
189+
/// flush. This is for the final outputs of polled tasks, to make sure their
190+
/// stream data gets flushed before their final output is flushed.
191+
pub(crate) deferred_queue: VecDeque<Entity>,
188192
/// Operation sources that should be cancelled
189193
pub(crate) cancel: VecDeque<Cancel>,
190194
/// Async services that should pull their next item
@@ -208,6 +212,10 @@ impl OperationRoster {
208212
self.awake.push_back(source);
209213
}
210214

215+
pub fn defer(&mut self, source: Entity) {
216+
self.deferred_queue.push_back(source);
217+
}
218+
211219
pub fn cancel(&mut self, source: Cancel) {
212220
self.cancel.push_back(source);
213221
}
@@ -227,6 +235,7 @@ impl OperationRoster {
227235
pub fn is_empty(&self) -> bool {
228236
self.queue.is_empty()
229237
&& self.awake.is_empty()
238+
&& self.deferred_queue.is_empty()
230239
&& self.cancel.is_empty()
231240
&& self.unblock.is_empty()
232241
&& self.disposed.is_empty()
@@ -246,6 +255,13 @@ impl OperationRoster {
246255
pub fn purge(&mut self, target: Entity) {
247256
self.queue.retain(|e| *e != target);
248257
}
258+
259+
/// Move all items from the deferred queue into the immediate queue
260+
pub fn process_deferals(&mut self) {
261+
for e in self.deferred_queue.drain(..) {
262+
self.queue.push_back(e);
263+
}
264+
}
249265
}
250266

251267
/// Notify the scope manager that a disposal took place. This will prompt the

src/operation/operate_task.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,10 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
224224
&mut Context::from_waker(&waker_ref(&waker))
225225
) {
226226
Poll::Ready(result) => {
227-
// Task has finished
228-
let r = world.entity_mut(target).give_input(session, result, roster);
227+
// Task has finished. We will defer its input until after the
228+
// ChannelQueue has been processed so that any streams from this
229+
// task will be delivered before the final output.
230+
let r = world.entity_mut(target).defer_input(session, result, roster);
229231
world.get_mut::<OperateTask<Response>>(source).or_broken()?.finished_normally = true;
230232
cleanup_task::<Response>(session, source, node, unblock, being_cleaned, world, roster);
231233
r?;

src/service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ where
365365
#[cfg(test)]
366366
mod tests {
367367
use super::*;
368-
use crate::{BlockingService, InBlockingService, AsyncService, InAsyncService};
368+
use crate::{BlockingService, BlockingServiceInput, AsyncService, AsyncServiceInput};
369369
use bevy::{
370370
prelude::*,
371371
ecs::world::EntityMut,
@@ -485,7 +485,7 @@ mod tests {
485485
}
486486

487487
fn sys_async_service(
488-
In(AsyncService{ request, .. }): InAsyncService<String>,
488+
In(AsyncService{ request, .. }): AsyncServiceInput<String>,
489489
people: Query<&TestPeople>,
490490
) -> impl Future<Output=u64> {
491491
let mut matching_people = Vec::new();
@@ -507,7 +507,7 @@ mod tests {
507507
}
508508

509509
fn sys_blocking_service(
510-
In(BlockingService{ request, provider, .. }): InBlockingService<String>,
510+
In(BlockingService{ request, provider, .. }): BlockingServiceInput<String>,
511511
people: Query<&TestPeople>,
512512
multipliers: Query<&Multiplier>,
513513
) -> u64 {

src/service/async_srv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
use crate::{
19-
AsyncService, InAsyncService, IntoService, ServiceTrait, ServiceBundle, ServiceRequest,
19+
AsyncService, AsyncServiceInput, IntoService, ServiceTrait, ServiceBundle, ServiceRequest,
2020
InnerChannel, ChannelQueue, OperationRoster, Blocker,
2121
StreamPack, ServiceBuilder, ChooseAsyncServiceDelivery, OperationRequest,
2222
OperationError, OrBroken, ManageInput, Input, OperateTask,
@@ -358,6 +358,6 @@ where
358358
}
359359
}
360360

361-
fn peel_async<Request>(In(AsyncService { request, .. }): InAsyncService<Request>) -> Request {
361+
fn peel_async<Request>(In(AsyncService { request, .. }): AsyncServiceInput<Request>) -> Request {
362362
request
363363
}

0 commit comments

Comments
 (0)