Skip to content

Commit c779f5a

Browse files
committed
Reimplementing Join using buffers
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 2aba865 commit c779f5a

File tree

13 files changed

+561
-309
lines changed

13 files changed

+561
-309
lines changed

src/buffer.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717

1818
use bevy::prelude::Entity;
1919

20+
use crate::{Builder, Chain, UnusedTarget, OnNewBufferValue, InputSlot};
21+
22+
mod buffered;
23+
pub use buffered::*;
24+
25+
mod bufferable;
26+
pub use bufferable::*;
27+
2028
/// A buffer is a special type of node within a workflow that is able to store
2129
/// and release data. When a session is finished, the buffered data from the
2230
/// session will be automatically cleared.
@@ -26,6 +34,44 @@ pub struct Buffer<T> {
2634
pub(crate) _ignore: std::marker::PhantomData<T>,
2735
}
2836

37+
impl<T> Buffer<T> {
38+
/// Get a unit `()` trigger output each time a new value is added to the buffer.
39+
pub fn on_new_value<'w, 's, 'a, 'b>(
40+
&self,
41+
builder: &'b mut Builder<'w, 's, 'a>
42+
) -> Chain<'w, 's, 'a, 'b, ()> {
43+
assert_eq!(self.scope, builder.scope);
44+
let target = builder.commands.spawn(UnusedTarget).id();
45+
builder.commands.add(OnNewBufferValue::new(self.source, target));
46+
Chain::new(target, builder)
47+
}
48+
49+
/// Specify that you want to pull from this Buffer by cloning. This can be
50+
/// used by operations like join to tell them that they should clone from
51+
/// the buffer instead of consuming from it.
52+
pub fn by_cloning(self) -> CloneFromBuffer<T>
53+
where
54+
T: Clone,
55+
{
56+
CloneFromBuffer {
57+
scope: self.scope,
58+
source: self.source,
59+
_ignore: Default::default()
60+
}
61+
}
62+
63+
/// Get an input slot for this buffer.
64+
pub fn input_slot(self) -> InputSlot<T> {
65+
InputSlot::new(self.scope, self.source)
66+
}
67+
}
68+
69+
pub struct CloneFromBuffer<T: Clone> {
70+
pub(crate) scope: Entity,
71+
pub(crate) source: Entity,
72+
pub(crate) _ignore: std::marker::PhantomData<T>,
73+
}
74+
2975
/// Settings to describe the behavior of a buffer.
3076
#[derive(Default, Clone, Copy)]
3177
pub struct BufferSettings {
@@ -62,3 +108,27 @@ impl Default for RetentionPolicy {
62108
Self::KeepLast(1)
63109
}
64110
}
111+
112+
impl<T> Clone for Buffer<T> {
113+
fn clone(&self) -> Self {
114+
Self {
115+
scope: self.scope,
116+
source: self.source,
117+
_ignore: Default::default(),
118+
}
119+
}
120+
}
121+
122+
impl<T> Copy for Buffer<T> {}
123+
124+
impl<T: Clone> Clone for CloneFromBuffer<T> {
125+
fn clone(&self) -> Self {
126+
Self {
127+
scope: self.scope,
128+
source: self.source,
129+
_ignore: Default::default(),
130+
}
131+
}
132+
}
133+
134+
impl<T: Clone> Copy for CloneFromBuffer<T> {}

