Skip to content

Commit 0edfdb2

Browse files
committed
Implement uninterruptibility for scopes
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 586b5e8 commit 0edfdb2

File tree

4 files changed

+78
-57
lines changed

4 files changed

+78
-57
lines changed

src/builder.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
123123
Streams: StreamPack,
124124
{
125125
let scope_id = self.commands.spawn(()).id();
126-
self.create_scope_impl(scope_id, settings, build)
126+
let exit_scope = self.commands.spawn(UnusedTarget).id();
127+
self.create_scope_impl(scope_id, exit_scope, settings, build)
127128
}
128129

129130
/// It is possible for a scope to be cancelled before it terminates. Even a
@@ -152,32 +153,21 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
152153
build: impl FnOnce(Scope<T, (), ()>, &mut Builder),
153154
) {
154155
let cancelling_scope_id = self.commands.spawn(()).id();
155-
let cancelling_operation = OperateScope::<T, (), ()>::new(
156-
cancelling_scope_id, Some(self.finish_scope_cancel), settings, self.commands,
156+
let _ = self.create_scope_impl::<T, (), ()>(
157+
cancelling_scope_id,
158+
self.finish_scope_cancel,
159+
settings,
160+
build,
157161
);
158-
self.commands.add(AddOperation::new(cancelling_scope_id, cancelling_operation));
159162

160163
let begin_cancel = self.commands.spawn(()).id();
161164
self.commands.add(AddOperation::new(
162165
begin_cancel,
163166
BeginCancel::<T>::new(self.scope, from_buffer.source, cancelling_scope_id),
164167
));
165-
let mut builder = Builder {
166-
scope: cancelling_scope_id,
167-
finish_scope_cancel: cancelling_operation.finish_cancel(),
168-
commands: self.commands,
169-
};
170-
171-
let scope = Scope {
172-
input: Output::new(cancelling_scope_id, cancelling_operation.enter_scope()),
173-
terminate: InputSlot::new(cancelling_scope_id, cancelling_operation.terminal()),
174-
streams: (),
175-
};
176-
177-
build(scope, &mut builder);
178168
}
179169

180-
/// Get the scope that this builder is building for
170+
/// Get the scope that this builder is building for.
181171
pub fn scope(&self) -> Entity {
182172
self.scope
183173
}
@@ -191,6 +181,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
191181
pub(crate) fn create_scope_impl<Request, Response, Streams>(
192182
&mut self,
193183
scope_id: Entity,
184+
exit_scope: Entity,
194185
settings: ScopeSettings,
195186
build: impl FnOnce(Scope<Request, Response, Streams>, &mut Builder),
196187
) -> Node<Request, Response, Streams>
@@ -199,10 +190,12 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
199190
Response: 'static + Send + Sync,
200191
Streams: StreamPack,
201192
{
202-
let exit_scope = self.commands.spawn(UnusedTarget).id();
203193
let operation = OperateScope::<Request, Response, Streams>::new(
204194
scope_id, Some(exit_scope), settings, self.commands,
205195
);
196+
let enter_scope = operation.enter_scope();
197+
let finish_scope_cancel = operation.finish_cancel();
198+
let terminal = operation.terminal();
206199
self.commands.add(AddOperation::new(scope_id, operation));
207200

208201
let (stream_in, stream_out) = Streams::spawn_scope_streams(
@@ -213,13 +206,13 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
213206

214207
let mut builder = Builder {
215208
scope: scope_id,
216-
finish_scope_cancel: operation.finish_cancel(),
209+
finish_scope_cancel,
217210
commands: self.commands,
218211
};
219212

220213
let scope = Scope {
221-
input: Output::new(scope_id, operation.enter_scope()),
222-
terminate: InputSlot::new(scope_id, operation.terminal()),
214+
input: Output::new(scope_id, enter_scope),
215+
terminate: InputSlot::new(scope_id, terminal),
223216
streams: stream_in,
224217
};
225218

src/chain.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,9 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
231231
Response: 'static + Send + Sync,
232232
Streams: StreamPack,
233233
{
234+
let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
234235
self.builder.create_scope_impl::<T, Response, Streams>(
235-
self.target, settings, build,
236+
self.target, exit_scope, settings, build,
236237
).output.chain(self.builder)
237238
}
238239

@@ -248,8 +249,9 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
248249
Response: 'static + Send + Sync,
249250
Streams: StreamPack,
250251
{
252+
let exit_scope = self.builder.commands.spawn(UnusedTarget).id();
251253
self.builder.create_scope_impl::<T, Response, Streams>(
252-
self.target, settings, build,
254+
self.target, exit_scope, settings, build,
253255
)
254256
}
255257

src/operation/scope.rs

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ pub(crate) struct OperateScope<Request, Response, Streams> {
6161
exit_scope: Option<Entity>,
6262
/// Cancellation finishes at this node
6363
finish_cancel: Entity,
64+
/// Settings for the scope
65+
settings: ScopeSettings,
6466
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
6567
}
6668

@@ -71,13 +73,12 @@ impl<Request, Response, Streams> Clone for OperateScope<Request, Response, Strea
7173
terminal: self.terminal,
7274
exit_scope: self.exit_scope,
7375
finish_cancel: self.finish_cancel,
76+
settings: self.settings.clone(),
7477
_ignore: Default::default(),
7578
}
7679
}
7780
}
7881

