Skip to content

Commit b040f64

Browse files
committed
Fixing implementation of streams
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 18745e6 commit b040f64

File tree

8 files changed

+316
-174
lines changed

8 files changed

+316
-174
lines changed

src/builder.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (C) 2023 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::{Entity, Commands};
19+
20+
use crate::{
21+
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
22+
};
23+
24+
pub(crate) mod internal;
25+
pub(crate) use internal::*;
26+
27+
/// Device used for building a workflow. Simply pass a mutable borrow of this
28+
/// into any functions which ask for it.
29+
///
30+
/// Note that each scope has its own [`Builder`], and a panic will occur if a
31+
/// [`Builder`] gets used in the wrong scope. As of right now there is no known
32+
/// way to trick the compiler into using a [`Builder`] in the wrong scope, but
33+
/// please open an issue with a minimal reproducible example if you find a way
34+
/// to make it panic.
35+
pub struct Builder<'w, 's, 'a> {
36+
pub(crate) scope: Entity,
37+
pub(crate) commands: &'a mut Commands<'w, 's>,
38+
}
39+
40+
impl<'w, 's, 'a> Builder<'w, 's, 'a> {
41+
pub fn create_node<P: Provider>(
42+
&mut self,
43+
provider: P,
44+
) -> Node<P::Request, P::Response, P::Streams>
45+
where
46+
P::Request: 'static + Send + Sync,
47+
P::Response: 'static + Send + Sync,
48+
P::Streams: StreamPack,
49+
{
50+
let source = self.commands.spawn(()).id();
51+
let target = self.commands.spawn(UnusedTarget).id();
52+
provider.connect(source, target, self.commands);
53+
54+
let mut map = StreamTargetMap::default();
55+
let (bundle, streams) = <P::Streams as StreamPack>::spawn_node_streams(
56+
&mut map, self,
57+
);
58+
self.commands.entity(source).insert(bundle);
59+
Node {
60+
input: InputSlot::new(self.scope, source),
61+
output: Output::new(self.scope, target),
62+
streams,
63+
}
64+
}
65+
66+
/// Get the scope that this builder is building for
67+
pub fn scope(&self) -> Entity {
68+
self.scope
69+
}
70+
71+
/// Borrow the commands for the builder
72+
pub fn commands(&'a mut self) -> &'a mut Commands<'w, 's> {
73+
&mut self.commands
74+
}
75+
}

src/builder/internal.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::{
19+
prelude::Entity,
20+
ecs::system::Command,
21+
};
22+
23+
use crate::{SingleTargetStorage, ForkTargetStorage};
24+
25+
/// If two nodes have been created, they will each have a unique source and a
26+
/// target entity allocated to them. If we want to connect them, then we want
27+
/// the target of one to no longer be unique - we instead want it to be the
28+
/// source entity of the other. This [`Command`] redirects the target information
29+
/// of the sending node to target the source entity of the receiving node.
30+
struct RedirectConnection {
31+
original_target: Entity,
32+
new_target: Entity,
33+
}
34+

src/chain.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717

1818
use std::future::Future;
1919

20+
use bevy::prelude::Entity;
21+
2022
use crate::{
2123
UnusedTarget, AddOperation, Node, InputSlot, Builder,
2224
ForkClone, StreamPack, Provider, ProvideOnce,
2325
AsMap, IntoBlockingMap, IntoAsyncMap, Output, Noop,
24-
ForkTargetStorage,
26+
ForkTargetStorage, StreamTargetMap,
2527
make_result_branching, make_cancel_filter_on_err,
2628
make_option_branching, make_cancel_filter_on_none,
2729
};
2830

29-
use bevy::prelude::{Entity, Commands};
30-
3131
use smallvec::SmallVec;
3232

3333
pub mod fork_clone_builder;
@@ -76,6 +76,11 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
7676
Output::new(self.builder.scope, self.target)
7777
}
7878

79+
/// Connect this output into an input slot.
80+
pub fn connect(self, input: InputSlot<T>) {
81+
TODO: Finish implementing this
82+
}
83+
7984
/// Connect the response at the end of the chain into a new provider. Get
8085
/// the response of the new provider as a chain so you can continue chaining
8186
/// operations.
@@ -105,10 +110,12 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
105110
let source = self.target;
106111
let target = self.builder.commands.spawn(UnusedTarget).id();
107112
provider.connect(source, target, self.builder.commands);
113+
114+
let mut map = StreamTargetMap::default();
108115
let (bundle, streams) = <P::Streams as StreamPack>::spawn_node_streams(
109-
self.builder.scope, self.builder.commands,
116+
&mut map, self.builder,
110117
);
111-
self.builder.commands.entity(source).insert(bundle);
118+
self.builder.commands.entity(source).insert((bundle, map));
112119
Node {
113120
input: InputSlot::new(self.builder.scope, source),
114121
output: Output::new(self.builder.scope, target),

src/impulse.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::future::Future;
2323

2424
use crate::{
2525
Promise, ProvideOnce, StreamPack, IntoBlockingMapOnce, IntoAsyncMapOnce,
26-
AsMapOnce, UnusedTarget,
26+
AsMapOnce, UnusedTarget, StreamTargetMap,
2727
};
2828

2929
mod detach;
@@ -92,8 +92,9 @@ where
9292
self.target,
9393
TakenResponse::<Response>::new(response_sender),
9494
));
95-
let (bundle, stream_receivers) = Streams::take_streams(self.target, self.commands);
96-
self.commands.entity(self.source).insert(bundle);
95+
let mut map = StreamTargetMap::default();
96+
let (bundle, stream_receivers) = Streams::take_streams(self.target, &mut map, self.commands);
97+
self.commands.entity(self.source).insert((bundle, map));
9798

