Skip to content

Commit 3e077a7

Browse files
committed
Finish buffer implementation for join and catch unhandled errors
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent c779f5a commit 3e077a7

20 files changed

+231
-580
lines changed

src/buffer/bufferable.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*
1616
*/
1717

18+
use smallvec::SmallVec;
19+
1820
use crate::{
1921
Buffer, CloneFromBuffer, Output, Builder, BufferSettings, Buffered, Join,
2022
UnusedTarget, AddOperation,
@@ -109,3 +111,36 @@ impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
109111
self.map(|b| b.as_buffer(builder))
110112
}
111113
}
114+
115+
pub trait IterBufferable {
116+
type BufferType: Buffered;
117+
118+
/// Convert an iterable collection of bufferable workflow elements into
119+
/// buffers if they are not buffers already.
120+
fn as_buffer_vec<const N: usize>(
121+
self,
122+
builder: &mut Builder,
123+
) -> SmallVec<[Self::BufferType; N]>;
124+
125+
/// Join an iterable collection of bufferable workflow elements.
126+
///
127+
/// Performance is best if you can choose an `N` which is equal to the
128+
/// number of buffers inside the iterable, but this will work even if `N`
129+
/// does not match the number.
130+
fn join_vec<const N: usize>(
131+
self,
132+
builder: &mut Builder,
133+
) -> Output<SmallVec<[<Self::BufferType as Buffered>::Item; N]>>
134+
where
135+
Self: Sized,
136+
Self::BufferType: 'static + Send + Sync,
137+
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
138+
{
139+
let buffers = self.as_buffer_vec::<N>(builder);
140+
let join = builder.commands.spawn(()).id();
141+
let target = builder.commands.spawn(UnusedTarget).id();
142+
builder.commands.add(AddOperation::new(join, Join::new(buffers, target)));
143+
144+
Output::new(builder.scope, target)
145+
}
146+
}

src/buffer/buffered.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
Buffer, CloneFromBuffer, OperationError, OrBroken, InspectInput, ManageInput,
2424
};
2525

26-
pub trait Buffered: Copy {
26+
pub trait Buffered: Clone {
2727
fn buffered_count(
2828
&self,
2929
session: Entity,
@@ -194,3 +194,36 @@ impl<T: Buffered, const N: usize> Buffered for [T; N] {
194194
self.iter().flat_map(|buffer| buffer.as_input()).collect()
195195
}
196196
}
197+
198+
impl<T: Buffered, const N: usize> Buffered for SmallVec<[T; N]> {
199+
fn buffered_count(
200+
&self,
201+
session: Entity,
202+
world: &World,
203+
) -> Result<usize, OperationError> {
204+
let mut min_count = None;
205+
for buffer in self.iter() {
206+
let count = buffer.buffered_count(session, world)?;
207+
if !min_count.is_some_and(|min| min < count) {
208+
min_count = Some(count);
209+
}
210+
}
211+
212+
Ok(min_count.unwrap_or(0))
213+
}
214+
215+
type Item = SmallVec<[T::Item; N]>;
216+
fn pull(
217+
&self,
218+
session: Entity,
219+
world: &mut World,
220+
) -> Result<Self::Item, OperationError> {
221+
self.iter().map(|buffer| {
222+
buffer.pull(session, world)
223+
}).collect()
224+
}
225+
226+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
227+
self.iter().flat_map(|buffer| buffer.as_input()).collect()
228+
}
229+
}

src/cancel.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,30 @@ impl<'w> ManageCancellation for EntityMut<'w> {
252252
}
253253
}
254254

255+
pub fn try_emit_broken(
256+
source: Entity,
257+
backtrace: Option<Backtrace>,
258+
world: &mut World,
259+
roster: &mut OperationRoster,
260+
) {
261+
if let Some(mut source_mut) = world.get_entity_mut(source) {
262+
source_mut.emit_broken(backtrace, roster);
263+
} else {
264+
world
265+
.get_resource_or_insert_with(|| UnhandledErrors::default())
266+
.cancellations
267+
.push(CancelFailure {
268+
error: OperationError::Broken(Some(Backtrace::new())),
269+
cancel: Cancel {
270+
source,
271+
target: source,
272+
session: None,
273+
cancellation: Broken { node: source, backtrace }.into(),
274+
}
275+
});
276+
}
277+
}
278+
255279
fn try_emit_cancel(
256280
source_mut: &mut EntityMut,
257281
session: Option<Entity>,

src/chain.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
UnusedTarget, AddOperation, Node, InputSlot, Builder,
2424
ForkClone, StreamPack, Provider, ProvideOnce,
2525
AsMap, IntoBlockingMap, IntoAsyncMap, Output, Noop,
26-
ForkTargetStorage, StreamTargetMap, Connect,
26+
ForkTargetStorage, StreamTargetMap,
2727
make_result_branching, make_cancel_filter_on_err,
2828
make_option_branching, make_cancel_filter_on_none,
2929
};
@@ -36,9 +36,6 @@ pub use fork_clone_builder::*;
3636
pub mod unzip;
3737
pub use unzip::*;
3838

39-
pub mod zipped;
40-
pub use zipped::*;
41-
4239
/// After submitting a service request, use [`Chain`] to describe how
4340
/// the response should be handled. At a minimum, for the response to be
4441
/// delivered, you must choose one of:
@@ -363,9 +360,9 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
363360
).collect()
364361
}
365362

366-
/// If you have a `Chain<(A, B, C, ...), _, _>` with a tuple response then
363+
/// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
367364
/// `unzip` allows you to convert it into a tuple of chains:
368-
/// `(Dangling<A>, Dangling<B>, Dangling<C>, ...)`.
365+
/// `(Output<A>, Output<B>, Output<C>, ...)`.
369366
///
370367
/// You can also consider using `unzip_build` to continue building each
371368
/// chain in the tuple independently by providing a builder function for
@@ -375,19 +372,19 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
375372
where
376373
T: Unzippable,
377374
{
378-
T::unzip_chain(self.target, self.builder)
375+
T::unzip_output(Output::<T>::new(self.scope(), self.target), self.builder)
379376
}
380377

381-
/// If you have a `Chain<(A, B, C, ...), _, _>` with a tuple response then
382-
/// `unzip_build` allows you to split it into multiple chains and apply a
383-
/// separate builder function to each chain. You will be passed back the
384-
/// zipped output of all the builder functions.
378+
/// If you have a `Chain<(A, B, C, ...)>` with a tuple response then
379+
/// `unzip_build` allows you to split it into multiple chains (one for each
380+
/// tuple element) and apply a separate builder function to each chain. You
381+
/// will be passed back the zipped output of all the builder functions.
385382
#[must_use]
386-
pub fn unzip_build<Builders>(self, builders: Builders) -> Builders::Output
383+
pub fn unzip_build<Build>(self, build: Build) -> Build::ReturnType
387384
where
388-
Builders: UnzipBuilder<T>
385+
Build: UnzipBuilder<T>
389386
{
390-
builders.unzip_build(self.target, self.builder)
387+
build.unzip_build(Output::<T>::new(self.scope(), self.target), self.builder)
391388
}
392389

393390
/// If the chain's response implements the [`Future`] trait, applying

0 commit comments

Comments
 (0)