Skip to content

Commit 14dabe3

Browse files
committed
Reworking Chain
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 1afe576 commit 14dabe3

File tree

1 file changed

+24
-114
lines changed

1 file changed

+24
-114
lines changed

src/chain.rs

Lines changed: 24 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
use std::future::Future;
1919

2020
use crate::{
21-
UnusedTarget, Receive, AddOperation,
22-
ForkClone, Chosen, ApplyLabel, StreamPack, Provider,
21+
UnusedTarget, AddOperation,
22+
ForkClone, StreamPack, Provider, ProvideOnce,
2323
AsMap, IntoBlockingMap, IntoAsyncMap, EnterCancel,
24-
DetachDependency, DisposeOnCancel, Promise, Noop,
24+
DetachDependency, Output, Noop,
2525
Cancelled, ForkTargetStorage,
2626
make_result_branching, make_cancel_filter_on_err,
2727
make_option_branching, make_cancel_filter_on_none,
@@ -58,113 +58,25 @@ pub use unzip::*;
5858
/// If you do not select one of the above then the service request will be
5959
/// cancelled without ever attempting to run.
6060
#[must_use]
61-
pub struct Chain<'w, 's, 'a, Response, Streams, M> {
61+
pub struct Chain<'w, 's, 'a, Response> {
6262
source: Entity,
6363
target: Entity,
64+
scope: Entity,
6465
commands: &'a mut Commands<'w, 's>,
65-
response: std::marker::PhantomData<Response>,
66-
streams: std::marker::PhantomData<Streams>,
67-
modifiers: std::marker::PhantomData<M>,
66+
_ignore: std::marker::PhantomData<Response>,
6867
}
6968

70-
impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, 'a, Response, Streams, Modifiers<L, C>> {
71-
/// Have the impulse chain run until it is finished without holding onto any
72-
/// [`Promise`]. The final output will be automatically disposed.
73-
pub fn detach(self) {
74-
self.commands.add(AddOperation::new(
75-
self.target,
76-
Receive::<Response>::new(None, true),
77-
));
78-
}
79-
80-
/// Take a [`Promise`] so you can receive the final response in the chain later.
81-
/// If the [`Promise`] is dropped then the entire impulse chain will
82-
/// automatically be cancelled from whichever link in the chain has not been
83-
/// completed yet, triggering every on_cancel branch from that link to the
84-
/// end of the chain.
85-
pub fn take(self) -> Promise<Response> {
86-
let (promise, sender) = Promise::new();
87-
self.commands.add(AddOperation::new(
88-
self.target,
89-
Receive::new(Some(sender), false),
90-
));
91-
promise
92-
}
93-
94-
/// Take the promise so you can reference it later. The service request
95-
/// will continue to be fulfilled even if you drop the [`Promise`].
69+
impl<'w, 's, 'a, Response: 'static + Send + Sync> Chain<'w, 's, 'a, Response> {
70+
/// Get the raw [`Output`] slot for the current link in the chain. You can
71+
/// use this to resume building this chain later.
9672
///
97-
/// This is effectively equivalent to running both [`Chain::detach`] and
98-
/// [`Chain::take`] together.
99-
pub fn detach_and_take(self) -> Promise<Response> {
100-
let (promise, sender) = Promise::new();
101-
self.commands.add(AddOperation::new(
102-
self.target,
103-
Receive::new(Some(sender), true),
104-
));
105-
promise
106-
}
107-
108-
/// Have the ancestor links in the impulse chain run until they are finished,
109-
/// even if the remainder of this chain gets dropped. You can continue adding
110-
/// links as if this is one continuous chain.
73+
/// Note that if you do not connect some path of your workflow into the
74+
/// `terminate` slot of your [`Scope`][1] then the workflow will not be able
75+
/// to run.
11176
///
112-
/// If the ancestor links get cancelled, the cancellation cascade will still
113-
/// continue past this link. To prevent that from happening, use
114-
/// [`Chain::sever_cancel_cascade`].
115-
pub fn detach_and_chain(self) -> Chain<'w, 's, 'a, Response, Streams, ModifiersClosed> {
116-
self.commands.entity(self.source).insert(DetachDependency);
117-
Chain::new(self.source, self.target, self.commands)
118-
}
119-
120-
/// Change this into a [`Dangling`] chain. You can use this to resume building
121-
/// this chain later.
122-
///
123-
/// Note that if you do not finish building the dangling chain before the
124-
/// next flush, the chain will be cancelled up to its closest
125-
/// [`Chain::detach_and_chain`] link. You can use [`Chain::detach_and_dangle`]
126-
/// to obtain a [`Dangling`] while still ensuring that this chain will be executed.
127-
pub fn dangle(self) -> Dangling<Response, Streams> {
128-
Dangling::new(self.source, self.target)
129-
}
130-
131-
/// A combination of [`Chain::detach`] and [`Chain::dangle`].
132-
pub fn detach_and_dangle(self) -> Dangling<Response, Streams> {
133-
self.detach_and_chain().dangle()
134-
}
135-
136-
/// If any ancestor links in this chain get cancelled, the cancellation cascade
137-
/// will be stopped at this link and the remainder of the chain will be
138-
/// disposed instead of cancelled. No child links from this one will have
139-
/// their cancellation branches triggered from a cancellation that happens
140-
/// before this link. Any cancellation behavior assigned to this link will
141-
/// still apply.
142-
///
143-
/// Any cancellation that happens after this link will cascade down as
144-
/// normal until it reaches the next instance of `dispose_on_cancel`.
145-
///
146-
/// If a non-detached descendant of this link gets dropped, the ancestors of
147-
/// this link will still be cancelled as usual. To prevent a dropped
148-
/// descendant from cancelling its ancestors, use [`Chain::detach_and_chain`].
149-
///
150-
/// ```
151-
/// use bevy_impulse::{*, testing::*};
152-
/// let mut context = TestingContext::minimal_plugins();
153-
/// let mut promise = context.build(|commands| {
154-
/// commands
155-
/// .provide("hello")
156-
/// .map_block(produce_err)
157-
/// .cancel_on_err()
158-
/// .dispose_on_cancel()
159-
/// .take()
160-
/// });
161-
///
162-
/// context.run_while_pending(&mut promise);
163-
/// assert!(promise.peek().is_disposed());
164-
/// ```
165-
pub fn dispose_on_cancel(self) -> Chain<'w, 's, 'a, Response, (), ModifiersClosed> {
166-
self.commands.entity(self.source).insert(DisposeOnCancel);
167-
Chain::new(self.source, self.target, self.commands)
77+
/// [1]: crate::Scope
78+
pub fn output(self) -> Output<Response> {
79+
Output::new(self.scope, self.target)
16880
}
16981

