Skip to content

Commit 85501df

Browse files
authored
Introduce Split operation (#33)
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 72fcd72 commit 85501df

File tree

9 files changed

+1370
-15
lines changed

9 files changed

+1370
-15
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ async-task = { version = "4.7.1", optional = true }
2525
# bevy_tasks::Task, so we're leaving it as a mandatory dependency for now.
2626
bevy_tasks = { version = "0.12", features = ["multi-threaded"] }
2727

28-
arrayvec = "0.7"
2928
itertools = "0.13"
3029
smallvec = "1.13"
3130
tokio = { version = "1.39", features = ["sync"]}

src/buffer/bufferable.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -207,28 +207,28 @@ impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
207207
}
208208

209209
pub trait IterBufferable {
210-
type BufferType: Buffered;
210+
type BufferElement: Buffered;
211211

212212
/// Convert an iterable collection of bufferable workflow elements into
213213
/// buffers if they are not buffers already.
214214
fn into_buffer_vec<const N: usize>(
215215
self,
216216
builder: &mut Builder,
217-
) -> SmallVec<[Self::BufferType; N]>;
217+
) -> SmallVec<[Self::BufferElement; N]>;
218218

219219
/// Join an iterable collection of bufferable workflow elements.
220220
///
221221
/// Performance is best if you can choose an `N` which is equal to the
222222
/// number of buffers inside the iterable, but this will work even if `N`
223223
/// does not match the number.
224-
fn join_vec<const N: usize>(
224+
fn join_vec<'w, 's, 'a, 'b, const N: usize>(
225225
self,
226-
builder: &mut Builder,
227-
) -> Output<SmallVec<[<Self::BufferType as Buffered>::Item; N]>>
226+
builder: &'b mut Builder<'w, 's, 'a>,
227+
) -> Chain<'w, 's, 'a, 'b, SmallVec<[<Self::BufferElement as Buffered>::Item; N]>>
228228
where
229229
Self: Sized,
230-
Self::BufferType: 'static + Send + Sync,
231-
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
230+
Self::BufferElement: 'static + Send + Sync,
231+
<Self::BufferElement as Buffered>::Item: 'static + Send + Sync,
232232
{
233233
let buffers = self.into_buffer_vec::<N>(builder);
234234
let join = builder.commands.spawn(()).id();
@@ -239,6 +239,23 @@ pub trait IterBufferable {
239239
Join::new(buffers, target),
240240
));
241241

242-
Output::new(builder.scope, target)
242+
Output::new(builder.scope, target).chain(builder)
243+
}
244+
}
245+
246+
impl<T> IterBufferable for T
247+
where
248+
T: IntoIterator,
249+
T::Item: Bufferable,
250+
{
251+
type BufferElement = <T::Item as Bufferable>::BufferType;
252+
253+
fn into_buffer_vec<const N: usize>(
254+
self,
255+
builder: &mut Builder,
256+
) -> SmallVec<[Self::BufferElement; N]> {
257+
SmallVec::<[Self::BufferElement; N]>::from_iter(
258+
self.into_iter().map(|e| e.into_buffer(builder)),
259+
)
243260
}
244261
}

src/builder.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ use crate::{
2626
AddOperation, AsMap, BeginCleanupWorkflow, Buffer, BufferItem, BufferKeys, BufferSettings,
2727
Bufferable, Buffered, Chain, Collect, ForkClone, ForkCloneOutput, ForkTargetStorage, Gate,
2828
GateRequest, Injection, InputSlot, IntoAsyncMap, IntoBlockingMap, Node, OperateBuffer,
29-
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateStaticGate, Output, Provider,
30-
RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings, ScopeSettingsStorage,
31-
Sendish, Service, StreamPack, StreamTargetMap, StreamsOfMap, Trim, TrimBranch, UnusedTarget,
29+
OperateBufferAccess, OperateDynamicGate, OperateScope, OperateSplit, OperateStaticGate, Output,
30+
Provider, RequestOfMap, ResponseOfMap, Scope, ScopeEndpoints, ScopeSettings,
31+
ScopeSettingsStorage, Sendish, Service, SplitOutputs, Splittable, StreamPack, StreamTargetMap,
32+
StreamsOfMap, Trim, TrimBranch, UnusedTarget,
3233
};
3334

