Skip to content

Commit 28dbcf2

Browse files
committed
Testing and fixing simple workflows
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent f652c0c commit 28dbcf2

25 files changed

+550
-291
lines changed

src/buffer/bufferable.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ pub trait Bufferable {
4242
let buffers = self.as_buffer(builder);
4343
let join = builder.commands.spawn(()).id();
4444
let target = builder.commands.spawn(UnusedTarget).id();
45-
builder.commands.add(AddOperation::new(join, Join::new(buffers, target)));
45+
builder.commands.add(AddOperation::new(
46+
Some(builder.scope()), join, Join::new(buffers, target)
47+
));
4648

4749
Output::new(builder.scope, target)
4850
}
@@ -139,7 +141,9 @@ pub trait IterBufferable {
139141
let buffers = self.as_buffer_vec::<N>(builder);
140142
let join = builder.commands.spawn(()).id();
141143
let target = builder.commands.spawn(UnusedTarget).id();
142-
builder.commands.add(AddOperation::new(join, Join::new(buffers, target)));
144+
builder.commands.add(AddOperation::new(
145+
Some(builder.scope()), join, Join::new(buffers, target),
146+
));
143147

144148
Output::new(builder.scope, target)
145149
}

src/builder.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*
1616
*/
1717

18-
use bevy::prelude::{Entity, Commands};
18+
use bevy::prelude::{Entity, Commands, BuildChildren};
19+
20+
use std::future::Future;
1921

2022
use crate::{
2123
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
2224
Buffer, BufferSettings, AddOperation, OperateBuffer, Scope, OperateScope,
23-
ScopeSettings, BeginCancel,
25+
ScopeSettings, BeginCancel, ScopeEndpoints, IntoBlockingMap, IntoAsyncMap,
2426
};
2527

2628
pub(crate) mod connect;
@@ -58,7 +60,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
5860
{
5961
let source = self.commands.spawn(()).id();
6062
let target = self.commands.spawn(UnusedTarget).id();
61-
provider.connect(source, target, self.commands);
63+
provider.connect(Some(self.scope), source, target, self.commands);
6264

6365
let mut map = StreamTargetMap::default();
6466
let (bundle, streams) = <P::Streams as StreamPack>::spawn_node_streams(
@@ -72,6 +74,31 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
7274
}
7375
}
7476

77+
/// Create a [node](Node) that provides a [blocking map](crate::BlockingMap).
78+
pub fn create_map_block<T, U>(
79+
&mut self,
80+
f: impl FnMut(T) -> U + 'static + Send + Sync,
81+
) -> Node<T, U, ()>
82+
where
83+
T: 'static + Send + Sync,
84+
U: 'static + Send + Sync,
85+
{
86+
self.create_node(f.into_blocking_map())
87+
}
88+
89+
/// Create a [node](Node) that provides an [async map](crate::AsyncMap).
90+
pub fn create_map_async<T, Task>(
91+
&mut self,
92+
f: impl FnMut(T) -> Task + 'static + Send + Sync,
93+
) -> Node<T, Task::Output, ()>
94+
where
95+
T: 'static + Send + Sync,
96+
Task: Future + 'static + Send + Sync,
97+
Task::Output: 'static + Send + Sync,
98+
{
99+
self.create_node(f.into_async_map())
100+
}
101+
75102
/// Connect the output of one into the input slot of another node.
76103
pub fn connect<T: 'static + Send + Sync>(
77104
&mut self,
@@ -94,6 +121,7 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
94121
) -> Buffer<T> {
95122
let source = self.commands.spawn(()).id();
96123
self.commands.add(AddOperation::new(
124+
Some(self.scope),
97125
source,
98126
OperateBuffer::<T>::new(settings),
99127
));
@@ -160,8 +188,9 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
160188
build,
161189
);
162190

