Skip to content

Commit 586b5e8

Browse files
committed
Update cancel and disposal API
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 3d8ec82 commit 586b5e8

File tree

5 files changed

+402
-218
lines changed

5 files changed

+402
-218
lines changed

src/chain.rs

Lines changed: 134 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@ use std::future::Future;
1919

2020
use bevy::prelude::Entity;
2121

22+
use smallvec::SmallVec;
23+
24+
use std::error::Error;
25+
2226
use crate::{
2327
UnusedTarget, AddOperation, Node, InputSlot, Builder,
2428
ForkClone, StreamPack, Provider, ProvideOnce, Scope,
2529
AsMap, IntoBlockingMap, IntoAsyncMap, Output, Noop,
26-
ForkTargetStorage, StreamTargetMap, ScopeSettings,
27-
make_result_branching, make_cancel_filter_on_err,
28-
make_option_branching, make_cancel_filter_on_none,
30+
ForkTargetStorage, StreamTargetMap, ScopeSettings, CreateCancelFilter,
31+
CreateDisposalFilter,
32+
make_result_branching, make_option_branching,
2933
};
3034

31-
use smallvec::SmallVec;
32-
3335
pub mod fork_clone_builder;
3436
pub use fork_clone_builder::*;
3537

@@ -272,21 +274,21 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
272274
self.then(filter_provider).cancel_on_none()
273275
}
274276

275-
// /// Same as [`Chain::cancellation_filter`] but the chain will be disposed
276-
// /// instead of cancelled, so the workflow may continue if the termination
277-
// /// node can still be reached.
278-
// pub fn disposal_filter<ThenResponse, F>(
279-
// self,
280-
// filter_provider: F,
281-
// ) -> Chain<'w, 's, 'a, ThenResponse>
282-
// where
283-
// ThenResponse: 'static + Send + Sync,
284-
// F: Provider<Request = Response, Response = Option<ThenResponse>>,
285-
// F::Response: 'static + Send + Sync,
286-
// F::Streams: StreamPack,
287-
// {
288-
// self.cancellation_filter(filter_provider).dispose_on_cancel()
289-
// }
277+
/// Same as [`Chain::cancellation_filter`] but the chain will be disposed
278+
/// instead of cancelled, so the workflow may continue if the termination
279+
/// node can still be reached.
280+
pub fn disposal_filter<ThenResponse, F>(
281+
self,
282+
filter_provider: F,
283+
) -> Chain<'w, 's, 'a, 'b, ThenResponse>
284+
where
285+
ThenResponse: 'static + Send + Sync,
286+
F: Provider<Request = T, Response = Option<ThenResponse>>,
287+
F::Response: 'static + Send + Sync,
288+
F::Streams: StreamPack,
289+
{
290+
self.then(filter_provider).dispose_on_none()
291+
}
290292

291293
/// When the response is delivered, we will make a clone of it and
292294
/// simultaneously pass that clone along two different impulse chains: one
@@ -517,47 +519,98 @@ where
517519
(u, v)
518520
}
519521

