Skip to content

Commit edfdddc

Browse files
committed
Creating API for spawning workflows
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent b6500ce commit edfdddc

20 files changed

+632
-202
lines changed

src/chain.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::future::Future;
1919

2020
use crate::{
21-
UnusedTarget, Receive, PerformOperation,
22-
ForkClone, Chosen, ApplyLabel, Stream, Provider,
21+
UnusedTarget, Receive, AddOperation,
22+
ForkClone, Chosen, ApplyLabel, StreamPack, Provider,
2323
AsMap, IntoBlockingMap, IntoAsyncMap, EnterCancel,
2424
DetachDependency, DisposeOnCancel, Promise, Noop,
2525
Cancelled, ForkTargetStorage,
@@ -71,7 +71,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
7171
/// Have the impulse chain run until it is finished without holding onto any
7272
/// [`Promise`]. The final output will be automatically disposed.
7373
pub fn detach(self) {
74-
self.commands.add(PerformOperation::new(
74+
self.commands.add(AddOperation::new(
7575
self.target,
7676
Receive::<Response>::new(None, true),
7777
));
@@ -84,7 +84,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
8484
/// end of the chain.
8585
pub fn take(self) -> Promise<Response> {
8686
let (promise, sender) = Promise::new();
87-
self.commands.add(PerformOperation::new(
87+
self.commands.add(AddOperation::new(
8888
self.target,
8989
Receive::new(Some(sender), false),
9090
));
@@ -98,7 +98,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
9898
/// [`Chain::take`] together.
9999
pub fn detach_and_take(self) -> Promise<Response> {
100100
let (promise, sender) = Promise::new();
101-
self.commands.add(PerformOperation::new(
101+
self.commands.add(AddOperation::new(
102102
self.target,
103103
Receive::new(Some(sender), true),
104104
));
@@ -176,7 +176,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
176176
) -> Chain<'w, 's, 'a, P::Response, P::Streams, ModifiersUnset>
177177
where
178178
P::Response: 'static + Send + Sync,
179-
P::Streams: Stream,
179+
P::Streams: StreamPack,
180180
{
181181
let source = self.target;
182182
let target = self.commands.spawn(UnusedTarget).id();
@@ -193,7 +193,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
193193
where
194194
F::MapType: Provider<Request=Response>,
195195
<F::MapType as Provider>::Response: 'static + Send + Sync,
196-
<F::MapType as Provider>::Streams: Stream,
196+
<F::MapType as Provider>::Streams: StreamPack,
197197
{
198198
self.then(f.as_map())
199199
}
@@ -243,7 +243,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
243243
ThenResponse: 'static + Send + Sync,
244244
F: Provider<Request = Response, Response = Option<ThenResponse>>,
245245
F::Response: 'static + Send + Sync,
246-
F::Streams: Stream,
246+
F::Streams: StreamPack,
247247
{
248248
self.then(filter_provider).cancel_on_none()
249249
}
@@ -259,7 +259,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
259259
ThenResponse: 'static + Send + Sync,
260260
F: Provider<Request = Response, Response = Option<ThenResponse>>,
261261
F::Response: 'static + Send + Sync,
262-
F::Streams: Stream,
262+
F::Streams: StreamPack,
263263
{
264264
self.cancellation_filter(filter_provider).dispose_on_cancel()
265265
}
@@ -326,7 +326,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
326326
|_| self.commands.spawn(UnusedTarget).id()
327327
);
328328

329-
self.commands.add(PerformOperation::new(
329+
self.commands.add(AddOperation::new(
330330
source,
331331
ForkClone::<Response>::new(ForkTargetStorage::from_iter(targets)),
332332
));
@@ -358,7 +358,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
358358
let mut targets = ForkTargetStorage::new();
359359
targets.0.reserve(number_forks);
360360

361-
self.commands.add(PerformOperation::new(
361+
self.commands.add(AddOperation::new(
362362
source,
363363
ForkClone::<Response>::new(targets.clone())
364364
));
@@ -538,7 +538,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
538538
let source = self.target;
539539
let target = self.commands.spawn(UnusedTarget).id();
540540

541-
self.commands.add(PerformOperation::new(
541+
self.commands.add(AddOperation::new(
542542
source, Noop::<Response>::new(target),
543543
));
544544
Chain::new(source, target, self.commands)
@@ -601,7 +601,7 @@ where
601601
let target_ok = self.commands.spawn(UnusedTarget).id();
602602
let target_err = self.commands.spawn(UnusedTarget).id();
603603

604-
self.commands.add(PerformOperation::new(
604+
self.commands.add(AddOperation::new(
605605
source,
606606
make_result_branching::<T, E>(
607607
ForkTargetStorage::from_iter([target_ok, target_err])
@@ -640,7 +640,7 @@ where
640640
let source = self.target;
641641
let target = self.commands.spawn(UnusedTarget).id();
642642

643-
self.commands.add(PerformOperation::new(
643+
self.commands.add(AddOperation::new(
644644
source,
645645
make_cancel_filter_on_err::<T, E>(target),
646646
));
@@ -698,7 +698,7 @@ where
698698
let target_some = self.commands.spawn(UnusedTarget).id();
699699
let target_none = self.commands.spawn(UnusedTarget).id();
700700

701-
self.commands.add(PerformOperation::new(
701+
self.commands.add(AddOperation::new(
702702
source,
703703
make_option_branching::<T>(
704704
ForkTargetStorage::from_iter([target_some, target_none])
@@ -717,7 +717,7 @@ where
717717
let source = self.target;
718718
let target = self.commands.spawn(UnusedTarget).id();
719719

720-
self.commands.add(PerformOperation::new(
720+
self.commands.add(AddOperation::new(
721721
source,
722722
make_cancel_filter_on_none::<T>(target),
723723
));
@@ -786,7 +786,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L> Chain<'w, 's, 'a,
786786
) -> (Dangling<Response, Streams>, U) {
787787
let cancel_target = self.commands.spawn(UnusedTarget).id();
788788
let signal_target = self.commands.spawn(UnusedTarget).id();
789-
self.commands.add(PerformOperation::new(
789+
self.commands.add(AddOperation::new(
790790
cancel_target,
791791
EnterCancel::new(self.source, signal_target, signal),
792792
));

src/chain/dangling.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::{
1919
Chain, OutputChain, ModifiersClosed, ModifiersUnset, UnusedTarget,
2020
FunnelInputStorage, JoinInput, ZipJoin,
21-
BundleJoin, PerformOperation,
21+
BundleJoin, AddOperation,
2222
};
2323

2424
use bevy::prelude::{Entity, Commands};
@@ -95,9 +95,9 @@ where
9595
let joiner = commands.spawn(()).id();
9696
let target = commands.spawn(UnusedTarget).id();
9797

98-
commands.add(PerformOperation::new(input_a, JoinInput::<A>::new(joiner)));
99-
commands.add(PerformOperation::new(input_b, JoinInput::<B>::new(joiner)));
100-
commands.add(PerformOperation::new(
98+
commands.add(AddOperation::new(input_a, JoinInput::<A>::new(joiner)));
99+
commands.add(AddOperation::new(input_b, JoinInput::<B>::new(joiner)));
100+
commands.add(AddOperation::new(
101101
joiner,
102102
ZipJoin::<Self::JoinedResponse>::new(
103103
FunnelInputStorage::from_iter([input_a, input_b]),
@@ -141,10 +141,10 @@ where
141141
let joiner = commands.spawn(()).id();
142142
let target = commands.spawn(UnusedTarget).id();
143143

144-
commands.add(PerformOperation::new(input_a, JoinInput::<A>::new(joiner)));
145-
commands.add(PerformOperation::new(input_b, JoinInput::<B>::new(joiner)));
146-
commands.add(PerformOperation::new(input_c, JoinInput::<C>::new(joiner)));
147-
commands.add(PerformOperation::new(
144+
commands.add(AddOperation::new(input_a, JoinInput::<A>::new(joiner)));
145+
commands.add(AddOperation::new(input_b, JoinInput::<B>::new(joiner)));
146+
commands.add(AddOperation::new(input_c, JoinInput::<C>::new(joiner)));
147+
commands.add(AddOperation::new(
148148
joiner,
149149
ZipJoin::<Self::JoinedResponse>::new(
150150
FunnelInputStorage::from_iter([input_a, input_b, input_c]),
@@ -266,14 +266,14 @@ where
266266
);
267267
let joiner = commands.spawn(()).id();
268268
for input in &inputs.0 {
269-
commands.add(PerformOperation::new(
269+
commands.add(AddOperation::new(
270270
*input,
271271
JoinInput::<Response>::new(joiner),
272272
));
273273
}
274274

275275
let target = commands.spawn(UnusedTarget).id();
276-
commands.add(PerformOperation::new(
276+
commands.add(AddOperation::new(
277277
joiner,
278278
BundleJoin::<Response>::new(inputs, target),
279279
));

src/chain/fork_clone_builder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use bevy::prelude::{Entity, Commands};
1919

20-
use crate::{OutputChain, UnusedTarget, PerformOperation, ForkClone, ForkTargetStorage};
20+
use crate::{OutputChain, UnusedTarget, AddOperation, ForkClone, ForkTargetStorage};
2121

2222
pub trait ForkCloneBuilder<Response> {
2323
type Outputs;
@@ -45,7 +45,7 @@ where
4545
let target_0 = commands.spawn(UnusedTarget).id();
4646
let target_1 = commands.spawn(UnusedTarget).id();
4747

48-
commands.add(PerformOperation::new(
48+
commands.add(AddOperation::new(
4949
source,
5050
ForkClone::<R>::new(
5151
ForkTargetStorage::from_iter([target_0, target_1])
@@ -76,7 +76,7 @@ where
7676
let target_1 = commands.spawn(UnusedTarget).id();
7777
let target_2 = commands.spawn(UnusedTarget).id();
7878

79-
commands.add(PerformOperation::new(
79+
commands.add(AddOperation::new(
8080
source,
8181
ForkClone::<R>::new(
8282
ForkTargetStorage::from_iter([target_0, target_1, target_2])

src/chain/unzip.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use smallvec::SmallVec;
2121

2222
use crate::{
2323
Dangling, UnusedTarget, ForkTargetStorage, OperationRequest, Input, ManageInput,
24-
ForkUnzip, PerformOperation, OutputChain, FunnelInputStorage, OperationResult,
24+
ForkUnzip, AddOperation, OutputChain, FunnelInputStorage, OperationResult,
2525
SingleTargetStorage, OrBroken, OperationReachability,
2626
OperationError, InspectInput,
2727
};
@@ -68,7 +68,7 @@ impl<A: 'static + Send + Sync> Unzippable for (A,) {
6868

6969
let result = Dangling::new(source, targets[0]);
7070

71-
commands.add(PerformOperation::new(
71+
commands.add(AddOperation::new(
7272
source,
7373
ForkUnzip::<Self>::new(ForkTargetStorage(targets)),
7474
));
@@ -155,7 +155,7 @@ impl<A: 'static + Send + Sync, B: 'static + Send + Sync> Unzippable for (A, B) {
155155
Dangling::new(source, targets[1]),
156156
);
157157

158-
commands.add(PerformOperation::new(
158+
commands.add(AddOperation::new(
159159
source,
160160
ForkUnzip::<Self>::new(ForkTargetStorage(targets)),
161161
));
@@ -272,7 +272,7 @@ where
272272
Dangling::new(source, targets[2]),
273273
);
274274

275-
commands.add(PerformOperation::new(
275+
commands.add(AddOperation::new(
276276
source,
277277
ForkUnzip::<Self>::new(ForkTargetStorage(targets)),
278278
));

src/channel.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use bevy::{
2222

2323
use crossbeam::channel::{unbounded, Sender as CbSender, Receiver as CbReceiver};
2424

25-
use crate::{Stream, Provider, Promise, RequestExt, OperationRoster};
25+
use crate::{StreamPack, Provider, Promise, RequestExt, OperationRoster};
2626

2727
#[derive(Clone)]
2828
pub struct Channel<Streams = ()> {
@@ -49,7 +49,7 @@ impl<Streams> Channel<Streams> {
4949
where
5050
P::Request: 'static + Send + Sync,
5151
P::Response: 'static + Send + Sync,
52-
P::Streams: 'static + Stream,
52+
P::Streams: 'static + StreamPack,
5353
P: 'static + Send,
5454
{
5555
self.build(move |commands| {
@@ -119,7 +119,7 @@ struct StreamCommand<T> {
119119
data: T,
120120
}
121121

122-
impl<T: Stream> Command for StreamCommand<T> {
122+
impl<T: StreamPack> Command for StreamCommand<T> {
123123
fn apply(self, world: &mut World) {
124124
let Some(mut source_mut) = world.get_entity_mut(self.source) else {
125125
return;

src/handler.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
use crate::{
1919
BlockingHandler, AsyncHandler, Channel, InnerChannel, ChannelQueue,
20-
OperationRoster, Stream, Input, Provider,
21-
PerformOperation, OperateHandler, ManageInput, OperationError,
20+
OperationRoster, StreamPack, Input, Provider,
21+
AddOperation, OperateHandler, ManageInput, OperationError,
2222
OrBroken, OperateTask, Operation, OperationSetup,
2323
};
2424

@@ -113,7 +113,7 @@ impl<'a> HandleRequest<'a> {
113113
Ok(())
114114
}
115115

116-
fn get_channel<Streams: Stream>(&mut self) -> Channel<Streams> {
116+
fn get_channel<Streams: StreamPack>(&mut self) -> Channel<Streams> {
117117
let sender = self.world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
118118
let channel = InnerChannel::new(self.source, sender);
119119
channel.into_specific()
@@ -183,7 +183,7 @@ where
183183
Task: Future + 'static + Send,
184184
Request: 'static + Send + Sync,
185185
Task::Output: 'static + Send + Sync,
186-
Streams: Stream,
186+
Streams: StreamPack,
187187
{
188188
fn handle(&mut self, mut input: HandleRequest) -> Result<(), OperationError> {
189189
let Input { session, data: request } = input.get_request()?;
@@ -251,7 +251,7 @@ where
251251
Task: Future + 'static + Send,
252252
Request: 'static + Send + Sync,
253253
Task::Output: 'static + Send + Sync,
254-
Streams: Stream,
254+
Streams: StreamPack,
255255
{
256256
type Request = Request;
257257
type Response = Task::Output;
@@ -291,7 +291,7 @@ where
291291
Task: Future + 'static + Send,
292292
Request: 'static + Send + Sync,
293293
Task::Output: 'static + Send + Sync,
294-
Streams: Stream,
294+
Streams: StreamPack,
295295
{
296296
type Request = Request;
297297
type Response = Task::Output;
@@ -394,13 +394,13 @@ impl<Request, Response, Streams> Provider for Handler<Request, Response, Streams
394394
where
395395
Request: 'static + Send + Sync,
396396
Response: 'static + Send + Sync,
397-
Streams: Stream,
397+
Streams: StreamPack,
398398
{
399399
type Request = Request;
400400
type Response = Response;
401401
type Streams = Streams;
402402

403403
fn provide(self, source: Entity, target: Entity, commands: &mut bevy::prelude::Commands) {
404-
commands.add(PerformOperation::new(source, OperateHandler::new(self, target)));
404+
commands.add(AddOperation::new(source, OperateHandler::new(self, target)));
405405
}
406406
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ pub use input::*;
4242
pub mod map;
4343
pub use map::*;
4444

45+
pub mod node;
46+
pub use node::*;
47+
4548
pub mod operation;
4649
pub use operation::*;
4750

0 commit comments

Comments
 (0)