Skip to content

Commit ca2d7e1

Browse files
committed
Add test and fix implementation for join
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent b088fe1 commit ca2d7e1

File tree

8 files changed

+226
-104
lines changed

8 files changed

+226
-104
lines changed

src/buffer/bufferable.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use smallvec::SmallVec;
1919

2020
use crate::{
2121
Buffer, CloneFromBuffer, Output, Builder, BufferSettings, Buffered, Join,
22-
UnusedTarget, AddOperation,
22+
UnusedTarget, AddOperation, Chain,
2323
};
2424

2525
pub trait Bufferable {
@@ -30,10 +30,10 @@ pub trait Bufferable {
3030
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType;
3131

3232
/// Join these bufferable workflow elements.
33-
fn join(
33+
fn join<'w, 's, 'a, 'b>(
3434
self,
35-
builder: &mut Builder,
36-
) -> Output<<Self::BufferType as Buffered>::Item>
35+
builder: &'b mut Builder<'w, 's, 'a>,
36+
) -> Chain<'w, 's, 'a, 'b, <Self::BufferType as Buffered>::Item>
3737
where
3838
Self: Sized,
3939
Self::BufferType: 'static + Send + Sync,
@@ -46,7 +46,7 @@ pub trait Bufferable {
4646
Some(builder.scope()), join, Join::new(buffers, target)
4747
));
4848

49-
Output::new(builder.scope, target)
49+
Output::new(builder.scope, target).chain(builder)
5050
}
5151
}
5252

src/buffer/buffered.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use smallvec::SmallVec;
2121

2222
use crate::{
2323
Buffer, CloneFromBuffer, OperationError, OrBroken, InspectInput, ManageInput,
24+
OperationResult, ForkTargetStorage,
2425
};
2526

2627
pub trait Buffered: Clone {
@@ -37,6 +38,12 @@ pub trait Buffered: Clone {
3738
world: &mut World,
3839
) -> Result<Self::Item, OperationError>;
3940

41+
fn listen(
42+
&self,
43+
listener: Entity,
44+
world: &mut World,
45+
) -> OperationResult;
46+
4047
fn as_input(&self) -> SmallVec<[Entity; 8]>;
4148
}
4249

@@ -56,6 +63,20 @@ impl<T: 'static + Send + Sync> Buffered for Buffer<T> {
5663
.pull_from_buffer::<T>(session)
5764
}
5865

66+
fn listen(
67+
&self,
68+
listener: Entity,
69+
world: &mut World,
70+
) -> OperationResult {
71+
let mut targets = world
72+
.get_mut::<ForkTargetStorage>(self.source)
73+
.or_broken()?;
74+
targets.0.push(listener);
75+
targets.0.sort();
76+
targets.0.dedup();
77+
Ok(())
78+
}
79+
5980
fn as_input(&self) -> SmallVec<[Entity; 8]> {
6081
SmallVec::from_iter([self.source])
6182
}
@@ -81,6 +102,20 @@ impl<T: 'static + Send + Sync + Clone> Buffered for CloneFromBuffer<T> {
81102
.clone_from_buffer(session)
82103
}
83104

105+
fn listen(
106+
&self,
107+
listener: Entity,
108+
world: &mut World,
109+
) -> OperationResult {
110+
let mut targets = world
111+
.get_mut::<ForkTargetStorage>(self.source)
112+
.or_broken()?;
113+
targets.0.push(listener);
114+
targets.0.sort();
115+
targets.0.dedup();
116+
Ok(())
117+
}
118+
84119
fn as_input(&self) -> SmallVec<[Entity; 8]> {
85120
SmallVec::from_iter([self.source])
86121
}
@@ -113,6 +148,16 @@ where
113148
Ok((t0, t1))
114149
}
115150

151+
fn listen(
152+
&self,
153+
listener: Entity,
154+
world: &mut World,
155+
) -> OperationResult {
156+
self.0.listen(listener, world)?;
157+
self.1.listen(listener, world)?;
158+
Ok(())
159+
}
160+
116161
fn as_input(&self) -> SmallVec<[Entity; 8]> {
117162
let mut inputs = SmallVec::new();
118163
inputs.extend(self.0.as_input());
@@ -151,6 +196,17 @@ where
151196
Ok((t0, t1, t2))
152197
}
153198

199+
fn listen(
200+
&self,
201+
listener: Entity,
202+
world: &mut World,
203+
) -> OperationResult {
204+
self.0.listen(listener, world)?;
205+
self.1.listen(listener, world)?;
206+
self.2.listen(listener, world)?;
207+
Ok(())
208+
}
209+
154210
fn as_input(&self) -> SmallVec<[Entity; 8]> {
155211
let mut inputs = SmallVec::new();
156212
inputs.extend(self.0.as_input());
@@ -190,6 +246,17 @@ impl<T: Buffered, const N: usize> Buffered for [T; N] {
190246
}).collect()
191247
}
192248

249+
fn listen(
250+
&self,
251+
listener: Entity,
252+
world: &mut World,
253+
) -> OperationResult {
254+
for buffer in self {
255+
buffer.listen(listener, world)?;
256+
}
257+
Ok(())
258+
}
259+
193260
fn as_input(&self) -> SmallVec<[Entity; 8]> {
194261
self.iter().flat_map(|buffer| buffer.as_input()).collect()
195262
}
@@ -223,6 +290,17 @@ impl<T: Buffered, const N: usize> Buffered for SmallVec<[T; N]> {
223290
}).collect()
224291
}
225292

293+
fn listen(
294+
&self,
295+
listener: Entity,
296+
world: &mut World,
297+
) -> OperationResult {
298+
for buffer in self {
299+
buffer.listen(listener, world)?;
300+
}
301+
Ok(())
302+
}
303+
226304
fn as_input(&self) -> SmallVec<[Entity; 8]> {
227305
self.iter().flat_map(|buffer| buffer.as_input()).collect()
228306
}

0 commit comments

Comments
 (0)