Skip to content

Commit 4b0e630

Browse files
committed
Tweak scope builder API
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent ca2d7e1 commit 4b0e630

File tree

11 files changed

+166
-146
lines changed

11 files changed

+166
-146
lines changed

src/builder.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
2424
Buffer, BufferSettings, AddOperation, OperateBuffer, Scope, OperateScope,
2525
ScopeSettings, BeginCancel, ScopeEndpoints, IntoBlockingMap, IntoAsyncMap,
26-
AsMap, ProvideOnce,
26+
AsMap, ProvideOnce, ScopeSettingsStorage,
2727
};
2828

2929
pub(crate) mod connect;
@@ -164,19 +164,34 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
164164
/// through the workflow of the scope with a unique session ID. Even if
165165
/// multiple values are sent in from the same session, they will each be
166166
/// assigned their own unique session ID while inside of this scope.
167-
pub fn create_scope<Request, Response, Streams>(
167+
pub fn create_scope<Request, Response, Streams, Settings>(
168168
&mut self,
169-
settings: ScopeSettings,
170-
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder),
169+
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
171170
) -> Node<Request, Response, Streams>
172171
where
173172
Request: 'static + Send + Sync,
174173
Response: 'static + Send + Sync,
175174
Streams: StreamPack,
175+
Settings: Into<ScopeSettings>,
176176
{
177177
let scope_id = self.commands.spawn(()).id();
178178
let exit_scope = self.commands.spawn(UnusedTarget).id();
179-
self.create_scope_impl(scope_id, exit_scope, settings, build)
179+
self.create_scope_impl(scope_id, exit_scope, build)
180+
}
181+
182+
/// Alternative to [`Self::create_scope`] for pure input/output scopes (i.e.
183+
/// there are no output streams). Using this signature should allow the
184+
/// compiler to infer all the generic arguments when there are no streams.
185+
pub fn create_io_scope<Request, Response, Settings>(
186+
&mut self,
187+
build: impl FnOnce(Scope<Request, Response, ()>, &mut Builder) -> Settings,
188+
) -> Node<Request, Response, ()>
189+
where
190+
Request: 'static + Send + Sync,
191+
Response: 'static + Send + Sync,
192+
Settings: Into<ScopeSettings>,
193+
{
194+
self.create_scope::<Request, Response, (), Settings>(build)
180195
}
181196

