Skip to content

Commit f03c7ca

Browse files
committed
Implementing the control flow of streams
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent edfdddc commit f03c7ca

File tree

4 files changed

+280
-44
lines changed

4 files changed

+280
-44
lines changed

src/node.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,79 @@
1717

1818
use bevy::prelude::Entity;
1919

20+
use crate::StreamPack;
21+
22+
/// A collection of all the inputs and outputs for a node within a workflow.
23+
pub struct Node<Request, Response, Streams: StreamPack> {
24+
/// The input slot for the node. Feed requests into this slot to trigger
25+
/// the node.
26+
pub input: InputSlot<Request>,
27+
/// The final output of the node. Build off of this to handle the response
28+
/// that comes out of the node.
29+
pub output: Output<Response>,
30+
/// The streams that come out of the node. A stream may fire off data any
31+
/// number of times while a node is active. Each stream can fire off data
32+
/// independently. Once the final output of the node is delivered, no more
33+
/// stream data will come out.
34+
pub streams: Streams::StreamOutputPack,
35+
}
36+
37+
/// The slot that receives input for a node. When building a workflow, you can
38+
/// connect the output of a node to this, as long as the types match.
2039
pub struct InputSlot<Request> {
2140
scope: Entity,
2241
source: Entity,
2342
_ignore: std::marker::PhantomData<Request>,
2443
}
2544

2645
impl<Request> InputSlot<Request> {
46+
pub fn id(&self) -> Entity {
47+
self.source
48+
}
49+
pub fn scope(&self) -> Entity {
50+
self.scope
51+
}
2752
pub(crate) fn new(scope: Entity, source: Entity) -> Self {
2853
Self { scope, source, _ignore: Default::default() }
2954
}
3055
}
3156

57+
/// The output of a node. This can only be fed to one input slot before being
58+
/// consumed. If the `Response` parameter can be cloned then you can feed this
59+
/// into a [`CloneForkOutput`] to feed the output into any number of input slots.
3260
pub struct Output<Response> {
3361
scope: Entity,
3462
target: Entity,
3563
_ignore: std::marker::PhantomData<Response>,
3664
}
3765

3866
impl<Response> Output<Response> {
67+
pub fn id(&self) -> Entity {
68+
self.target
69+
}
70+
pub fn scope(&self) -> Entity {
71+
self.scope
72+
}
73+
pub(crate) fn new(scope: Entity, target: Entity) -> Self {
74+
Self { scope, target, _ignore: Default::default() }
75+
}
76+
}
77+
78+
/// The output of a cloning fork node. This output can be fed into any number of
79+
/// input slots.
80+
pub struct CloneForkOutput<Response> {
81+
scope: Entity,
82+
target: Entity,
83+
_ignore: std::marker::PhantomData<Response>,
84+
}
85+
86+
impl<Response> CloneForkOutput<Response> {
87+
pub fn id(&self) -> Entity {
88+
self.target
89+
}
90+
pub fn scope(&self) -> Entity {
91+
self.scope
92+
}
3993
pub(crate) fn new(scope: Entity, target: Entity) -> Self {
4094
Self { scope, target, _ignore: Default::default() }
4195
}

src/operation/scope.rs

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
Cancellation, Unreachability, InspectDisposals, execute_operation,
2323
BufferSettings, Buffer, CancellableBundle, OperationRoster, ManageCancellation,
2424
OperationError, OperationCancel, Cancel, UnhandledErrors, check_reachability,
25-
Blocker, Stream, StreamTargetStorage,
25+
Blocker, Stream, StreamTargetStorage, StreamRequest,
2626
};
2727

2828
use backtrace::Backtrace;
@@ -1046,11 +1046,70 @@ pub(crate) struct ExitTarget {
10461046
pub(crate) blocker: Option<Blocker>,
10471047
}
10481048