17082
/// Use the response in the chain as a new request as soon as the response
@@ -173,10 +85,9 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
17385
pub fn then<P: Provider<Request = Response>>(
17486
self,
17587
provider: P,
176-
) -> Chain<'w, 's, 'a, P::Response, P::Streams, ModifiersUnset>
88+
) -> Chain<'w, 's, 'a, P::Response>
17789
where
17890
P::Response: 'static + Send + Sync,
179-
P::Streams: StreamPack,
18091
{
18192
let source = self.target;
18293
let target = self.commands.spawn(UnusedTarget).id();
@@ -189,11 +100,10 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
189100
pub fn map<M, F: AsMap<M>>(
190101
self,
191102
f: F,
192-
) -> Chain<'w, 's, 'a, <F::MapType as Provider>::Response, <F::MapType as Provider>::Streams, ModifiersUnset>
103+
) -> Chain<'w, 's, 'a, <F::MapType as ProvideOnce>::Response>
193104
where
194105
F::MapType: Provider<Request=Response>,
195-
<F::MapType as Provider>::Response: 'static + Send + Sync,
196-
<F::MapType as Provider>::Streams: StreamPack,
106+
<F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
197107
{
198108
self.then(f.as_map())
199109
}
@@ -207,7 +117,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
207117
pub fn map_block<U>(
208118
self,
209119
f: impl FnMut(Response) -> U + 'static + Send + Sync,
210-
) -> Chain<'w, 's, 'a, U, (), ModifiersUnset>
120+
) -> Chain<'w, 's, 'a, U>
211121
where
212122
U: 'static + Send + Sync,
213123
{
@@ -220,7 +130,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
220130
pub fn map_async<Task>(
221131
self,
222132
f: impl FnMut(Response) -> Task + 'static + Send + Sync,
223-
) -> Chain<'w, 's, 'a, Task::Output, (), ModifiersUnset>
133+
) -> Chain<'w, 's, 'a, Task::Output>
224134
where
225135
Task: Future + 'static + Send + Sync,
226136
Task::Output: 'static + Send + Sync,
@@ -238,7 +148,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
238148
pub fn cancellation_filter<ThenResponse, F>(
239149
self,
240150
filter_provider: F
241-
) -> Chain<'w, 's, 'a, ThenResponse, (), ModifiersUnset>
151+
) -> Chain<'w, 's, 'a, ThenResponse>
242152
where
243153
ThenResponse: 'static + Send + Sync,
244154
F: Provider<Request = Response, Response = Option<ThenResponse>>,
@@ -249,12 +159,12 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
249159
}
250160

251161
/// Same as [`Chain::cancellation_filter`] but the chain will be disposed
252-
/// instead of cancelled, so the chain will be dropped without any
253-
/// cancellation behavior occurring.
162+
/// instead of cancelled, so the workflow may continue if the termination
163+
/// node can still be reached.
254164
pub fn disposal_filter<ThenResponse, F>(
255165
self,
256166
filter_provider: F,
257-
) -> Chain<'w, 's, 'a, ThenResponse, (), ModifiersClosed>
167+
) -> Chain<'w, 's, 'a, ThenResponse>
258168
where
259169
ThenResponse: 'static + Send + Sync,
260170
F: Provider<Request = Response, Response = Option<ThenResponse>>,

0 commit comments

Comments
 (0)