9899
Recipient {
99100
response: response_promise,
@@ -203,21 +204,23 @@ where
203204
Store::<Response>::new(target),
204205
));
205206

207+
let mut map = StreamTargetMap::default();
206208
let stream_targets = Streams::collect_streams(
207-
self.source, target, self.commands,
209+
self.source, target, &mut map, self.commands,
208210
);
209-
self.commands.entity(self.source).insert(stream_targets);
211+
self.commands.entity(self.source).insert((stream_targets, map));
210212
}
211213

212214
/// Collect the stream data into [`Collection<T>`] components in the
213215
/// specified target, one collection for each stream data type. You must
214216
/// still decide what to do with the final response data.
215217
#[must_use]
216218
pub fn collect_streams(self, target: Entity) -> Impulse<'w, 's, 'a, Response, ()> {
219+
let mut map = StreamTargetMap::default();
217220
let stream_targets = Streams::collect_streams(
218-
self.source, target, self.commands,
221+
self.source, target, &mut map, self.commands,
219222
);
220-
self.commands.entity(self.source).insert(stream_targets);
223+
self.commands.entity(self.source).insert((stream_targets, map));
221224

222225
Impulse {
223226
source: self.source,
@@ -241,10 +244,11 @@ where
241244
Push::<Response>::new(target, false),
242245
));
243246

247+
let mut map = StreamTargetMap::default();
244248
let stream_targets = Streams::collect_streams(
245-
self.source, target, self.commands,
249+
self.source, target, &mut map, self.commands,
246250
);
247-
self.commands.entity(self.source).insert(stream_targets);
251+
self.commands.entity(self.source).insert((stream_targets, map));
248252
}
249253

250254
// TODO(@mxgrey): Consider offering ways for users to respond to cancellations.

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*
1616
*/
1717

18+
pub mod builder;
19+
pub use builder::*;
20+
1821
pub mod cancel;
1922
pub use cancel::*;
2023

src/operation/scope.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
BufferSettings, Buffer, Cancellable, OperationRoster, ManageCancellation,
2424
OperationError, OperationCancel, Cancel, UnhandledErrors, check_reachability,
2525
Blocker, Stream, StreamTargetStorage, StreamRequest, AddOperation,
26-
ScopeSettings,
26+
ScopeSettings, StreamTargetMap,
2727
};
2828

2929
use backtrace::Backtrace;
@@ -1072,37 +1072,40 @@ pub(crate) struct ExitTarget {
10721072
}
10731073

10741074
pub(crate) struct RedirectScopeStream<T: Stream> {
1075+
target: Entity,
10751076
_ignore: std::marker::PhantomData<T>,
10761077
}
10771078

10781079
impl<T: Stream> RedirectScopeStream<T> {
1079-
pub(crate) fn new() -> Self {
1080-
Self { _ignore: Default::default() }
1080+
pub(crate) fn new(target: Entity) -> Self {
1081+
Self { target, _ignore: Default::default() }
10811082
}
10821083
}
10831084

10841085
impl<T: Stream> Operation for RedirectScopeStream<T> {
10851086
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
1086-
world.entity_mut(source).insert(
1087+
world.entity_mut(source).insert((
10871088
InputBundle::<T>::new(),
1088-
);
1089+
SingleTargetStorage::new(self.target),
1090+
));
10891091
Ok(())
10901092
}
10911093

10921094
fn execute(
10931095
OperationRequest { source, world, roster }: OperationRequest,
10941096
) -> OperationResult {
10951097
let mut source_mut = world.get_entity_mut(source).or_broken()?;
1098+
1099+
// The target is optional because we want to "send" this stream even if
1100+
// there is no target listening, because streams may have custom sending
1101+
// behavior
1102+
let target = source_mut.get::<SingleTargetStorage>().map(|t| t.get());
10961103
let Input { session: scoped_session, data } = source_mut.take_input::<T>()?;
1097-
let scope = source_mut.get::<ScopeStorage>().or_broken()?.get();
1098-
let stream_target = world
1099-
.get::<StreamTargetStorage<T>>(scope)
1100-
.map(|target| target.get());
11011104
let parent_session = world.get::<ParentSession>(scoped_session).or_broken()?.get();
11021105
data.send(StreamRequest {
11031106
source,
11041107
session: parent_session,
1105-
target: stream_target,
1108+
target,
11061109
world,
11071110
roster
11081111
})
@@ -1161,8 +1164,10 @@ impl<T: Stream> Operation for RedirectWorkflowStream<T> {
11611164
let exit_source = exit.source;
11621165
let parent_session = exit.parent_session;
11631166

1167+
let stream_target_map = world.get::<StreamTargetMap>(exit_source).or_broken()?;
11641168
let stream_target = world.get::<StreamTargetStorage<T>>(exit_source)
1165-
.map(|target| target.get());
1169+
.map(|target| stream_target_map.get(target.get()))
1170+
.flatten();
11661171

11671172
data.send(StreamRequest {
11681173
source,

0 commit comments

Comments
 (0)