Skip to content

Commit a0ffc4f

Browse files
committed
Finish scope spawning implementation
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 3e077a7 commit a0ffc4f

File tree

8 files changed

+197
-47
lines changed

8 files changed

+197
-47
lines changed

src/builder.rs

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ use bevy::prelude::{Entity, Commands};
1919

2020
use crate::{
2121
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
22-
Buffer, BufferSettings, AddOperation, OperateBuffer,
22+
Buffer, BufferSettings, AddOperation, OperateBuffer, Scope, OperateScope,
23+
ScopeSettings, BeginCancel,
2324
};
2425

2526
pub(crate) mod connect;
@@ -34,7 +35,10 @@ pub(crate) use connect::*;
3435
/// please open an issue with a minimal reproducible example if you find a way
3536
/// to make it panic.
3637
pub struct Builder<'w, 's, 'a> {
38+
/// The scope that this builder is meant to help build
3739
pub(crate) scope: Entity,
40+
/// The target for cancellation workflows
41+
pub(crate) finish_scope_cancel: Entity,
3842
pub(crate) commands: &'a mut Commands<'w, 's>,
3943
}
4044

@@ -97,6 +101,112 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
97101
Buffer { scope: self.scope, source, _ignore: Default::default() }
98102
}
99103

104+
/// Create an isolated scope within the workflow. This can be useful for
105+
/// racing multiple branches, creating an uninterruptible segment within
106+
/// your workflow, or being able to run the same multiple instances of the
107+
/// same sub-workflow in parallel without them interfering with each other.
108+
///
109+
/// A value can be sent into the scope by connecting an [`Output`] of a node
110+
/// in the parent scope to the [`InputSlot`] of the node which gets returned
111+
/// by this function. Each time a value is sent into the scope, it will run
112+
/// through the workflow of the scope with a unique session ID. Even if
113+
/// multiple values are sent in from the same session, they will each be
114+
/// assigned their own unique session ID while inside of this scope.
115+
pub fn create_scope<Request, Response, Streams>(
116+
&mut self,
117+
settings: ScopeSettings,
118+
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder),
119+
) -> Node<Request, Response, Streams>
120+
where
121+
Request: 'static + Send + Sync,
122+
Response: 'static + Send + Sync,
123+
Streams: StreamPack,
124+
{
125+
let scope_id = self.commands.spawn(()).id();
126+
let exit_scope = self.commands.spawn(UnusedTarget).id();
127+
let operation = OperateScope::<Request, Response, Streams>::new(
128+
scope_id, Some(exit_scope), settings, self.commands,
129+
);
130+
self.commands.add(AddOperation::new(scope_id, operation));
131+
132+
let (stream_in, stream_out) = Streams::spawn_scope_streams(
133+
scope_id,
134+
self.scope,
135+
self.commands,
136+
);
137+
138+
let mut builder = Builder {
139+
scope: scope_id,
140+
finish_scope_cancel: operation.finish_cancel(),
141+
commands: self.commands,
142+
};
143+
144+
let scope = Scope {
145+
input: Output::new(scope_id, operation.enter_scope()),
146+
terminate: InputSlot::new(scope_id, operation.terminal()),
147+
streams: stream_in,
148+
};
149+
150+
build(scope, &mut builder);
151+
152+
Node {
153+
input: InputSlot::new(self.scope, scope_id),
154+
output: Output::new(self.scope, exit_scope),
155+
streams: stream_out,
156+
}
157+
}
158+
159+
/// It is possible for a scope to be cancelled before it terminates. Even a
160+
/// scope which is marked as uninterruptible will still experience a
161+
/// cancellation if its terminal node becomes unreachable.
162+
///
163+
/// This method allows you to define a workflow that branches off of this
164+
/// scope that will active if and only if the scope gets cancelled. The
165+
/// workflow will be activated once for each item in the buffer, and each
166+
/// activation will have its own session.
167+
///
168+
/// If you only want this cancellation workflow to activate once per
169+
/// cancelled session, then you should use a buffer that has a limit of one
170+
/// item.
171+
///
172+
/// The cancelled scope will only finish its cleanup after all cancellation
173+
/// workflows for the cancelled scope have finished, either by terminating
174+
/// or by being cancelled themselves.
175+
//
176+
// TODO(@mxgrey): Consider offering a setting to choose between whether each
177+
// buffer item gets its own session or whether they share a session.
178+
pub fn on_cancel<T: 'static + Send + Sync>(
179+
&mut self,
180+
from_buffer: Buffer<T>,
181+
settings: ScopeSettings,
182+
build: impl FnOnce(Scope<T, (), ()>, &mut Builder),
183+
) {
184+
let cancelling_scope_id = self.commands.spawn(()).id();
185+
let cancelling_operation = OperateScope::<T, (), ()>::new(
186+
cancelling_scope_id, Some(self.finish_scope_cancel), settings, self.commands,
187+
);
188+
self.commands.add(AddOperation::new(cancelling_scope_id, cancelling_operation));
189+
190+
let begin_cancel = self.commands.spawn(()).id();
191+
self.commands.add(AddOperation::new(
192+
begin_cancel,
193+
BeginCancel::<T>::new(self.scope, from_buffer.source, cancelling_scope_id),
194+
));
195+
let mut builder = Builder {
196+
scope: cancelling_scope_id,
197+
finish_scope_cancel: cancelling_operation.finish_cancel(),
198+
commands: self.commands,
199+
};
200+
201+
let scope = Scope {
202+
input: Output::new(cancelling_scope_id, cancelling_operation.enter_scope()),
203+
terminate: InputSlot::new(cancelling_scope_id, cancelling_operation.terminal()),
204+
streams: (),
205+
};
206+
207+
build(scope, &mut builder);
208+
}
209+
100210
/// Get the scope that this builder is building for
101211
pub fn scope(&self) -> Entity {
102212
self.scope

src/chain/unzip.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ use smallvec::SmallVec;
2121

2222
use crate::{
2323
UnusedTarget, ForkTargetStorage, OperationRequest, Input, ManageInput,
24-
ForkUnzip, AddOperation, FunnelInputStorage, OperationResult,
25-
SingleTargetStorage, OrBroken, OperationReachability, Output,
26-
OperationError, InspectInput, Chain, Builder,
24+
ForkUnzip, AddOperation, OperationResult, OrBroken, Output, Chain, Builder,
2725
};
2826

2927
/// A trait for response types that can be unzipped

src/handler.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ impl<'a> HandleRequest<'a> {
114114
self.world.get_resource_or_insert_with(|| UnhandledErrors::default())
115115
.setup
116116
.push(SetupFailure { broken_node: self.source, error });
117-
118117
}
119118
self.roster.queue(task_id);
120119
Ok(())

src/operation.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717

1818
use crate::{
1919
DeliveryLabelId, Cancel, ManageInput, InspectInput, UnhandledErrors,
20-
CancelFailure, Broken, ManageCancellation, ManageDisposal, SetupFailure,
21-
MiscellaneousFailure,
20+
Broken, ManageDisposal, SetupFailure, MiscellaneousFailure,
2221
try_emit_broken,
2322
};
2423

2524
use bevy::{
26-
prelude::{Entity, World, Component, Query},
27-
ecs::system::{Command, SystemParam},
25+
prelude::{Entity, World, Component},
26+
ecs::system::Command,
2827
};
2928

3029
use backtrace::Backtrace;

src/operation/operate_task.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ impl WakeQueue {
6868
}
6969
}
7070