520-
// /// If the result contains an [`Err`] value then the chain will be cancelled
521-
// /// from this link onwards. The next link in the chain will receive a `T` if
522-
// /// the chain is not cancelled.
523-
// ///
524-
// /// Note that when cancelling in this way, you will lose the `E` data inside
525-
// /// of the [`Err`] variant. If you want to divert the execution flow during
526-
// /// an [`Err`] result but still want to access the `E` data, then you can
527-
// /// use `Chain::branch_for_err` or `Chain::branch_result_zip` instead.
528-
// ///
529-
// /// ```
530-
// /// use bevy_impulse::{*, testing::*};
531-
// /// let mut context = TestingContext::minimal_plugins();
532-
// /// let mut promise = context.build(|commands| {
533-
// /// commands
534-
// /// .provide("hello")
535-
// /// .map_block(produce_err)
536-
// /// .cancel_on_err()
537-
// /// .take()
538-
// /// });
539-
// ///
540-
// /// context.run_while_pending(&mut promise);
541-
// /// assert!(promise.peek().is_cancelled());
542-
// /// ```
543-
// pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, T> {
544-
// let source = self.target;
545-
// let target = self.commands.spawn(UnusedTarget).id();
546-
547-
// self.commands.add(AddOperation::new(
548-
// source,
549-
// make_cancel_filter_on_err::<T, E>(target),
550-
// ));
551-
552-
// Chain::new(self.scope, target, self.commands)
553-
// }
554-
555-
// /// If the result contains an [`Err`] value then the chain will be disposed
556-
// /// from this link onwards. Disposal means that the chain will terminate at
557-
// /// this point but no cancellation behavior will be triggered.
558-
// pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, T> {
559-
// self.cancel_on_err().dispose_on_cancel()
560-
// }
522+
/// If the result contains an [`Err`] value then the entire scope that
523+
/// contains this operation will be immediately cancelled. If the scope is
524+
/// within a node of an outer workflow, then the node will emit a disposal
525+
/// for its outer workflow. Otherwise if this is the root scope of a workflow
526+
/// then the whole workflow is immediately cancelled. This effect will happen
527+
/// even if the scope is set to be uninterruptible.
528+
///
529+
/// This operation only works for results with an [`Err`] variant that
530+
/// implements the [`Error`] trait. If your [`Err`] variant does not implement
531+
/// that trait, then you can use [`Self::cancel_on_quiet_err`] instead.
532+
///
533+
/// ```
534+
/// use bevy_impulse::{*, testing::*};
535+
/// let mut context = TestingContext::minimal_plugins();
536+
/// let mut promise = context.build(|commands| {
537+
/// commands
538+
/// .provide("hello")
539+
/// .map_block(produce_err)
540+
/// .cancel_on_err()
541+
/// .take()
542+
/// });
543+
///
544+
/// context.run_while_pending(&mut promise);
545+
/// assert!(promise.peek().is_cancelled());
546+
/// ```
547+
#[must_use]
548+
pub fn cancel_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
549+
where
550+
E: Error,
551+
{
552+
let source = self.target;
553+
let target = self.builder.commands.spawn(UnusedTarget).id();
554+
555+
self.builder.commands.add(AddOperation::new(
556+
source,
557+
CreateCancelFilter::on_err::<T, E>(target),
558+
));
559+
560+
Chain::new(target, self.builder)
561+
}
562+
563+
/// Same as [`Self::cancel_on_err`] except it also works for [`Err`] variants
564+
/// that do not implement [`Error`]. The catch is that their error message
565+
/// will not be included in the [`Filtered`](crate::Filtered) information
566+
/// that gets propagated outward.
567+
#[must_use]
568+
pub fn cancel_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
569+
let source = self.target;
570+
let target = self.builder.commands.spawn(UnusedTarget).id();
571+
572+
self.builder.commands.add(AddOperation::new(
573+
source,
574+
CreateCancelFilter::on_quiet_err::<T, E>(target),
575+
));
576+
577+
Chain::new(target, self.builder)
578+
}
579+
580+
/// If the output contains an [`Err`] value then the output will be disposed.
581+
///
582+
/// Disposal means that the node that the output is connected to will simply
583+
/// not be triggered, but the workflow is not necessarily cancelled. If a
584+
/// disposal makes it impossible for the workflow to terminate, then the
585+
/// workflow will be cancelled immediately.
586+
#[must_use]
587+
pub fn dispose_on_err(self) -> Chain<'w, 's, 'a, 'b, T>
588+
where
589+
E: Error,
590+
{
591+
let source = self.target;
592+
let target = self.builder.commands.spawn(UnusedTarget).id();
593+
594+
self.builder.commands.add(AddOperation::new(
595+
source,
596+
CreateDisposalFilter::on_err::<T, E>(target),
597+
));
598+
599+
Chain::new(target, self.builder)
600+
}
601+
602+
#[must_use]
603+
pub fn dispose_on_quiet_err(self) -> Chain<'w, 's, 'a, 'b, T> {
604+
let source = self.target;
605+
let target = self.builder.commands.spawn(UnusedTarget).id();
606+
607+
self.builder.commands.add(AddOperation::new(
608+
source,
609+
CreateDisposalFilter::on_quiet_err::<T, E>(target),
610+
));
611+
612+
Chain::new(target, self.builder)
613+
}
561614
}
562615

563616
impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, Option<T>>
@@ -626,7 +679,26 @@ where
626679

627680
self.builder.commands.add(AddOperation::new(
628681
source,
629-
make_cancel_filter_on_none::<T>(target),
682+
CreateCancelFilter::on_none::<T>(target),
683+
));
684+
685+
Chain::new(target, self.builder)
686+
}
687+
688+
/// If the output contains [`None`] value then the output will be disposed.
689+
///
690+
/// Disposal means that the node that the output is connected to will simply
691+
/// not be triggered, but the workflow is not necessarily cancelled. If a
692+
/// disposal makes it impossible for the workflow to terminate, then the
693+
/// workflow will be cancelled immediately.
694+
#[must_use]
695+
pub fn dispose_on_none(self) -> Chain<'w, 's, 'a, 'b, T> {
696+
let source = self.target;
697+
let target = self.builder.commands.spawn(UnusedTarget).id();
698+
699+
self.builder.commands.add(AddOperation::new(
700+
source,
701+
CreateDisposalFilter::on_none::<T>(target),
630702
));
631703

632704
Chain::new(target, self.builder)

src/disposal.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ impl Disposal {
6666
) -> Disposal {
6767
Supplanted { supplanted_at_node, supplanted_by_node, supplanting_session }.into()
6868
}
69+
70+
pub fn filtered(filtered_at_node: Entity, reason: Option<anyhow::Error>) -> Self {
71+
Filtered { filtered_at_node, reason }.into()
72+
}
6973
}
7074

7175
#[derive(Debug)]

src/operation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ use smallvec::SmallVec;
4040
mod branching;
4141
pub(crate) use branching::*;
4242

43-
mod cancel_filter;
44-
pub(crate) use cancel_filter::*;
43+
mod filter;
44+
pub(crate) use filter::*;
4545

4646
mod fork_clone;
4747
pub(crate) use fork_clone::*;

0 commit comments

Comments
 (0)