Skip to content

Commit 2aba865

Browse files
committed
Introduce buffer implementation
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent b040f64 commit 2aba865

24 files changed

+696
-331
lines changed

src/buffer.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::Entity;
19+
20+
/// A buffer is a special type of node within a workflow that is able to store
21+
/// and release data. When a session is finished, the buffered data from the
22+
/// session will be automatically cleared.
23+
pub struct Buffer<T> {
24+
pub(crate) scope: Entity,
25+
pub(crate) source: Entity,
26+
pub(crate) _ignore: std::marker::PhantomData<T>,
27+
}
28+
29+
/// Settings to describe the behavior of a buffer.
30+
#[derive(Default, Clone, Copy)]
31+
pub struct BufferSettings {
32+
retention: RetentionPolicy,
33+
}
34+
35+
impl BufferSettings {
36+
/// Get the retention policy for the buffer.
37+
pub fn retention(&self) -> RetentionPolicy {
38+
self.retention
39+
}
40+
}
41+
42+
/// Describe how data within a buffer gets retained. Most mechanisms that pull
43+
/// data from a buffer will remove the oldest item in the buffer, so this policy
44+
/// is for dealing with situations where items are being stored faster than they
45+
/// are being pulled.
46+
///
47+
/// The default value is KeepLast(1).
48+
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
49+
pub enum RetentionPolicy {
50+
/// Keep the last N items that were stored into the buffer. Once the limit
51+
/// is reached, the oldest item will be removed any time a new item arrives.
52+
KeepLast(usize),
53+
/// Keep the first N items that are stored into the buffer. Once the limit
54+
/// is reached, any new item that arrives will be discarded.
55+
KeepFirst(usize),
56+
/// Do not limit how many items can be stored in the buffer.
57+
KeepAll,
58+
}
59+
60+
impl Default for RetentionPolicy {
61+
fn default() -> Self {
62+
Self::KeepLast(1)
63+
}
64+
}

src/builder.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ use bevy::prelude::{Entity, Commands};
1919

2020
use crate::{
2121
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
22+
Buffer, BufferSettings, AddOperation, OperateBuffer,
2223
};
2324

24-
pub(crate) mod internal;
25-
pub(crate) use internal::*;
25+
pub(crate) mod connect;
26+
pub(crate) use connect::*;
2627

2728
/// Device used for building a workflow. Simply pass a mutable borrow of this
2829
/// into any functions which ask for it.
@@ -38,6 +39,10 @@ pub struct Builder<'w, 's, 'a> {
3839
}
3940

4041
impl<'w, 's, 'a> Builder<'w, 's, 'a> {
42+
/// Create a node for a provider. This will give access to an input slot, an
43+
/// output slots, and a pack of stream outputs which can all be connected to
44+
/// other nodes.
45+
#[must_use]
4146
pub fn create_node<P: Provider>(
4247
&mut self,
4348
provider: P,
@@ -63,6 +68,54 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
6368
}
6469
}
6570