71-
#[derive(Component)]
72-
pub(crate) struct PollTask(pub(crate) fn(Entity, &mut World, &mut OperationRoster));
73-
7471
#[derive(Component)]
7572
pub(crate) struct OperateTask<Response: 'static + Send + Sync> {
7673
source: Entity,

src/operation/scope.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ impl ParentSession {
4747
}
4848
}
4949

50-
#[derive(Clone)]
5150
pub(crate) struct OperateScope<Request, Response, Streams> {
5251
/// The first node that is inside of the scope
5352
enter_scope: Entity,
@@ -65,6 +64,20 @@ pub(crate) struct OperateScope<Request, Response, Streams> {
6564
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
6665
}
6766

67+
impl<Request, Response, Streams> Clone for OperateScope<Request, Response, Streams> {
68+
fn clone(&self) -> Self {
69+
Self {
70+
enter_scope: self.enter_scope,
71+
terminal: self.terminal,
72+
exit_scope: self.exit_scope,
73+
finish_cancel: self.finish_cancel,
74+
_ignore: Default::default(),
75+
}
76+
}
77+
}
78+
79+
impl<Request, Response, Streams> Copy for OperateScope<Request, Response, Streams> {}
80+
6881
impl<Request, Response, Streams> OperateScope<Request, Response, Streams> {
6982
pub(crate) fn terminal(&self) -> Entity {
7083
self.terminal
@@ -73,6 +86,10 @@ impl<Request, Response, Streams> OperateScope<Request, Response, Streams> {
7386
pub(crate) fn enter_scope(&self) -> Entity {
7487
self.enter_scope
7588
}
89+
90+
pub(crate) fn finish_cancel(&self) -> Entity {
91+
self.finish_cancel
92+
}
7693
}
7794

7895
pub(crate) struct ScopedSession {
@@ -308,7 +325,13 @@ where
308325
commands: &mut Commands,
309326
) -> Self {
310327
let enter_scope = commands.spawn(()).id();
328+
311329
let terminal = commands.spawn(()).id();
330+
commands.add(AddOperation::new(
331+
terminal,
332+
Terminate::<Response>::new()
333+
));
334+
312335
let finish_cancel = commands.spawn(()).id();
313336
commands.add(AddOperation::new(
314337
finish_cancel,
@@ -541,9 +564,6 @@ pub struct FinalizeScopeCleanup(pub(crate) fn(OperationCleanup) -> OperationResu
541564
#[derive(Component)]
542565
struct ScopeEntryStorage(Entity);
543566

544-
#[derive(Component)]
545-
struct CancelEntryStorage(Entity);
546-
547567
#[derive(Component)]
548568
pub struct FinishedStagingStorage(Entity);
549569

@@ -705,15 +725,6 @@ pub(crate) struct CancelledSession {
705725
status: CancelStatus,
706726
}
707727

708-
impl CancelledSession {
709-
pub(crate) fn new(
710-
parent_session: Entity,
711-
status: CancelStatus,
712-
) -> Self {
713-
Self { parent_session, status }
714-
}
715-
}
716-
717728
pub(crate) enum CancelStatus {
718729
Cleanup,
719730
Cancelled(Cancellation),
@@ -732,6 +743,12 @@ pub(crate) struct BeginCancel<T> {
732743
_ignore: std::marker::PhantomData<T>,
733744
}
734745

746+
impl<T> BeginCancel<T> {
747+
pub(crate) fn new(from_scope: Entity, buffer: Entity, target: Entity) -> Self {
748+
Self { from_scope, buffer, target, _ignore: Default::default() }
749+
}
750+
}
751+
735752
impl<T> Operation for BeginCancel<T>
736753
where
737754
T: 'static + Send + Sync,

0 commit comments

Comments
 (0)