163-
let begin_cancel = self.commands.spawn(()).id();
191+
let begin_cancel = self.commands.spawn(()).set_parent(self.scope).id();
164192
self.commands.add(AddOperation::new(
193+
None,
165194
begin_cancel,
166195
BeginCancel::<T>::new(self.scope, from_buffer.source, cancelling_scope_id),
167196
));
@@ -190,13 +219,13 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
190219
Response: 'static + Send + Sync,
191220
Streams: StreamPack,
192221
{
193-
let operation = OperateScope::<Request, Response, Streams>::new(
194-
scope_id, Some(exit_scope), settings, self.commands,
222+
let ScopeEndpoints {
223+
terminal,
224+
enter_scope,
225+
finish_scope_cancel
226+
} = OperateScope::<Request, Response, Streams>::add(
227+
Some(self.scope()), scope_id, Some(exit_scope), settings, self.commands,
195228
);
196-
let enter_scope = operation.enter_scope();
197-
let finish_scope_cancel = operation.finish_cancel();
198-
let terminal = operation.terminal();
199-
self.commands.add(AddOperation::new(scope_id, operation));
200229

201230
let (stream_in, stream_out) = Streams::spawn_scope_streams(
202231
scope_id,

src/builder/connect.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ use backtrace::Backtrace;
2525
use crate::{
2626
SingleInputStorage, SingleTargetStorage, ForkTargetStorage, StreamTargetMap,
2727
OperationResult, OperationError, OrBroken, UnhandledErrors, ConnectionFailure,
28+
ScopeEntryStorage, EntryForScope,
2829
};
2930

3031
/// If two nodes have been created, they will each have a unique source and a
3132
/// target entity allocated to them. If we want to connect them, then we want
3233
/// the target of one to no longer be unique - we instead want it to be the
3334
/// source entity of the other. This [`Command`] redirects the target information
3435
/// of the sending node to target the source entity of the receiving node.
35-
#[derive(Clone, Copy)]
36+
#[derive(Clone, Copy, Debug)]
3637
pub(crate) struct Connect {
3738
pub(crate) original_target: Entity,
3839
pub(crate) new_target: Entity,
@@ -53,10 +54,24 @@ impl Command for Connect {
5354
}
5455

5556
fn try_connect(connect: Connect, world: &mut World) -> OperationResult {
56-
let old_inputs = world.get::<SingleInputStorage>(connect.original_target)
57-
.or_broken()?.get().clone();
57+
if let Some(EntryForScope(scope)) = world.get(connect.original_target) {
58+
// The original target was the entry point of a scope, so we need to
59+
// handle it a bit differently. Instead of modifying target and input
60+
// storage components, we need to modify EntryForScope and
61+
// ScopeEntryStorage components.
62+
let scope = *scope;
63+
world.get_entity_mut(connect.new_target).or_broken()?.insert(EntryForScope(scope));
64+
world.get_entity_mut(scope).or_broken()?.insert(ScopeEntryStorage(connect.new_target));
5865

59-
for input in old_inputs {
66+
world.despawn(connect.original_target);
67+
return Ok(());
68+
}
69+
70+
let old_inputs = world.get_entity_mut(connect.original_target).or_broken()?
71+
.take::<SingleInputStorage>().or_broken()?
72+
.take();
73+
74+
for input in old_inputs.into_iter() {
6075
let mut input_mut = world.get_entity_mut(input).or_broken()?;
6176

6277
if let Some(mut target) = input_mut.get_mut::<SingleTargetStorage>() {
@@ -87,5 +102,7 @@ fn try_connect(connect: Connect, world: &mut World) -> OperationResult {
87102
}
88103
}
89104

105+
world.despawn(connect.original_target);
106+
90107
Ok(())
91108
}

src/callback.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,8 @@ where
418418
type Response = Response;
419419
type Streams = Streams;
420420

421-
fn connect(self, source: Entity, target: Entity, commands: &mut bevy::prelude::Commands) {
422-
commands.add(AddOperation::new(source, OperateCallback::new(self, target)));
421+
fn connect(self, scope: Option<Entity>, source: Entity, target: Entity, commands: &mut bevy::prelude::Commands) {
422+
commands.add(AddOperation::new(scope, source, OperateCallback::new(self, target)));
423423
}
424424
}
425425

0 commit comments

Comments
 (0)