79-
impl<Request, Response, Streams> Copy for OperateScope<Request, Response, Streams> {}
80-
8182
impl<Request, Response, Streams> OperateScope<Request, Response, Streams> {
8283
pub(crate) fn terminal(&self) -> Entity {
8384
self.terminal
@@ -107,22 +108,35 @@ impl ScopedSession {
107108
}
108109
}
109110

111+
#[derive(Component)]
112+
struct ScopeSettingsStorage(ScopeSettings);
113+
110114
#[derive(Component)]
111115
pub(crate) enum ScopedSessionStatus {
112116
Ongoing,
113117
Finished,
118+
/// The scope was asked to cleanup from an external source, but it has an
119+
/// uninterruptible setting. We are waiting for a termination or internal
120+
/// cancel to trigger before doing a cleanup.
121+
DeferredCleanup,
122+
/// The scope has already begun the cleanup process
114123
Cleanup,
115124
Cancelled(Cancellation),
116125
}
117126

118127
impl ScopedSessionStatus {
119-
fn to_cleanup(&mut self) -> bool {
128+
fn to_cleanup(&mut self, uninterruptible: bool) -> bool {
120129
if matches!(self, Self::Cleanup) {
121130
return false;
122131
}
123132

124-
*self = Self::Cleanup;
125-
true
133+
if uninterruptible {
134+
*self = Self::DeferredCleanup;
135+
return false;
136+
} else {
137+
*self = Self::Cleanup;
138+
return true;
139+
}
126140
}
127141

128142
pub(crate) fn to_finished(&mut self) -> bool {
@@ -132,11 +146,19 @@ impl ScopedSessionStatus {
132146
*self = Self::Finished;
133147
return true;
134148
}
149+
150+
if matches!(self, Self::DeferredCleanup) {
151+
// We've been waiting for the scope to finish before beginning
152+
// cleanup because the scope is uninterruptible.
153+
*self = Self::Cleanup;
154+
return true;
155+
}
156+
135157
false
136158
}
137159

138160
fn to_cancelled(&mut self, cancellation: Cancellation) -> bool {
139-
if matches!(self, Self::Ongoing) {
161+
if matches!(self, Self::Ongoing | Self::DeferredCleanup) {
140162
*self = Self::Cancelled(cancellation);
141163
return true;
142164
}
@@ -169,7 +191,6 @@ where
169191
source_mut.insert((
170192
InputBundle::<Request>::new(),
171193
ScopeEntryStorage(self.enter_scope),
172-
FinishedStagingStorage(self.terminal),
173194
ScopeContents::new(),
174195
ScopedSessionStorage::default(),
175196
TerminalStorage(self.terminal),
@@ -178,6 +199,7 @@ where
178199
FinalizeScopeCleanup(Self::finalize_scope_cleanup),
179200
BeginCancelStorage::default(),
180201
FinishCancelStorage(self.finish_cancel),
202+
ScopeSettingsStorage(self.settings),
181203
));
182204

183205
if let Some(exit_scope) = self.exit_scope {
@@ -218,14 +240,17 @@ where
218240
OperationCleanup { source, session, world, roster }: OperationCleanup
219241
) -> OperationResult {
220242
let mut source_mut = world.get_entity_mut(source).or_broken()?;
243+
let uninterruptible = source_mut.get::<ScopeSettingsStorage>().or_broken()?.0
244+
.is_uninterruptible();
245+
221246
let pairs: SmallVec<[_; 16]> = source_mut
222247
.get_mut::<ScopedSessionStorage>()
223248
.or_broken()?
224249
.0
225250
.iter_mut()
226251
.filter(|pair| pair.parent_session == session)
227252
.filter_map(|p| {
228-
if p.status.to_cleanup() {
253+
if p.status.to_cleanup(uninterruptible) {
229254
Some(p.scoped_session)
230255
} else {
231256
None
@@ -241,15 +266,15 @@ where
241266

242267
for scoped_session in pairs {
243268
let source_ref = world.get_entity(source).or_broken()?;
244-
let staging_node = source_ref.get::<FinishedStagingStorage>().or_broken()?.0;
269+
let terminal = source_ref.get::<TerminalStorage>().or_broken()?.0;
245270
let nodes = source_ref.get::<ScopeContents>().or_broken()?.nodes().clone();
246271
for node in nodes {
247272
OperationCleanup { source: node, session: scoped_session, world, roster }.clean();
248273
}
249274

250275
// OperateScope::cleanup gets called when the entire scope is being cancelled
251276
// so we need to clear out the staging node as well.
252-
OperationCleanup { source: staging_node, session: scoped_session, world, roster }.clean();
277+
OperationCleanup { source: terminal, session: scoped_session, world, roster }.clean();
253278
}
254279

255280
Ok(())
@@ -261,7 +286,6 @@ where
261286
}
262287

263288
let source_ref = reachability.world.get_entity(reachability.source).or_broken()?;
264-
let staging = source_ref.get::<FinishedStagingStorage>().or_broken()?.0;
265289

266290
if let Some(pair) = source_ref
267291
.get::<ScopedSessionStorage>().or_broken()?
@@ -272,13 +296,9 @@ where
272296
pair.scoped_session,
273297
reachability.source,
274298
reachability.world,
275-
&mut visited
299+
&mut visited,
276300
);
277301

278-
if scoped_reachability.check_upstream(staging)? {
279-
return Ok(true);
280-
}
281-
282302
let terminal = source_ref.get::<TerminalStorage>().or_broken()?.0;
283303
if scoped_reachability.check_upstream(terminal)? {
284304
return Ok(true);
@@ -343,6 +363,7 @@ where
343363
terminal,
344364
exit_scope,
345365
finish_cancel,
366+
settings,
346367
_ignore: Default::default(),
347368
};
348369

@@ -478,8 +499,14 @@ where
478499
// so we'll return a broken error here.
479500
None.or_broken()?;
480501
}
502+
ScopedSessionStatus::DeferredCleanup => {
503+
// We shouldn't be in this function if the session is in a
504+
// deferred cleanup state. We should be waiting for a finish
505+
// or a cancellation to occur.
506+
None.or_broken()?;
507+
}
481508
ScopedSessionStatus::Finished => {
482-
let staging = source_mut.get::<FinishedStagingStorage>().or_broken()?.0;
509+
let terminal = source_mut.get::<TerminalStorage>().or_broken()?.0;
483510
let (target, blocker) = source_mut.get_mut::<ExitTargetStorage>()
484511
.and_then(|mut storage| storage.map.remove(&scoped_session))
485512
.map(|exit| (exit.target, exit.blocker))
@@ -491,7 +518,7 @@ where
491518
.or_broken()?;
492519

493520
let response = clean.world
494-
.get_mut::<Staging<Response>>(staging).or_broken()?.0
521+
.get_mut::<Staging<Response>>(terminal).or_broken()?.0
495522
.remove(&clean.session).or_broken()?;
496523
clean.world.get_entity_mut(target).or_broken()?.give_input(
497524
pair.parent_session, response, clean.roster,
@@ -564,15 +591,6 @@ pub struct FinalizeScopeCleanup(pub(crate) fn(OperationCleanup) -> OperationResu
564591
#[derive(Component)]
565592
struct ScopeEntryStorage(Entity);
566593

567-
#[derive(Component)]
568-
pub struct FinishedStagingStorage(Entity);
569-
570-
impl FinishedStagingStorage {
571-
pub fn get(&self) -> Entity {
572-
self.0
573-
}
574-
}
575-
576594
pub(crate) struct Terminate<T> {
577595
_ignore: std::marker::PhantomData<T>,
578596
}
@@ -600,6 +618,7 @@ where
600618
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
601619
world.entity_mut(source).insert((
602620
InputBundle::<T>::new(),
621+
SingleInputStorage::empty(),
603622
Staging::<T>::new(),
604623
));
605624
Ok(())
@@ -651,13 +670,17 @@ where
651670
Ok(())
652671
}
653672

654-
fn is_reachable(reachability: OperationReachability) -> ReachabilityResult {
673+
fn is_reachable(mut reachability: OperationReachability) -> ReachabilityResult {
655674
if reachability.has_input::<T>()? {
656675
return Ok(true);
657676
}
658677

659678
let staging = reachability.world.get::<Staging<T>>(reachability.source).or_broken()?;
660-
Ok(staging.0.contains_key(&reachability.session))
679+
if staging.0.contains_key(&reachability.session) {
680+
return Ok(true);
681+
}
682+
683+
SingleInputStorage::is_reachable(&mut reachability)
661684
}
662685
}
663686

src/workflow.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ pub enum DeliverySettings {
153153
}
154154

155155
/// Settings which determine how the top-level scope of the workflow behaves.
156-
#[derive(Default)]
156+
#[derive(Default, Clone)]
157157
pub struct ScopeSettings {
158158
/// Should we prevent the scope from being interrupted (e.g. cancelled)?
159159
/// False by default, meaning by default scopes can be cancelled or
@@ -198,18 +198,21 @@ impl<'w, 's> SpawnWorkflow for Commands<'w, 's> {
198198
let scope = OperateScope::<Request, Response, Streams>::new(
199199
scope_id, None, settings.scope, self,
200200
);
201+
let enter_scope = scope.enter_scope();
202+
let finish_scope_cancel = scope.finish_cancel();
203+
let terminal = scope.terminal();
201204
self.add(AddOperation::new(scope_id, scope));
202205
let mut builder = Builder {
203206
scope: scope_id,
204-
finish_scope_cancel: scope.finish_cancel(),
207+
finish_scope_cancel,
205208
commands: self
206209
};
207210

208211
let streams = Streams::spawn_workflow_streams(&mut builder);
209212

210213
let scope = Scope {
211-
input: Output::new(scope_id, scope.enter_scope()),
212-
terminate: InputSlot::new(scope_id, scope.terminal()),
214+
input: Output::new(scope_id, enter_scope),
215+
terminate: InputSlot::new(scope_id, terminal),
213216
streams,
214217
};
215218

0 commit comments

Comments
 (0)