Skip to content

Commit 3d8ec82

Browse files
committed
Make scopes off of Chain
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent a0ffc4f commit 3d8ec82

File tree

2 files changed

+84
-33
lines changed

2 files changed

+84
-33
lines changed

src/builder.rs

Lines changed: 46 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -123,37 +123,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
123123
Streams: StreamPack,
124124
{
125125
let scope_id = self.commands.spawn(()).id();
126-
let exit_scope = self.commands.spawn(UnusedTarget).id();
127-
let operation = OperateScope::<Request, Response, Streams>::new(
128-
scope_id, Some(exit_scope), settings, self.commands,
129-
);
130-
self.commands.add(AddOperation::new(scope_id, operation));
131-
132-
let (stream_in, stream_out) = Streams::spawn_scope_streams(
133-
scope_id,
134-
self.scope,
135-
self.commands,
136-
);
137-
138-
let mut builder = Builder {
139-
scope: scope_id,
140-
finish_scope_cancel: operation.finish_cancel(),
141-
commands: self.commands,
142-
};
143-
144-
let scope = Scope {
145-
input: Output::new(scope_id, operation.enter_scope()),
146-
terminate: InputSlot::new(scope_id, operation.terminal()),
147-
streams: stream_in,
148-
};
149-
150-
build(scope, &mut builder);
151-
152-
Node {
153-
input: InputSlot::new(self.scope, scope_id),
154-
output: Output::new(self.scope, exit_scope),
155-
streams: stream_out,
156-
}
126+
self.create_scope_impl(scope_id, settings, build)
157127
}
158128

159129
/// It is possible for a scope to be cancelled before it terminates. Even a
@@ -216,4 +186,49 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
216186
pub fn commands(&'a mut self) -> &'a mut Commands<'w, 's> {
217187
&mut self.commands
218188
}
189+
190+
/// Used internally to create scopes in different ways.
191+
pub(crate) fn create_scope_impl<Request, Response, Streams>(
192+
&mut self,
193+
scope_id: Entity,
194+
settings: ScopeSettings,
195+
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder),
196+
) -> Node<Request, Response, Streams>
197+
where
198+
Request: 'static + Send + Sync,
199+
Response: 'static + Send + Sync,
200+
Streams: StreamPack,
201+
{
202+
let exit_scope = self.commands.spawn(UnusedTarget).id();
203+
let operation = OperateScope::<Request, Response, Streams>::new(
204+
scope_id, Some(exit_scope), settings, self.commands,
205+
);
206+
self.commands.add(AddOperation::new(scope_id, operation));
207+
208+
let (stream_in, stream_out) = Streams::spawn_scope_streams(
209+
scope_id,
210+
self.scope,
211+
self.commands,
212+
);
213+
214+
let mut builder = Builder {
215+
scope: scope_id,
216+
finish_scope_cancel: operation.finish_cancel(),
217+
commands: self.commands,
218+
};
219+
220+
let scope = Scope {
221+
input: Output::new(scope_id, operation.enter_scope()),
222+
terminate: InputSlot::new(scope_id, operation.terminal()),
223+
streams: stream_in,
224+
};
225+
226+
build(scope, &mut builder);
227+
228+
Node {
229+
input: InputSlot::new(self.scope, scope_id),
230+
output: Output::new(self.scope, exit_scope),
231+
streams: stream_out,
232+
}
233+
}
219234
}

src/chain.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ use bevy::prelude::Entity;
2121

2222
use crate::{
2323
UnusedTarget, AddOperation, Node, InputSlot, Builder,
24-
ForkClone, StreamPack, Provider, ProvideOnce,
24+
ForkClone, StreamPack, Provider, ProvideOnce, Scope,
2525
AsMap, IntoBlockingMap, IntoAsyncMap, Output, Noop,
26-
ForkTargetStorage, StreamTargetMap,
26+
ForkTargetStorage, StreamTargetMap, ScopeSettings,
2727
make_result_branching, make_cancel_filter_on_err,
2828
make_option_branching, make_cancel_filter_on_none,
2929
};
@@ -215,6 +215,42 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
215215
self.then_node(f.into_async_map())
216216
}
217217

218+
/// Build a workflow scope to be used as an element in this chain.
219+
///
220+
/// If you want to connect to the stream outputs, use
221+
/// [`Self::then_scope_node`] instead.
222+
#[must_use]
223+
pub fn then_scope<Response, Streams>(
224+
self,
225+
settings: ScopeSettings,
226+
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder),
227+
) -> Chain<'w, 's, 'a, 'b, Response>
228+
where
229+
Response: 'static + Send + Sync,
230+
Streams: StreamPack,
231+
{
232+
self.builder.create_scope_impl::<T, Response, Streams>(
233+
self.target, settings, build,
234+
).output.chain(self.builder)
235+
}
236+
237+
/// From the current target in the chain, build a [scoped](Scope) workflow
238+
/// and then get back a node that represents that scoped workflow.
239+
#[must_use]
240+
pub fn then_scope_node<Response, Streams>(
241+
self,
242+
settings: ScopeSettings,
243+
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder),
244+
) -> Node<T, Response, Streams>
245+
where
246+
Response: 'static + Send + Sync,
247+
Streams: StreamPack,
248+
{
249+
self.builder.create_scope_impl::<T, Response, Streams>(
250+
self.target, settings, build,
251+
)
252+
}
253+
218254
/// Apply a [`Provider`] that filters the response by returning an [`Option`].
219255
/// If the filter returns [`None`] then a cancellation is triggered.
220256
/// Otherwise the chain continues with the value given inside [`Some`].

0 commit comments

Comments
 (0)