3435
pub(crate) mod connect;
@@ -339,6 +340,26 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
339340
self.create_collect(n, Some(n))
340341
}
341342

343+
/// Create a new split operation in the workflow. The [`InputSlot`] can take
344+
/// in values that you want to split, and [`SplitOutputs::build`] will let
345+
/// you build connections to the split value.
346+
pub fn create_split<T>(&mut self) -> (InputSlot<T>, SplitOutputs<T>)
347+
where
348+
T: 'static + Send + Sync + Splittable,
349+
{
350+
let source = self.commands.spawn(()).id();
351+
self.commands.add(AddOperation::new(
352+
Some(self.scope),
353+
source,
354+
OperateSplit::<T>::default(),
355+
));
356+
357+
(
358+
InputSlot::new(self.scope, source),
359+
SplitOutputs::new(self.scope, source),
360+
)
361+
}
362+
342363
/// This method allows you to define a cleanup workflow that branches off of
343364
/// this scope that will activate during the scope's cleanup phase. The
344365
/// input to the cleanup workflow will be a key to access to one or more

src/chain.rs

Lines changed: 123 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ use crate::{
2727
make_option_branching, make_result_branching, AddOperation, AsMap, Buffer, BufferKey,
2828
BufferKeys, Bufferable, Buffered, Builder, Collect, CreateCancelFilter, CreateDisposalFilter,
2929
ForkTargetStorage, Gate, GateRequest, InputSlot, IntoAsyncMap, IntoBlockingCallback,
30-
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateStaticGate,
31-
Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service, Spread, StreamOf,
32-
StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
30+
IntoBlockingMap, Node, Noop, OperateBufferAccess, OperateDynamicGate, OperateSplit,
31+
OperateStaticGate, Output, ProvideOnce, Provider, Scope, ScopeSettings, Sendish, Service,
32+
Spread, StreamOf, StreamPack, StreamTargetMap, Trim, TrimBranch, UnusedTarget,
3333
};
3434

3535
pub mod fork_clone_builder;
@@ -38,6 +38,9 @@ pub use fork_clone_builder::*;
3838
pub(crate) mod premade;
3939
use premade::*;
4040

41+
pub mod split;
42+
pub use split::*;
43+
4144
pub mod unzip;
4245
pub use unzip::*;
4346

@@ -601,6 +604,71 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
601604
self.map_async(|r| r)
602605
}
603606

607+
/// If the chain's response implements the [`Splittable`] trait, then this
608+
/// will insert a split operation and provide your `build` function with the
609+
/// [`SplitBuilder`] for it. This returns the return value of your build
610+
/// function.
611+
pub fn split<U>(self, build: impl FnOnce(SplitBuilder<T>) -> U) -> U
612+
where
613+
T: Splittable,
614+
{
615+
let source = self.target;
616+
self.builder.commands.add(AddOperation::new(
617+
Some(self.builder.scope),
618+
source,
619+
OperateSplit::<T>::default(),
620+
));
621+
622+
build(SplitBuilder::new(source, self.builder))
623+
}
624+
625+
/// If the chain's response implements the [`Splittable`] trait, then this
626+
/// will insert a split and provide a container for its available outputs.
627+
/// To build connections to these outputs later, use [`SplitOutputs::build`].
628+
///
629+
/// This is equivalent to
630+
/// ```text
631+
/// .split(|split| split.outputs())
632+
/// ```
633+
pub fn split_outputs(self) -> SplitOutputs<T>
634+
where
635+
T: Splittable,
636+
{
637+
self.split(|b| b.outputs())
638+
}
639+
640+
/// If the chain's response can be turned into an iterator with an appropriate
641+
/// item type, this will allow it to be split in a list-like way.
642+
///
643+
/// This is equivalent to
644+
/// ```text
645+
/// .map_block(SplitAsList::new).split(build)
646+
/// ```
647+
pub fn split_as_list<U>(self, build: impl FnOnce(SplitBuilder<SplitAsList<T>>) -> U) -> U
648+
where
649+
T: IntoIterator,
650+
T::Item: 'static + Send + Sync,
651+
{
652+
self.map_block(SplitAsList::new).split(build)
653+
}
654+
655+
/// If the chain's response can be turned into an iterator with an appropriate
656+
/// item type, this will insert a split and provide a container for its
657+
/// available outputs. To build connections to these outputs later, use
658+
/// [`SplitOutputs::build`].
659+
///
660+
/// This is equivalent to
661+
/// ```text
662+
/// .split_as_list(|split| split.outputs())
663+
/// ```
664+
pub fn split_as_list_outputs(self) -> SplitOutputs<SplitAsList<T>>
665+
where
666+
T: IntoIterator,
667+
T::Item: 'static + Send + Sync,
668+
{
669+
self.split_as_list(|b| b.outputs())
670+
}
671+
604672
/// Add a [no-op][1] to the current end of the chain.
605673
///
606674
/// As the name suggests, a no-op will not actually do anything, but it adds
@@ -633,10 +701,12 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
633701
}
634702
}
635703