1049-
pub(crate) struct RedirectStream<T: Stream> {
1049+
pub(crate) struct RedirectScopeStream<T: Stream> {
10501050
_ignore: std::marker::PhantomData<T>,
10511051
}
10521052

1053-
impl<T: Stream> Operation for RedirectStream<T> {
1053+
impl<T: Stream> RedirectScopeStream<T> {
1054+
pub(crate) fn new() -> Self {
1055+
Self { _ignore: Default::default() }
1056+
}
1057+
}
1058+
1059+
impl<T: Stream> Operation for RedirectScopeStream<T> {
1060+
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
1061+
world.entity_mut(source).insert(
1062+
InputBundle::<T>::new(),
1063+
);
1064+
Ok(())
1065+
}
1066+
1067+
fn execute(
1068+
OperationRequest { source, world, roster }: OperationRequest,
1069+
) -> OperationResult {
1070+
let mut source_mut = world.get_entity_mut(source).or_broken()?;
1071+
let Input { session: scoped_session, data } = source_mut.take_input::<T>()?;
1072+
let scope = source_mut.get::<ScopeStorage>().or_broken()?.get();
1073+
let stream_target = world.get::<StreamTargetStorage<T>>(scope).or_broken()?.get();
1074+
let parent_session = world.get::<ParentSession>(scoped_session).or_broken()?.get();
1075+
data.send(StreamRequest {
1076+
source,
1077+
session: parent_session,
1078+
target: stream_target,
1079+
world,
1080+
roster
1081+
})
1082+
}
1083+
1084+
fn is_reachable(mut r: OperationReachability) -> ReachabilityResult {
1085+
if r.has_input::<T>()? {
1086+
return Ok(true);
1087+
}
1088+
1089+
let scope = r.world.get::<ScopeStorage>(r.source).or_broken()?.get();
1090+
r.check_upstream(scope)
1091+
1092+
// TODO(@mxgrey): Consider whether we can/should identify more
1093+
// specifically whether the current state of the scope would be able to
1094+
// reach this specific stream.
1095+
}
1096+
1097+
fn cleanup(mut clean: OperationCleanup) -> OperationResult {
1098+
clean.cleanup_inputs::<T>()
1099+
}
1100+
}
1101+
1102+
pub(crate) struct RedirectWorkflowStream<T: Stream> {
1103+
_ignore: std::marker::PhantomData<T>,
1104+
}
1105+
1106+
impl<T: Stream> RedirectWorkflowStream<T> {
1107+
pub(crate) fn new() -> Self {
1108+
Self { _ignore: Default::default() }
1109+
}
1110+
}
1111+
1112+
impl<T: Stream> Operation for RedirectWorkflowStream<T> {
10541113
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
10551114
world.entity_mut(source).insert(
10561115
InputBundle::<T>::new(),
@@ -1062,10 +1121,10 @@ impl<T: Stream> Operation for RedirectStream<T> {
10621121
OperationRequest { source, world, roster }: OperationRequest,
10631122
) -> OperationResult {
10641123
let mut source_mut = world.get_entity_mut(source).or_broken()?;
1065-
let Input { session, data } = source_mut.take_input::<T>()?;
1124+
let Input { session: scoped_session, data } = source_mut.take_input::<T>()?;
10661125
let scope = source_mut.get::<ScopeStorage>().or_broken()?.get();
10671126
let exit = world.get::<ExitTargetStorage>(scope).or_broken()?
1068-
.map.get(&session)
1127+
.map.get(&scoped_session)
10691128
// If the map does not have this session in it, that should simply
10701129
// mean that the workflow has terminated, so we should discard this
10711130
// stream data.
@@ -1078,9 +1137,13 @@ impl<T: Stream> Operation for RedirectStream<T> {
10781137
let stream_target = world.get::<StreamTargetStorage<T>>(exit_source)
10791138
.or_broken()?.get();
10801139

1081-
world.get_entity_mut(stream_target).or_broken()?.give_input(
1082-
parent_session, data, roster,
1083-
)
1140+
data.send(StreamRequest {
1141+
source,
1142+
session: parent_session,
1143+
target: stream_target,
1144+
world,
1145+
roster,
1146+
})
10841147
}
10851148

10861149
fn is_reachable(mut r: OperationReachability) -> ReachabilityResult {

0 commit comments

Comments
 (0)