182197
/// It is possible for a scope to be cancelled before it terminates. Even a
@@ -198,17 +213,19 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
198213
//
199214
// TODO(@mxgrey): Consider offering a setting to choose between whether each
200215
// buffer item gets its own session or whether they share a session.
201-
pub fn on_cancel<T: 'static + Send + Sync>(
216+
pub fn on_cancel<T, Settings>(
202217
&mut self,
203218
from_buffer: Buffer<T>,
204-
settings: ScopeSettings,
205-
build: impl FnOnce(Scope<T, (), ()>, &mut Builder),
206-
) {
219+
build: impl FnOnce(Scope<T, (), ()>, &mut Builder) -> Settings,
220+
)
221+
where
222+
T: 'static + Send + Sync,
223+
Settings: Into<ScopeSettings>,
224+
{
207225
let cancelling_scope_id = self.commands.spawn(()).id();
208-
let _ = self.create_scope_impl::<T, (), ()>(
226+
let _ = self.create_scope_impl::<T, (), (), Settings>(
209227
cancelling_scope_id,
210228
self.finish_scope_cancel,
211-
settings,
212229
build,
213230
);
214231

@@ -231,24 +248,24 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
231248
}
232249

233250
/// Used internally to create scopes in different ways.
234-
pub(crate) fn create_scope_impl<Request, Response, Streams>(
251+
pub(crate) fn create_scope_impl<Request, Response, Streams, Settings>(
235252
&mut self,
236253
scope_id: Entity,
237254
exit_scope: Entity,
238-
settings: ScopeSettings,
239-
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder),
255+
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder) -> Settings,
240256
) -> Node<Request, Response, Streams>
241257
where
242258
Request: 'static + Send + Sync,
243259
Response: 'static + Send + Sync,
244260
Streams: StreamPack,
261+
Settings: Into<ScopeSettings>,
245262
{
246263
let ScopeEndpoints {
247264
terminal,
248265
enter_scope,
249266
finish_scope_cancel
250267
} = OperateScope::<Request, Response, Streams>::add(
251-
Some(self.scope()), scope_id, Some(exit_scope), settings, self.commands,
268+
Some(self.scope()), scope_id, Some(exit_scope), self.commands,
252269
);
253270

254271
let (stream_in, stream_out) = Streams::spawn_scope_streams(
@@ -269,7 +286,8 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
269286
streams: stream_in,
270287
};
271288

272-
build(scope, &mut builder);
289+
let settings = build(scope, &mut builder).into();
290+
self.commands.entity(scope_id).insert(ScopeSettingsStorage(settings));
273291

274292
Node {
275293
input: InputSlot::new(self.scope, scope_id),

src/chain.rs

Lines changed: 45 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -220,18 +220,18 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
220220
/// If you want to connect to the stream outputs, use
221221
/// [`Self::then_scope_node`] instead.
222222
#[must_use]
223-
pub fn then_scope<Response, Streams>(
223+
pub fn then_scope<Response, Streams, Settings>(
224224
self,
225-
settings: ScopeSettings,
226-
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder),
225+
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
227226
) -> Chain<'w, 's, 'a, 'b, Response>
228227
where
229228
Response: 'static + Send + Sync,
230229
Streams: StreamPack,
230+
Settings: Into<ScopeSettings>,
231231
{
232232
let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
233-
self.builder.create_scope_impl::<T, Response, Streams>(
234-
self.target, exit_scope, settings, build,
233+
self.builder.create_scope_impl::<T, Response, Streams, Settings>(
234+
self.target, exit_scope, build,
235235
).output.chain(self.builder)
236236
}
237237

@@ -241,31 +241,32 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
241241
///
242242
/// Unlike `then_scope`, this function can infer the types for the generics
243243
/// so you don't need to explicitly specify them.
244-
pub fn then_io_scope<Response>(
244+
pub fn then_io_scope<Response, Settings>(
245245
self,
246-
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder),
246+
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder) -> Settings,
247247
) -> Chain<'w, 's, 'a, 'b, Response>
248248
where
249249
Response: 'static + Send + Sync,
250+
Settings: Into<ScopeSettings>,
250251
{
251-
self.then_scope(ScopeSettings::default(), build)
252+
self.then_scope(build)
252253
}
253254

254255
/// From the current target in the chain, build a [scoped](Scope) workflow
255256
/// and then get back a node that represents that scoped workflow.
256257
#[must_use]
257-
pub fn then_scope_node<Response, Streams>(
258+
pub fn then_scope_node<Response, Streams, Settings>(
258259
self,
259-
settings: ScopeSettings,
260-
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder),
260+
build: impl FnOnce(Scope<T, Response, Streams>, &mut Builder) -> Settings,
261261
) -> Node<T, Response, Streams>
262262
where
263263
Response: 'static + Send + Sync,
264264
Streams: StreamPack,
265+
Settings: Into<ScopeSettings>,
265266
{
266267
let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
267-
self.builder.create_scope_impl::<T, Response, Streams>(
268-
self.target, exit_scope, settings, build,
268+
self.builder.create_scope_impl::<T, Response, Streams, Settings>(
269+
self.target, exit_scope, build,
269270
)
270271
}
271272

@@ -275,14 +276,15 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
275276
///
276277
/// Unlike `then_scope_node`, this function can infer the types for the
277278
/// generics so you don't need to explicitly specify them.
278-
pub fn then_io_scope_node<Response>(
279+
pub fn then_io_scope_node<Response, Settings>(
279280
self,
280-
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder),
281+
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder) -> Settings,
281282
) -> Node<T, Response, ()>
282283
where
283284
Response: 'static + Send + Sync,
285+
Settings: Into<ScopeSettings>,
284286
{
285-
self.then_scope_node(ScopeSettings::default(), build)
287+
self.then_scope_node(build)
286288
}
287289

288290
/// Apply a [`Provider`] that filters the response by returning an [`Option`].
@@ -499,14 +501,21 @@ where
499501
/// that trait, then you can use [`Self::cancel_on_quiet_err`] instead.
500502
///
501503
/// ```
502-
/// use bevy_impulse::{*, testing::*};
504+
/// use crate::{*, testing::*};
505+
///
503506
/// let mut context = TestingContext::minimal_plugins();
504-
/// let mut promise = context.build(|commands| {
507+
///
508+
/// let workflow = context.spawn_io_workflow(|scope, builder| {
509+
/// scope.input.chain(builder)
510+
/// .map_block(produce_err)
511+
/// .cancel_on_err()
512+
/// .connect(scope.terminate);
513+
/// });
514+
///
515+
/// let mut promise = context.command(|commands| {
505516
/// commands
506-
/// .provide("hello")
507-
/// .map_block(produce_err)
508-
/// .cancel_on_err()
509-
/// .take()
517+
/// .request("hello", workflow)
518+
/// .take_response()
510519
/// });
511520
///
512521
/// context.run_while_pending(&mut promise);
@@ -703,49 +712,41 @@ mod tests {
703712
fn test_join() {
704713
let mut context = TestingContext::minimal_plugins();
705714

706-
let workflow = context.build_io_workflow(|scope, builder| {
715+
let workflow = context.spawn_io_workflow(|scope, builder| {
707716
scope.input.chain(builder)
708-
.map(print_debug(format!("{}", line!())))
709717
// (2.0, 2.0)
710718
.unzip_build((
711719
|chain: Chain<f64>| chain
712720
// 2.0
713-
.map(print_debug(format!("{}", line!())))
714721
.map_block(|value|
715722
WaitRequest {
716723
duration: Duration::from_secs_f64(value/100.0),
717724
value,
718725
}
719726
)
720-
.map(print_debug(format!("{}", line!())))
721727
.map_async(wait)
722-
.map(print_debug(format!("{}", line!())))
723728
// 2.0
724729
.output(),
725730
|chain: Chain<f64>| chain
726731
// 2.0
727732
.map_block(|value| 2.0*value)
728-
.map(print_debug(format!("{}", line!())))
729733
// 4.0
730734
.output(),
731735
))
732736
.join(builder)
733-
.map(print_debug(format!("{}", line!())))
734737
// (2.0, 4.0)
735738
.map_block(add)
736-
.map(print_debug(format!("{}", line!())))
737739
// 6.0
738740
.connect(scope.terminate);
739741
});
740742

741-
let mut promise = context.build(|commands|
743+
let mut promise = context.command(|commands|
742744
commands
743745
.request((2.0, 2.0), workflow)
744746
.take_response()
745747
);
746748

747749
context.run_with_conditions(&mut promise, Duration::from_secs(2));
748-
dbg!(promise.peek());
749750
assert!(promise.peek().available().is_some_and(|value| *value == 6.0));
750751
assert!(context.no_unhandled_errors());
751752
}
@@ -754,12 +755,12 @@ mod tests {
754755
fn test_race() {
755756
let mut context = TestingContext::minimal_plugins();
756757

757-
let workflow = context.build_io_workflow(|scope, builder| {
758+
let workflow = context.spawn_io_workflow(|scope, builder| {
758759
scope.input.chain(builder)
759760
// (2.0, 2.0)
760761
.map_block(add)
761762
// 4.0
762-
.then_scope::<_, ()>(ScopeSettings::default(), |scope, builder| {
763+
.then_io_scope(|scope, builder| {
763764
scope.input.chain(builder)
764765
// 4.0
765766
.fork_clone((
@@ -794,7 +795,7 @@ mod tests {
794795
.connect(scope.terminate);
795796
});
796797

797-
let mut promise = context.build(|commands|
798+
let mut promise = context.command(|commands|
798799
commands
799800
.request((2.0, 2.0), workflow)
800801
.take_response()
@@ -814,11 +815,11 @@ mod tests {
814815
fn test_unzip() {
815816
let mut context = TestingContext::minimal_plugins();
816817

817-
let workflow = context.build_io_workflow(|scope, builder| {
818+
let workflow = context.spawn_io_workflow(|scope, builder| {
818819
scope.input.chain(builder)
819820
.map_block(add)
820821
.map_block(|v| (v, 2.0*v))
821-
.then_scope::<_, ()>(ScopeSettings::default(), |scope, builder| {
822+
.then_io_scope(|scope, builder| {
822823
scope.input.chain(builder)
823824
.unzip_build((
824825
|chain: Chain<f64>| {
@@ -844,7 +845,7 @@ mod tests {
844845
});
845846

846847

847-
let mut promise = context.build(|commands| {
848+
let mut promise = context.command(|commands| {
848849
commands
849850
.request((2.0, 3.0), workflow)
850851
.take_response()
@@ -859,38 +860,28 @@ mod tests {
859860
fn test_cancel_on_special_case() {
860861
let mut context = TestingContext::minimal_plugins();
861862

862-
dbg!();
863-
let workflow = context.build_io_workflow(|scope, builder| {
863+
let workflow = context.spawn_io_workflow(|scope, builder| {
864864
scope.input.chain(builder)
865865
.map_block(duplicate)
866-
.map(print_debug(format!("{}", line!())))
867866
.map_block(add)
868-
.map(print_debug(format!("{}", line!())))
869867
.map_block(produce_none)
870-
.map(print_debug(format!("{}", line!())))
871868
.cancel_on_none()
872-
.map(print_debug(format!("{}", line!())))
873869
.map_block(duplicate)
874-
.map(print_debug(format!("{}", line!())))
875870
.map_block(add)
876-
.map(print_debug(format!("{}", line!())))
877871
.connect(scope.terminate);
878872
});
879873

880-
dbg!();
881-
let mut promise = context.build(|commands| {
874+
let mut promise = context.command(|commands| {
882875
commands
883876
.request(2.0, workflow)
884877
.take_response()
885878
});
886879

887-
dbg!();
888880
context.run_with_conditions(&mut promise, Duration::from_secs(2));
889-
dbg!(context.get_unhandled_errors());
890881
assert!(promise.peek().is_cancelled());
891882
assert!(context.no_unhandled_errors());
892883

893-
let workflow = context.build_io_workflow(|scope, builder| {
884+
let workflow = context.spawn_io_workflow(|scope, builder| {
894885
scope.input.chain(builder)
895886
.map_block(duplicate)
896887
.map_block(add)
@@ -901,7 +892,7 @@ mod tests {
901892
.connect(scope.terminate);
902893
});
903894

904-
let mut promise = context.build(|commands| {
895+
let mut promise = context.command(|commands| {
905896
commands
906897
.request(2.0, workflow)
907898
.take_response()
@@ -916,7 +907,7 @@ mod tests {
916907
fn test_disposal() {
917908
let mut context = TestingContext::minimal_plugins();
918909

919-
let workflow = context.build_io_workflow(|scope, builder| {
910+
let workflow = context.spawn_io_workflow(|scope, builder| {
920911
scope.input.chain(builder)
921912
.map_block(duplicate)
922913
.map_block(add)
@@ -927,14 +918,13 @@ mod tests {
927918
.connect(scope.terminate);
928919
});
929920

930-
let mut promise = context.build(|commands| {
921+
let mut promise = context.command(|commands| {
931922
commands
932923
.request(2.0, workflow)
933924
.take_response()
934925
});
935926

936927
context.run_with_conditions(&mut promise, Duration::from_secs(2));
937-
dbg!(promise.peek());
938928
assert!(promise.peek().is_cancelled());
939929
assert!(context.no_unhandled_errors());
940930
}

0 commit comments

Comments
 (0)