704+
/// The scope that the chain is building inside of.
636705
pub fn scope(&self) -> Entity {
637706
self.builder.scope
638707
}
639708

709+
/// The target where the chain will be sending its latest output.
640710
pub fn target(&self) -> Entity {
641711
self.target
642712
}
@@ -942,6 +1012,38 @@ where
9421012
}
9431013
}
9441014

1015+
impl<'w, 's, 'a, 'b, K, V, T> Chain<'w, 's, 'a, 'b, T>
1016+
where
1017+
K: 'static + Send + Sync + Eq + std::hash::Hash + Clone + std::fmt::Debug,
1018+
V: 'static + Send + Sync,
1019+
T: 'static + Send + Sync + IntoIterator<Item = (K, V)>,
1020+
{
1021+
/// If the chain's response type can be turned into an iterator that returns
1022+
/// `(key, value)` pairs, then this will split it in a map-like way, whether
1023+
/// or not it is a conventional map data structure.
1024+
///
1025+
/// This is equivalent to
1026+
/// ```text
1027+
/// .map_block(SplitAsMap::new).split(build)
1028+
/// ```
1029+
pub fn split_as_map<U>(self, build: impl FnOnce(SplitBuilder<SplitAsMap<K, V, T>>) -> U) -> U {
1030+
self.map_block(SplitAsMap::new).split(build)
1031+
}
1032+
1033+
/// If the chain's response type can be turned into an iterator that returns
1034+
/// `(key, value)` pairs, then this will split it in a map-like way and
1035+
/// provide a container for its available outputs. To build connections to
1036+
/// these outputs later, use [`SplitOutputs::build`].
1037+
///
1038+
/// This is equivalent to
1039+
/// ```text
1040+
/// .split_as_map(|split| split.outputs())
1041+
/// ```
1042+
pub fn split_as_map_outputs(self) -> SplitOutputs<SplitAsMap<K, V, T>> {
1043+
self.split_as_map(|b| b.outputs())
1044+
}
1045+
}
1046+
9451047
impl<'w, 's, 'a, 'b, Request, Response, Streams>
9461048
Chain<'w, 's, 'a, 'b, (Request, Service<Request, Response, Streams>)>
9471049
where
@@ -1030,6 +1132,24 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
10301132
}
10311133
}
10321134

1135+
impl<'w, 's, 'a, 'b, K, V> Chain<'w, 's, 'a, 'b, (K, V)>
1136+
where
1137+
K: 'static + Send + Sync,
1138+
V: 'static + Send + Sync,
1139+
{
1140+
/// If the chain's response contains a `(key, value)` pair, get the `key`
1141+
/// component from it (the first element of the tuple).
1142+
pub fn key(self) -> Chain<'w, 's, 'a, 'b, K> {
1143+
self.map_block(|(key, _)| key)
1144+
}
1145+
1146+
/// If the chain's response contains a `(key, value)` pair, get the `value`
1147+
/// component from it (the second element of the tuple).
1148+
pub fn value(self) -> Chain<'w, 's, 'a, 'b, V> {
1149+
self.map_block(|(_, value)| value)
1150+
}
1151+
}
1152+
10331153
#[cfg(test)]
10341154
mod tests {
10351155
use crate::{prelude::*, testing::*};

0 commit comments

Comments
 (0)