Skip to content

Commit b088fe1

Browse files
committed
Fixing implementation of cancellation
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent b49212a commit b088fe1

File tree

11 files changed

+117
-53
lines changed

11 files changed

+117
-53
lines changed

src/cancel.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl From<Broken> for CancellationCause {
128128
#[derive(Debug, Clone)]
129129
pub struct Cancel {
130130
/// The entity that triggered the cancellation
131-
pub(crate) source: Entity,
131+
pub(crate) origin: Entity,
132132
/// The target of the cancellation
133133
pub(crate) target: Entity,
134134
/// The session which is being cancelled for the target
@@ -138,6 +138,11 @@ pub struct Cancel {
138138
}
139139

140140
impl Cancel {
141+
pub(crate) fn for_target(mut self, target: Entity) -> Self {
142+
self.target = target;
143+
self
144+
}
145+
141146
pub(crate) fn trigger(
142147
self,
143148
world: &mut World,
@@ -267,7 +272,7 @@ pub fn try_emit_broken(
267272
.push(CancelFailure {
268273
error: OperationError::Broken(Some(Backtrace::new())),
269274
cancel: Cancel {
270-
source,
275+
origin: source,
271276
target: source,
272277
session: None,
273278
cancellation: Broken { node: source, backtrace }.into(),
@@ -287,16 +292,16 @@ fn try_emit_cancel(
287292
// The cancellation is happening inside a scope, so we should cancel
288293
// the scope
289294
let scope = scope.get();
290-
roster.cancel(Cancel { source, target: scope, session, cancellation });
295+
roster.cancel(Cancel { origin: source, target: scope, session, cancellation });
291296
} else if let Some(session) = session {
292297
// The cancellation is not happening inside a scope, so we should tell
293298
// the session itself to cancel.
294-
roster.cancel(Cancel { source, target: session, session: Some(session), cancellation });
299+
roster.cancel(Cancel { origin: source, target: session, session: Some(session), cancellation });
295300
} else {
296301
return Err(CancelFailure::new(
297302
OperationError::Broken(Some(Backtrace::new())),
298303
Cancel {
299-
source,
304+
origin: source,
300305
target: source,
301306
session,
302307
cancellation,

src/chain.rs

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,22 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
237237
).output.chain(self.builder)
238238
}
239239

240+
/// Simplified version of [`Self::then_scope`] limited to a simple input and
241+
/// output. This does not support streams and only uses default scope
242+
/// settings.
243+
///
244+
/// Unlike `then_scope`, this function can infer the types for the generics
245+
/// so you don't need to explicitly specify them.
246+
pub fn then_io_scope<Response>(
247+
self,
248+
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder),
249+
) -> Chain<'w, 's, 'a, 'b, Response>
250+
where
251+
Response: 'static + Send + Sync,
252+
{
253+
self.then_scope(ScopeSettings::default(), build)
254+
}
255+
240256
/// From the current target in the chain, build a [scoped](Scope) workflow
241257
/// and then get back a node that represents that scoped workflow.
242258
#[must_use]
@@ -255,6 +271,22 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
255271
)
256272
}
257273

274+
/// Simplified version of [`Self::then_scope_node`] limited to a simple
275+
/// input and output. This does not support streams and only uses default
276+
/// scope settings.
277+
///
278+
/// Unlike `then_scope_node`, this function can infer the types for the
279+
/// generics so you don't need to explicitly specify them.
280+
pub fn then_io_scope_node<Response>(
281+
self,
282+
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder),
283+
) -> Node<T, Response, ()>
284+
where
285+
Response: 'static + Send + Sync,
286+
{
287+
self.then_scope_node(ScopeSettings::default(), build)
288+
}
289+
258290
/// Apply a [`Provider`] that filters the response by returning an [`Option`].
259291
/// If the filter returns [`None`] then a cancellation is triggered.
260292
/// Otherwise the chain continues with the value given inside [`Some`].
@@ -839,36 +871,38 @@ mod tests {
839871
assert!(context.no_unhandled_errors());
840872
}
841873

842-
// #[test]
843-
// fn test_dispose_on_cancel() {
844-
// let mut context = TestingContext::minimal_plugins();
845-
846-
// let mut promise = context.build(|commands| {
847-
// commands
848-
// .provide("hello")
849-
// .map_block(produce_err)
850-
// .cancel_on_err()
851-
// .dispose_on_cancel()
852-
// .take()
853-
// });
854-
855-
// context.run_while_pending(&mut promise);
856-
// assert!(promise.peek().is_disposed());
857-
858-
// // If we flip the order of cancel_on_err and dispose_on_cancel then the
859-
// // outcome should be a cancellation instead of a disposal, because the
860-
// // disposal was requested for a part of the chain that did not get
861-
// // cancelled.
862-
// let mut promise = context.build(|commands| {
863-
// commands
864-
// .provide("hello")
865-
// .map_block(produce_err)
866-
// .dispose_on_cancel()
867-
// .cancel_on_err()
868-
// .take()
869-
// });
870-
871-
// context.run_while_pending(&mut promise);
872-
// assert!(promise.peek().is_cancelled());
873-
// }
874+
#[test]
875+
fn test_cancel_on_none() {
876+
let mut context = TestingContext::minimal_plugins();
877+
878+
dbg!();
879+
let workflow = context.build_io_workflow(|scope, builder| {
880+
scope.input.chain(builder)
881+
.map_block(duplicate)
882+
.map_block(print_debug(format!("{}", line!())))
883+
.map_block(add)
884+
.map_block(print_debug(format!("{}", line!())))
885+
.map_block(produce_none)
886+
.map_block(print_debug(format!("{}", line!())))
887+
.cancel_on_none()
888+
.map_block(print_debug(format!("{}", line!())))
889+
.map_block(duplicate)
890+
.map_block(print_debug(format!("{}", line!())))
891+
.map_block(add)
892+
.map_block(print_debug(format!("{}", line!())))
893+
.connect(scope.terminate);
894+
});
895+
896+
dbg!();
897+
let mut promise = context.build(|commands| {
898+
commands
899+
.request(2.0, workflow)
900+
.take_response()
901+
});
902+
903+
dbg!();
904+
context.run_with_conditions(&mut promise, Duration::from_secs(2));
905+
dbg!(context.get_unhandled_errors());
906+
assert!(promise.peek().is_cancelled());
907+
}
874908
}

src/flush.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ fn garbage_cleanup(world: &mut World, roster: &mut OperationRoster) {
108108
}
109109

110110
while let Some(cancel) = roster.cancel.pop_front() {
111-
cancel.trigger(world, roster);
111+
dbg!(cancel).trigger(world, roster);
112112
}
113113
}
114114

src/impulse.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::future::Future;
2323

2424
use crate::{
2525
Promise, ProvideOnce, StreamPack, IntoBlockingMapOnce, IntoAsyncMapOnce,
26-
AsMapOnce, UnusedTarget, StreamTargetMap,
26+
AsMapOnce, UnusedTarget, StreamTargetMap, Cancellable,
2727
};
2828

2929
mod detach;
@@ -127,7 +127,9 @@ where
127127

128128
// We should automatically delete the previous step in the chain once
129129
// this one is finished.
130-
self.commands.entity(source).set_parent(target);
130+
self.commands.entity(source)
131+
.insert(Cancellable::new(cancel_impulse))
132+
.set_parent(target);
131133
provider.connect(None, source, target, self.commands);
132134
Impulse {
133135
source,
@@ -137,7 +139,6 @@ where
137139
}
138140
}
139141

140-
141142
/// Apply a one-time callback whose input is the Response of the current
142143
/// target. The output of the map will become the Response of the returned
143144
/// target.

src/impulse/internal.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ fn perform_impulse<I: Impulsive>(
8989
.push(CancelFailure {
9090
error: OperationError::Broken(Some(Backtrace::new())),
9191
cancel: Cancel {
92-
source,
92+
origin: source,
9393
target: source,
9494
session: None,
9595
cancellation: Broken { node: source, backtrace }.into(),
@@ -100,20 +100,24 @@ fn perform_impulse<I: Impulsive>(
100100
}
101101
}
102102

103-
fn cancel_impulse(
103+
pub(crate) fn cancel_impulse(
104104
OperationCancel { cancel, world, roster }: OperationCancel,
105105
) -> OperationResult {
106+
dbg!(cancel.origin);
106107
// We cancel an impulse by travelling to its terminal and
107-
let mut terminal = cancel.source;
108+
let mut terminal = dbg!(cancel.target);
108109
loop {
110+
dbg!(terminal);
109111
let Some(target) = world.get::<SingleTargetStorage>(terminal) else {
110112
break;
111113
};
112114
terminal = target.get();
113115
}
114116

115117
if let Some(on_cancel) = world.get::<OnTerminalCancelled>(terminal) {
118+
dbg!(terminal);
116119
let on_cancel = on_cancel.0;
120+
let cancel = cancel.for_target(terminal);
117121
match on_cancel(OperationCancel { cancel: cancel.clone(), world, roster }) {
118122
Ok(()) | Err(OperationError::NotReady) => {
119123
// Do nothing
@@ -128,6 +132,8 @@ fn cancel_impulse(
128132
});
129133
}
130134
}
135+
} else {
136+
dbg!(terminal);
131137
}
132138

133139
if let Some(terminal_mut) = world.get_entity_mut(terminal) {

src/impulse/taken.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl<T> TakenResponse<T> {
3939

4040
impl<T: 'static + Send + Sync> Impulsive for TakenResponse<T> {
4141
fn setup(mut self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
42+
dbg!(source);
4243
let lifecycle_sender = world
4344
.get_resource_or_insert_with(|| ImpulseLifecycleChannel::default())
4445
.sender.clone();

src/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ impl<T: 'static + Send + Sync> Command for InputCommand<T> {
338338
backtrace: Some(Backtrace::new()),
339339
});
340340
let cancel = Cancel {
341-
source: self.target,
341+
origin: self.target,
342342
target: self.session,
343343
session: Some(self.session),
344344
cancellation: Cancellation::from_cause(cause)

src/operation/scope.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ where
326326
) -> ScopeEndpoints {
327327
let enter_scope = commands.spawn(EntryForScope(scope_id)).id();
328328
let terminal = commands.spawn(()).set_parent(scope_id).id();
329-
let finish_scope_cancel = commands.spawn(()).set_parent(scope_id).id();
329+
let finish_scope_cancel = commands.spawn(FinishCancelForScope(scope_id))
330+
.set_parent(scope_id).id();
330331

331332
let scope = OperateScope::<Request, Response, Streams> {
332333
enter_scope,
@@ -367,7 +368,7 @@ where
367368

368369
fn receive_cancel(
369370
OperationCancel {
370-
cancel: Cancel { source: _origin, target: source, session, cancellation },
371+
cancel: Cancel { origin: _origin, target: source, session, cancellation },
371372
world,
372373
roster
373374
}: OperationCancel,
@@ -599,9 +600,14 @@ pub struct FinalizeScopeCleanup(pub(crate) fn(OperationCleanup) -> OperationResu
599600
#[derive(Component)]
600601
pub(crate) struct ScopeEntryStorage(pub(crate) Entity);
601602

603+
/// Store the scope entity for the first node within a scope
602604
#[derive(Component)]
603605
pub(crate) struct EntryForScope(pub(crate) Entity);
604606

607+
/// Store the scope entity for the FinishCancel operation within a scope
608+
#[derive(Component)]
609+
struct FinishCancelForScope(Entity);
610+
605611
pub(crate) struct Terminate<T> {
606612
scope: Entity,
607613
_ignore: std::marker::PhantomData<T>,
@@ -915,7 +921,7 @@ impl Operation for FinishCancel {
915921
impl FinishCancel {
916922
fn receive_cancel(
917923
OperationCancel {
918-
cancel: Cancel { source: _origin, target: source, session, cancellation },
924+
cancel: Cancel { origin: _origin, target: source, session, cancellation },
919925
world,
920926
roster
921927
}: OperationCancel,
@@ -1003,7 +1009,7 @@ impl FinishCancel {
10031009
OperationRequest { source, world, roster }: OperationRequest,
10041010
) -> OperationResult {
10051011
let mut source_mut = world.get_entity_mut(source).or_broken()?;
1006-
let scope = source_mut.get::<ScopeStorage>().or_broken()?.get();
1012+
let scope = source_mut.get::<FinishCancelForScope>().or_broken()?.0;
10071013
let mut awaiting = source_mut.get_mut::<AwaitingCancelStorage>().or_broken()?;
10081014
let a = awaiting.0.get(index).or_broken()?;
10091015
let parent_session = a.cancelled.parent_session;

src/request.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::future::Future;
2121

2222
use crate::{
2323
UnusedTarget, StreamPack, ProvideOnce, IntoBlockingMapOnce, IntoAsyncMap,
24-
Impulse, Detached, InputCommand,
24+
Impulse, Detached, InputCommand, Cancellable, cancel_impulse,
2525
};
2626

2727
/// Extensions for creating impulse chains by making a request to a provider or
@@ -91,7 +91,8 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
9191
UnusedTarget,
9292
)).id();
9393

94-
let source = self.spawn(())
94+
let source = self
95+
.spawn(Cancellable::new(cancel_impulse))
9596
// We set the parent of this source to the target so that when the
9697
// target gets despawned, this will also be despawned.
9798
.set_parent(target)

src/service/workflow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ where
146146
// This workflow is already running and we need to stop it at the
147147
// scope level
148148
roster.cancel(Cancel {
149-
source,
149+
origin: source,
150150
target: workflow.scope,
151151
session: Some(stop.session),
152152
cancellation: Cancellation::supplanted(

0 commit comments

Comments
 (0)