71+
/// Connect the output of one into the input slot of another node.
72+
pub fn connect<T: 'static + Send + Sync>(
73+
&mut self,
74+
output: Output<T>,
75+
input: InputSlot<T>,
76+
) {
77+
assert_eq!(output.scope(), input.scope());
78+
self.commands.add(Connect {
79+
original_target: output.id(),
80+
new_target: input.id(),
81+
});
82+
}
83+
84+
/// Create a [`Buffer`] which can be used to store and pull data within
85+
/// a scope. This is often used along with joining to synchronize multiple
86+
/// branches.
87+
pub fn create_buffer<T: 'static + Send + Sync>(
88+
&mut self,
89+
settings: BufferSettings,
90+
) -> Buffer<T> {
91+
let source = self.commands.spawn(()).id();
92+
self.commands.add(AddOperation::new(
93+
source,
94+
OperateBuffer::<T>::new(settings),
95+
));
96+
97+
Buffer { scope: self.scope, source, _ignore: Default::default() }
98+
}
99+
100+
/// Create a [`Buffer`] which can be used to store and pull data within a
101+
/// scope. Unlike the buffer created by [`Self::create_buffer`], this will
102+
/// give a clone of the latest items to nodes that pull from it instead of
103+
/// the items being consumed. That means the items are reused by default,
104+
/// which may be useful if you want some data to persist across multiple
105+
/// uses.
106+
pub fn create_cloning_buffer<T: 'static + Send + Sync + Clone>(
107+
&mut self,
108+
settings: BufferSettings,
109+
) -> Buffer<T> {
110+
let source = self.commands.spawn(()).id();
111+
self.commands.add(AddOperation::new(
112+
source,
113+
OperateBuffer::<T>::new_cloning(settings),
114+
));
115+
116+
Buffer { scope: self.scope, source, _ignore: Default::default() }
117+
}
118+
66119
/// Get the scope that this builder is building for
67120
pub fn scope(&self) -> Entity {
68121
self.scope

src/builder/connect.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::{
19+
prelude::{Entity, World},
20+
ecs::system::Command,
21+
};
22+
23+
use backtrace::Backtrace;
24+
25+
use crate::{
26+
SingleInputStorage, SingleTargetStorage, ForkTargetStorage, StreamTargetMap,
27+
OperationResult, OperationError, OrBroken, UnhandledErrors, ConnectionFailure,
28+
};
29+
30+
/// If two nodes have been created, they will each have a unique source and a
31+
/// target entity allocated to them. If we want to connect them, then we want
32+
/// the target of one to no longer be unique - we instead want it to be the
33+
/// source entity of the other. This [`Command`] redirects the target information
34+
/// of the sending node to target the source entity of the receiving node.
35+
#[derive(Clone, Copy)]
36+
pub(crate) struct Connect {
37+
pub(crate) original_target: Entity,
38+
pub(crate) new_target: Entity,
39+
}
40+
41+
impl Command for Connect {
42+
fn apply(self, world: &mut World) {
43+
if let Err(OperationError::Broken(backtrace)) = try_connect(self, world) {
44+
world.get_resource_or_insert_with(|| UnhandledErrors::default())
45+
.connections
46+
.push(ConnectionFailure {
47+
original_target: self.original_target,
48+
new_target: self.new_target,
49+
backtrace: backtrace.unwrap_or_else(|| Backtrace::new()),
50+
})
51+
}
52+
}
53+
}
54+
55+
fn try_connect(connect: Connect, world: &mut World) -> OperationResult {
56+
let inputs = world.get::<SingleInputStorage>(connect.original_target)
57+
.or_broken()?.get().clone();
58+
59+
for input in inputs {
60+
let mut input_mut = world.get_entity_mut(input).or_broken()?;
61+
62+
if let Some(mut target) = input_mut.get_mut::<SingleTargetStorage>() {
63+
target.set(connect.new_target);
64+
}
65+
66+
if let Some(mut targets) = input_mut.get_mut::<ForkTargetStorage>() {
67+
for target in &mut targets.0 {
68+
if *target == connect.original_target {
69+
*target = connect.new_target;
70+
}
71+
}
72+
}
73+
74+
if let Some(mut targets) = input_mut.get_mut::<StreamTargetMap>() {
75+
for target in &mut targets.map {
76+
if *target == connect.original_target {
77+
*target = connect.new_target;
78+
}
79+
}
80+
}
81+
}
82+
83+
Ok(())
84+
}

src/builder/internal.rs

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/cancel.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl From<Broken> for CancellationCause {
126126
/// Passed into the [`OperationRoster`](crate::OperationRoster) to pass a cancel
127127
/// signal into the target.
128128
#[derive(Debug, Clone)]
129-
pub(crate) struct Cancel {
129+
pub struct Cancel {
130130
/// The entity that triggered the cancellation
131131
pub(crate) source: Entity,
132132
/// The target of the cancellation
@@ -160,15 +160,18 @@ impl Cancel {
160160
) -> Result<(), CancelFailure> {
161161
if let Some(cancel) = world.get::<OperationCancelStorage>(self.target) {
162162
let cancel = cancel.0;
163-
(cancel)(OperationCancel { cancel: self, world, roster });
163+
// TODO(@mxgrey): Figure out a way to structure this so we don't
164+
// need to always clone self.
165+
return (cancel)(OperationCancel { cancel: self.clone(), world, roster })
166+
.map_err(|error| {
167+
CancelFailure::new(error, self)
168+
});
164169
} else {
165170
return Err(CancelFailure::new(
166171
OperationError::Broken(Some(Backtrace::new())),
167172
self,
168173
));
169174
}
170-
171-
Ok(())
172175
}
173176
}
174177

0 commit comments

Comments
 (0)