src/buffer/bufferable.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use crate::{
19+
Buffer, CloneFromBuffer, Output, Builder, BufferSettings, Buffered, Join,
20+
UnusedTarget, AddOperation,
21+
};
22+
23+
pub trait Bufferable {
24+
type BufferType: Buffered;
25+
26+
/// Convert these bufferable workflow elements into buffers if they are not
27+
/// buffers already.
28+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType;
29+
30+
/// Join these bufferable workflow elements.
31+
fn join(
32+
self,
33+
builder: &mut Builder,
34+
) -> Output<<Self::BufferType as Buffered>::Item>
35+
where
36+
Self: Sized,
37+
Self::BufferType: 'static + Send + Sync,
38+
<Self::BufferType as Buffered>::Item: 'static + Send + Sync,
39+
{
40+
let buffers = self.as_buffer(builder);
41+
let join = builder.commands.spawn(()).id();
42+
let target = builder.commands.spawn(UnusedTarget).id();
43+
builder.commands.add(AddOperation::new(join, Join::new(buffers, target)));
44+
45+
Output::new(builder.scope, target)
46+
}
47+
}
48+
49+
impl<T: 'static + Send + Sync> Bufferable for Buffer<T> {
50+
type BufferType = Self;
51+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
52+
assert_eq!(self.scope, builder.scope());
53+
self
54+
}
55+
}
56+
57+
impl<T: 'static + Send + Sync + Clone> Bufferable for CloneFromBuffer<T> {
58+
type BufferType = Self;
59+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
60+
assert_eq!(self.scope, builder.scope());
61+
self
62+
}
63+
}
64+
65+
impl<T: 'static + Send + Sync> Bufferable for Output<T> {
66+
type BufferType = Buffer<T>;
67+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
68+
assert_eq!(self.scope(), builder.scope());
69+
let buffer = builder.create_buffer::<T>(BufferSettings::default());
70+
builder.connect(self, buffer.input_slot());
71+
buffer
72+
}
73+
}
74+
75+
impl<T0, T1> Bufferable for (T0, T1)
76+
where
77+
T0: Bufferable,
78+
T1: Bufferable,
79+
{
80+
type BufferType = (T0::BufferType, T1::BufferType);
81+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
82+
(
83+
self.0.as_buffer(builder),
84+
self.1.as_buffer(builder),
85+
)
86+
}
87+
}
88+
89+
impl<T0, T1, T2> Bufferable for (T0, T1, T2)
90+
where
91+
T0: Bufferable,
92+
T1: Bufferable,
93+
T2: Bufferable,
94+
{
95+
type BufferType = (T0::BufferType, T1::BufferType, T2::BufferType);
96+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
97+
(
98+
self.0.as_buffer(builder),
99+
self.1.as_buffer(builder),
100+
self.2.as_buffer(builder),
101+
)
102+
}
103+
}
104+
105+
106+
impl<T: Bufferable, const N: usize> Bufferable for [T; N] {
107+
type BufferType = [T::BufferType; N];
108+
fn as_buffer(self, builder: &mut Builder) -> Self::BufferType {
109+
self.map(|b| b.as_buffer(builder))
110+
}
111+
}

