Skip to content

Commit 4d9a71b

Browse files
Merge remote-tracking branch 'origin/impulse_builder' into luca/buffered_macros
Signed-off-by: Luca Della Vedova <lucadv@intrinsic.ai>
2 parents fe9ce29 + 128b509 commit 4d9a71b

23 files changed

+873
-182
lines changed

src/callback.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
use crate::{
19-
BlockingCallback, AsyncCallback, Channel, InnerChannel, ChannelQueue,
19+
BlockingCallback, AsyncCallback, Channel, ChannelQueue,
2020
OperationRoster, StreamPack, Input, Provider, ProvideOnce,
2121
AddOperation, OperateCallback, ManageInput, OperationError,
2222
OrBroken, OperateTask,
@@ -141,10 +141,11 @@ impl<'a> CallbackRequest<'a> {
141141
fn get_channel<Streams: StreamPack>(
142142
&mut self,
143143
session: Entity,
144-
) -> Result<Channel<Streams>, OperationError> {
144+
) -> Result<(Channel, Streams::Channel), OperationError> {
145145
let sender = self.world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
146-
let channel = InnerChannel::new(self.source, session, sender);
147-
channel.into_specific(&self.world)
146+
let channel = Channel::new(self.source, session, sender);
147+
let streams = channel.for_streams::<Streams>(&self.world)?;
148+
Ok((channel, streams))
148149
}
149150
}
150151

@@ -179,7 +180,7 @@ struct BlockingCallbackSystem<Request, Response, Streams: StreamPack> {
179180
initialized: bool,
180181
}
181182

182-
impl<Request, Response, Streams> CallbackTrait<Request, Response, ()> for BlockingCallbackSystem<Request, Response, Streams>
183+
impl<Request, Response, Streams> CallbackTrait<Request, Response, Streams> for BlockingCallbackSystem<Request, Response, Streams>
183184
where
184185
Request: 'static + Send + Sync,
185186
Response: 'static + Send + Sync,
@@ -222,14 +223,14 @@ where
222223
fn call(&mut self, mut input: CallbackRequest) -> Result<(), OperationError> {
223224
let Input { session, data: request } = input.get_request()?;
224225

225-
let channel = input.get_channel(session)?;
226+
let (channel, streams) = input.get_channel::<Streams>(session)?;
226227

227228
if !self.initialized {
228229
self.system.initialize(&mut input.world);
229230
}
230231

231232
let task = self.system.run(AsyncCallback {
232-
request, channel, source: input.source, session,
233+
request, streams, channel, source: input.source, session,
233234
}, &mut input.world);
234235
self.system.apply_deferred(&mut input.world);
235236

@@ -263,15 +264,16 @@ pub trait AsCallback<M> {
263264
fn as_callback(self) -> Callback<Self::Request, Self::Response, Self::Streams>;
264265
}
265266

266-
impl<Request, Response, M, Sys> AsCallback<BlockingCallbackMarker<(Request, Response, M)>> for Sys
267+
impl<Request, Response, Streams, M, Sys> AsCallback<BlockingCallbackMarker<(Request, Response, Streams, M)>> for Sys
267268
where
268-
Sys: IntoSystem<BlockingCallback<Request>, Response, M>,
269+
Sys: IntoSystem<BlockingCallback<Request, Streams>, Response, M>,
269270
Request: 'static + Send + Sync,
270271
Response: 'static + Send + Sync,
272+
Streams: StreamPack,
271273
{
272274
type Request = Request;
273275
type Response = Response;
274-
type Streams = ();
276+
type Streams = Streams;
275277

276278
fn as_callback(self) -> Callback<Self::Request, Self::Response, Self::Streams> {
277279
Callback::new(BlockingCallbackSystem {
@@ -341,9 +343,9 @@ where
341343
fn as_callback(mut self) -> Callback<Self::Request, Self::Response, Self::Streams> {
342344
let callback = move |mut input: CallbackRequest| {
343345
let Input { session, data: request } = input.get_request::<Self::Request>()?;
344-
let channel = input.get_channel(session)?;
346+
let (channel, streams) = input.get_channel::<Streams>(session)?;
345347
let task = (self)(AsyncCallback {
346-
request, channel, source: input.source, session,
348+
request, streams, channel, source: input.source, session,
347349
});
348350
input.give_task(session, task)
349351
};

src/chain.rs

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,12 @@ pub use fork_clone_builder::*;
3636
pub mod unzip;
3737
pub use unzip::*;
3838

39-
/// After submitting a service request, use [`Chain`] to describe how
40-
/// the response should be handled. At a minimum, for the response to be
41-
/// delivered, you must choose one of:
42-
/// - `.detach()`: Let the service run to completion and then discard the
43-
/// response data.
44-
/// - `.take()`: As long as the [`Promise`] or one of its clones is alive,
45-
/// the service will continue running to completion and you will be able to
46-
/// view the response (or take the response, but only once). If all clones of
47-
/// the [`Promise`] are dropped before the service is delivered, it will
48-
/// be cancelled.
49-
/// - `.detach_and_take()`: As long as the [`Promise`] or one of its clones is
50-
/// alive, you will be able to view the response (or take the response, but
51-
/// only once). The service will run to completion even if every clone of the
52-
/// [`Promise`] is dropped.
39+
/// Chain operations onto the output of a workflow node.
5340
///
54-
/// If you do not select one of the above then the service request will be
55-
/// cancelled without ever attempting to run.
41+
/// Make sure to use [`Self::connect`] when you're done chaining so that the
42+
/// final output of the chain gets connected into another node. If the final
43+
/// output of the chain is meant to be the final output of your workflow then
44+
/// you should connect it to [`Scope::terminate`].
5645
#[must_use]
5746
pub struct Chain<'w, 's, 'a, 'b, T> {
5847
target: Entity,
@@ -65,10 +54,8 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
6554
/// use this to resume building this chain later.
6655
///
6756
/// Note that if you do not connect some path of your workflow into the
68-
/// `terminate` slot of your [`Scope`][1] then the workflow will not be able
57+
/// `terminate` slot of your [`Scope`] then the workflow will not be able
6958
/// to run.
70-
///
71-
/// [1]: crate::Scope
7259
#[must_use]
7360
pub fn output(self) -> Output<T> {
7461
Output::new(self.scope(), self.target)
@@ -217,8 +204,8 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
217204

218205
/// Build a workflow scope to be used as an element in this chain.
219206
///
220-
/// If you want to connect to the stream outputs, use
221-
/// [`Self::then_scope_node`] instead.
207+
/// If you want to connect to the stream outputs or be able to loop back
208+
/// to the input of this scope, use [`Self::then_scope_node`] instead.
222209
#[must_use]
223210
pub fn then_scope<Response, Streams, Settings>(
224211
self,
@@ -236,14 +223,13 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
236223
}
237224

238225
/// Simplified version of [`Self::then_scope`] limited to a simple input and
239-
/// output. This does not support streams and only uses default scope
240-
/// settings.
226+
/// output.
241227
///
242228
/// Unlike `then_scope`, this function can infer the types for the generics
243229
/// so you don't need to explicitly specify them.
244230
pub fn then_io_scope<Response, Settings>(
245231
self,
246-
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder) -> Settings,
232+
build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
247233
) -> Chain<'w, 's, 'a, 'b, Response>
248234
where
249235
Response: 'static + Send + Sync,
@@ -271,14 +257,13 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
271257
}
272258

273259
/// Simplified version of [`Self::then_scope_node`] limited to a simple
274-
/// input and output. This does not support streams and only uses default
275-
/// scope settings.
260+
/// input and output.
276261
///
277262
/// Unlike `then_scope_node`, this function can infer the types for the
278263
/// generics so you don't need to explicitly specify them.
279264
pub fn then_io_scope_node<Response, Settings>(
280265
self,
281-
build: impl FnOnce(Scope<T, Response, ()>, &mut Builder) -> Settings,
266+
build: impl FnOnce(Scope<T, Response>, &mut Builder) -> Settings,
282267
) -> Node<T, Response, ()>
283268
where
284269
Response: 'static + Send + Sync,
@@ -288,12 +273,13 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
288273
}
289274

290275
/// Apply a [`Provider`] that filters the response by returning an [`Option`].
291-
/// If the filter returns [`None`] then a cancellation is triggered.
292-
/// Otherwise the chain continues with the value given inside [`Some`].
276+
/// If the filter returns [`None`] then a [`Cancellation`](crate::Cancellation)
277+
/// is triggered. Otherwise the chain continues with the value that was
278+
/// inside [`Some`].
293279
///
294280
/// This is conceptually similar to [`Iterator::filter_map`]. You can also
295-
/// use [`Chain::disposal_filter`] to dispose the remainder of the chain
296-
/// instead of cancelling it.
281+
/// use [`Chain::disposal_filter`] to dispose of the value instead of
282+
/// cancelling the entire scope.
297283
#[must_use]
298284
pub fn cancellation_filter<ThenResponse, F>(
299285
self,
@@ -326,14 +312,14 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
326312

327313
/// When the response is delivered, we will make a clone of it and
328314
/// simultaneously pass that clone along two different branches chains: one
329-
/// determined by the `build` function passed into this function and the
330-
/// other determined by the [`Chain`] that gets returned by this function.
315+
/// determined by the `build` function passed into this operation and the
316+
/// other determined by the [`Chain`] that gets returned.
331317
///
332318
/// This can only be applied when `Response` can be cloned.
333319
///
334320
/// See also [`Chain::fork_clone`]
335321
#[must_use]
336-
pub fn fork_clone_branch(
322+
pub fn branch_clone(
337323
self,
338324
build: impl FnOnce(Chain<T>),
339325
) -> Chain<'w, 's, 'a, 'b, T>
@@ -410,7 +396,7 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
410396
///
411397
/// As the name suggests, a no-op will not actually do anything, but it adds
412398
/// a new link (entity) into the chain.
413-
/// [1]: https://en.wikipedia.org/wiki/NOP_(code)
399+
/// [1]: `<https://en.wikipedia.org/wiki/NOP_(code)>`
414400
#[must_use]
415401
pub fn noop(self) -> Chain<'w, 's, 'a, 'b, T> {
416402
let source = self.target;
@@ -614,7 +600,7 @@ where
614600
) -> Chain<'w, 's, 'a, 'b, T> {
615601
Chain::<Option<T>>::new(
616602
self.target, self.builder,
617-
).branch_option_zip(
603+
).fork_option(
618604
|chain| chain.output(),
619605
build_none,
620606
).0.chain(self.builder)
@@ -627,8 +613,7 @@ where
627613
///
628614
/// The outputs of both builder functions will be zipped as the return value
629615
/// of this function.
630-
#[must_use]
631-
pub fn branch_option_zip<U, V>(
616+
pub fn fork_option<U, V>(
632617
self,
633618
build_some: impl FnOnce(Chain<T>) -> U,
634619
build_none: impl FnOnce(Chain<()>) -> V,
@@ -746,7 +731,7 @@ mod tests {
746731
);
747732

748733
context.run_with_conditions(&mut promise, Duration::from_secs(2));
749-
assert!(promise.peek().available().is_some_and(|value| *value == 6.0));
734+
assert!(promise.take().available().is_some_and(|value| value == 6.0));
750735
assert!(context.no_unhandled_errors());
751736
}
752737

@@ -806,7 +791,7 @@ mod tests {
806791
.with_update_count(100),
807792
);
808793

809-
assert_eq!(promise.peek().available().copied(), Some(16.0));
794+
assert_eq!(promise.take().available(), Some(16.0));
810795
assert!(context.no_unhandled_errors());
811796
}
812797

@@ -850,7 +835,7 @@ mod tests {
850835
});
851836

852837
context.run_while_pending(&mut promise);
853-
assert_eq!(promise.peek().available().copied(), Some(15.0));
838+
assert_eq!(promise.take().available(), Some(15.0));
854839
assert!(context.no_unhandled_errors());
855840
}
856841

@@ -943,7 +928,7 @@ mod tests {
943928
});
944929

945930
context.run_with_conditions(&mut promise, Duration::from_secs(2));
946-
assert!(promise.peek().available().is_some_and(|v| *v == 1.0));
931+
assert!(promise.take().available().is_some_and(|v| v == 1.0));
947932
assert!(context.no_unhandled_errors());
948933

949934
let mut promise = context.command(|commands| {
@@ -953,7 +938,7 @@ mod tests {
953938
});
954939

955940
context.run_with_conditions(&mut promise, Duration::from_secs(2));
956-
assert!(promise.peek().available().is_some_and(|v| *v == 5.0));
941+
assert!(promise.take().available().is_some_and(|v| v == 5.0));
957942
assert!(context.no_unhandled_errors());
958943

959944
let mut promise = context.command(|commands| {

src/channel.rs

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@ use crate::{
3030
};
3131

3232
#[derive(Clone)]
33-
pub struct Channel<Streams: StreamPack = ()> {
33+
pub struct Channel {
3434
inner: Arc<InnerChannel>,
35-
streams: Streams::Channel,
36-
_ignore: std::marker::PhantomData<Streams>,
3735
}
3836

39-
impl<Streams: StreamPack> Channel<Streams> {
37+
impl Channel {
4038
pub fn query<P: Provider>(&self, request: P::Request, provider: P) -> Promise<P::Response>
4139
where
4240
P::Request: 'static + Send + Sync,
@@ -68,11 +66,21 @@ impl<Streams: StreamPack> Channel<Streams> {
6866
promise
6967
}
7068

71-
/// Get stream channels that will let you send stream information. This will
72-
/// usually be one [`StreamChannel`] or a (possibly nested) tuple of
73-
/// `StreamChannel`s, whichever matches the [`StreamPack`] description.
74-
pub fn streams(&self) -> &Streams::Channel {
75-
&self.streams
69+
pub(crate) fn for_streams<Streams: StreamPack>(
70+
&self,
71+
world: &World,
72+
) -> Result<Streams::Channel, OperationError> {
73+
Ok(Streams::make_channel(&self.inner, world))
74+
}
75+
76+
pub(crate) fn new(
77+
source: Entity,
78+
session: Entity,
79+
sender: CbSender<ChannelItem>,
80+
) -> Self {
81+
Self {
82+
inner: Arc::new(InnerChannel { source, session, sender }),
83+
}
7684
}
7785
}
7886

@@ -91,23 +99,6 @@ impl InnerChannel {
9199
pub fn sender(&self) -> &CbSender<ChannelItem> {
92100
&self.sender
93101
}
94-
95-
pub(crate) fn into_specific<Streams: StreamPack>(
96-
self,
97-
world: &World,
98-
) -> Result<Channel<Streams>, OperationError> {
99-
let inner = Arc::new(self);
100-
let streams = Streams::make_channel(&inner, world);
101-
Ok(Channel { inner, streams, _ignore: Default::default() })
102-
}
103-
104-
pub(crate) fn new(
105-
source: Entity,
106-
session: Entity,
107-
sender: CbSender<ChannelItem>,
108-
) -> Self {
109-
InnerChannel { source, session, sender }
110-
}
111102
}
112103

113104
pub(crate) type ChannelItem = Box<dyn FnOnce(&mut World, &mut OperationRoster) + Send>;

src/discovery.rs

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/flush.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ fn collect_from_channels(
125125
(item)(world, roster);
126126
}
127127

128+
roster.process_deferals();
129+
128130
world.get_resource_or_insert_with(|| ServiceLifecycleChannel::new());
129131
world.resource_scope::<ServiceLifecycleChannel, ()>(|world, lifecycles| {
130132
// Clean up the dangling requests of any services that have been despawned.

0 commit comments

Comments
 (0)