src/buffer/buffered.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::{Entity, World};
19+
20+
use smallvec::SmallVec;
21+
22+
use crate::{
23+
Buffer, CloneFromBuffer, OperationError, OrBroken, InspectInput, ManageInput,
24+
};
25+
26+
pub trait Buffered: Copy {
27+
fn buffered_count(
28+
&self,
29+
session: Entity,
30+
world: &World,
31+
) -> Result<usize, OperationError>;
32+
33+
type Item;
34+
fn pull(
35+
&self,
36+
session: Entity,
37+
world: &mut World,
38+
) -> Result<Self::Item, OperationError>;
39+
40+
fn as_input(&self) -> SmallVec<[Entity; 8]>;
41+
}
42+
43+
impl<T: 'static + Send + Sync> Buffered for Buffer<T> {
44+
fn buffered_count(&self, session: Entity, world: &World) -> Result<usize, OperationError> {
45+
world.get_entity(self.source).or_broken()?
46+
.buffered_count::<T>(session)
47+
}
48+
49+
type Item = T;
50+
fn pull(
51+
&self,
52+
session: Entity,
53+
world: &mut World,
54+
) -> Result<Self::Item, OperationError> {
55+
world.get_entity_mut(self.source).or_broken()?
56+
.pull_from_buffer::<T>(session)
57+
}
58+
59+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
60+
SmallVec::from_iter([self.source])
61+
}
62+
}
63+
64+
impl<T: 'static + Send + Sync + Clone> Buffered for CloneFromBuffer<T> {
65+
fn buffered_count(
66+
&self,
67+
session: Entity,
68+
world: &World,
69+
) -> Result<usize, OperationError> {
70+
world.get_entity(self.source).or_broken()?
71+
.buffered_count::<T>(session)
72+
}
73+
74+
type Item = T;
75+
fn pull(
76+
&self,
77+
session: Entity,
78+
world: &mut World,
79+
) -> Result<Self::Item, OperationError> {
80+
world.get_entity(self.source).or_broken()?
81+
.clone_from_buffer(session)
82+
}
83+
84+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
85+
SmallVec::from_iter([self.source])
86+
}
87+
}
88+
89+
impl<T0, T1> Buffered for (T0, T1)
90+
where
91+
T0: Buffered,
92+
T1: Buffered,
93+
{
94+
fn buffered_count(
95+
&self,
96+
session: Entity,
97+
world: &World,
98+
) -> Result<usize, OperationError> {
99+
Ok([
100+
self.0.buffered_count(session, world)?,
101+
self.1.buffered_count(session, world)?,
102+
].iter().copied().min().unwrap_or(0))
103+
}
104+
105+
type Item = (T0::Item, T1::Item);
106+
fn pull(
107+
&self,
108+
session: Entity,
109+
world: &mut World,
110+
) -> Result<Self::Item, OperationError> {
111+
let t0 = self.0.pull(session, world)?;
112+
let t1 = self.1.pull(session, world)?;
113+
Ok((t0, t1))
114+
}
115+
116+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
117+
let mut inputs = SmallVec::new();
118+
inputs.extend(self.0.as_input());
119+
inputs.extend(self.1.as_input());
120+
inputs
121+
}
122+
}
123+
124+
impl<T0, T1, T2> Buffered for (T0, T1, T2)
125+
where
126+
T0: Buffered,
127+
T1: Buffered,
128+
T2: Buffered,
129+
{
130+
fn buffered_count(
131+
&self,
132+
session: Entity,
133+
world: &World,
134+
) -> Result<usize, OperationError> {
135+
Ok([
136+
self.0.buffered_count(session, world)?,
137+
self.1.buffered_count(session, world)?,
138+
self.2.buffered_count(session, world)?,
139+
].iter().copied().min().unwrap_or(0))
140+
}
141+
142+
type Item = (T0::Item, T1::Item, T2::Item);
143+
fn pull(
144+
&self,
145+
session: Entity,
146+
world: &mut World,
147+
) -> Result<Self::Item, OperationError> {
148+
let t0 = self.0.pull(session, world)?;
149+
let t1 = self.1.pull(session, world)?;
150+
let t2 = self.2.pull(session, world)?;
151+
Ok((t0, t1, t2))
152+
}
153+
154+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
155+
let mut inputs = SmallVec::new();
156+
inputs.extend(self.0.as_input());
157+
inputs.extend(self.1.as_input());
158+
inputs.extend(self.2.as_input());
159+
inputs
160+
}
161+
}
162+
163+
impl<T: Buffered, const N: usize> Buffered for [T; N] {
164+
fn buffered_count(
165+
&self,
166+
session: Entity,
167+
world: &World,
168+
) -> Result<usize, OperationError> {
169+
let mut min_count = None;
170+
for buffer in self.iter() {
171+
let count = buffer.buffered_count(session, world)?;
172+
if !min_count.is_some_and(|min| min < count) {
173+
min_count = Some(count);
174+
}
175+
}
176+
177+
Ok(min_count.unwrap_or(0))
178+
}
179+
180+
// TODO(@mxgrey) We may be able to use [T::Item; N] here instead of SmallVec
181+
// when try_map is stabilized: https://github.com/rust-lang/rust/issues/79711
182+
type Item = SmallVec<[T::Item; N]>;
183+
fn pull(
184+
&self,
185+
session: Entity,
186+
world: &mut World,
187+
) -> Result<Self::Item, OperationError> {
188+
self.iter().map(|buffer| {
189+
buffer.pull(session, world)
190+
}).collect()
191+
}
192+
193+
fn as_input(&self) -> SmallVec<[Entity; 8]> {
194+
self.iter().flat_map(|buffer| buffer.as_input()).collect()
195+
}
196+
}

0 commit comments

Comments
 (0)