Skip to content

Commit 885329d

Browse files
committed
Implementing impulse chains
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 73d21f0 commit 885329d

File tree

13 files changed

+365
-102
lines changed

13 files changed

+365
-102
lines changed

src/cancel.rs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use bevy::{
2222

2323
use backtrace::Backtrace;
2424

25-
use smallvec::SmallVec;
26-
2725
use std::sync::Arc;
2826

2927
use crate::{
@@ -197,18 +195,6 @@ impl From<Unreachability> for CancellationCause {
197195
}
198196
}
199197

200-
/// Signals that a cancellation has occurred. This can be read by receivers
201-
/// using [`try_receive_cancel()`](ManageCancellation).
202-
pub struct CancelSignal {
203-
pub session: Entity,
204-
pub cancellation: Cancellation,
205-
}
206-
207-
#[derive(Component, Default)]
208-
struct CancelSignalStorage {
209-
reverse_queue: SmallVec<[CancelSignal; 8]>,
210-
}
211-
212198
pub trait ManageCancellation {
213199
/// Have this node emit a signal to cancel the current scope.
214200
fn emit_cancel(
@@ -223,8 +209,6 @@ pub trait ManageCancellation {
223209
backtrace: Option<Backtrace>,
224210
roster: &mut OperationRoster,
225211
);
226-
227-
fn try_receive_cancel(&mut self) -> Result<Option<CancelSignal>, OperationError>;
228212
}
229213

230214
impl<'w> ManageCancellation for EntityMut<'w> {
@@ -263,11 +247,6 @@ impl<'w> ManageCancellation for EntityMut<'w> {
263247
});
264248
}
265249
}
266-
267-
fn try_receive_cancel(&mut self) -> Result<Option<CancelSignal>, OperationError> {
268-
let mut storage = self.get_mut::<CancelSignalStorage>().or_broken()?;
269-
Ok(storage.reverse_queue.pop())
270-
}
271250
}
272251

273252
fn try_emit_cancel(
@@ -311,13 +290,12 @@ pub struct OperationCancel<'a> {
311290
struct OperationCancelStorage(fn(OperationCancel) -> OperationResult);
312291

313292
#[derive(Bundle)]
314-
pub struct CancellableBundle {
315-
storage: CancelSignalStorage,
293+
pub struct Cancellable {
316294
cancel: OperationCancelStorage,
317295
}
318296

319-
impl CancellableBundle {
297+
impl Cancellable {
320298
pub fn new(cancel: fn(OperationCancel) -> OperationResult) -> Self {
321-
CancellableBundle { storage: Default::default(), cancel: OperationCancelStorage(cancel) }
299+
Cancellable { cancel: OperationCancelStorage(cancel) }
322300
}
323301
}

src/errors.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use backtrace::Backtrace;
2121

2222
use anyhow::Error as Anyhow;
2323

24-
use crate::{OperationError, Cancel, Disposal};
24+
use crate::{OperationError, Cancel, Disposal, Broken};
2525

2626
/// This resource stores errors that have occurred that could not be handled
2727
/// internally or communicated to the user by any other means.
@@ -31,10 +31,10 @@ pub struct UnhandledErrors {
3131
pub operations: Vec<OperationError>,
3232
pub disposals: Vec<DisposalFailure>,
3333
pub stop_tasks: Vec<StopTaskFailure>,
34+
pub broken: Vec<Broken>,
3435
pub miscellaneous: Vec<MiscellaneousFailure>,
3536
}
3637

37-
3838
pub struct CancelFailure {
3939
/// The error produced while the cancellation was happening
4040
pub error: OperationError,

src/handler.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,12 @@ impl<'a> HandleRequest<'a> {
113113
Ok(())
114114
}
115115

116-
fn get_channel<Streams: StreamPack>(&mut self) -> Result<Channel<Streams>, OperationError> {
116+
fn get_channel<Streams: StreamPack>(
117+
&mut self,
118+
session: Entity,
119+
) -> Result<Channel<Streams>, OperationError> {
117120
let sender = self.world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
118-
let channel = InnerChannel::new(self.source, sender);
121+
let channel = InnerChannel::new(self.source, session, sender);
119122
channel.into_specific(&self.world)
120123
}
121124
}
@@ -188,7 +191,7 @@ where
188191
fn handle(&mut self, mut input: HandleRequest) -> Result<(), OperationError> {
189192
let Input { session, data: request } = input.get_request()?;
190193

191-
let channel = input.get_channel()?;
194+
let channel = input.get_channel(session)?;
192195

193196
if !self.initialized {
194197
self.system.initialize(&mut input.world);
@@ -300,7 +303,7 @@ where
300303
fn as_handler(mut self) -> Handler<Self::Request, Self::Response, Self::Streams> {
301304
let callback = move |mut input: HandleRequest| {
302305
let Input { session, data: request } = input.get_request::<Self::Request>()?;
303-
let channel = input.get_channel()?;
306+
let channel = input.get_channel(session)?;
304307
let task = (self)(AsyncHandler { request, channel });
305308
input.give_task(session, task)
306309
};

src/target.rs renamed to src/impulse.rs

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,37 @@
1515
*
1616
*/
1717

18-
use bevy::prelude::{Commands, Entity, Bundle, Component, Deref, DerefMut};
18+
use bevy::prelude::{
19+
Commands, Entity, Bundle, Component, BuildChildren, Event,
20+
};
1921

2022
use std::future::Future;
2123

2224
use crate::{
2325
Promise, Provider, StreamPack, Detach, TakenResponse, AddOperation,
24-
IntoBlockingMap, IntoAsyncMap,
26+
IntoBlockingMap, IntoAsyncMap, ImpulseProperties, UnusedTarget, StoreResponse,
27+
PushResponse,
2528
};
2629

30+
pub mod traits;
31+
pub use traits::*;
2732

28-
pub struct Target<'w, 's, 'a, Response, Streams> {
29-
session: Entity,
30-
commands: &'a mut Commands<'w, 's>,
31-
_ignore: std::marker::PhantomData<(Response, Streams)>
33+
/// Impulses can be chained as a simple sequence of [providers](Provider).
34+
pub struct Impulse<'w, 's, 'a, Response, Streams> {
35+
pub(crate) source: Entity,
36+
pub(crate) session: Entity,
37+
pub(crate) commands: &'a mut Commands<'w, 's>,
38+
pub(crate) _ignore: std::marker::PhantomData<(Response, Streams)>
3239
}
3340

34-
impl<'w, 's, 'a, Response, Streams> Target<'w, 's, 'a, Response, Streams>
41+
impl<'w, 's, 'a, Response, Streams> Impulse<'w, 's, 'a, Response, Streams>
3542
where
3643
Response: 'static + Send + Sync,
3744
Streams: StreamPack,
3845
{
39-
/// If the target is dropped, the request will not be cancelled.
40-
pub fn detach(self) -> Target<'w, 's, 'a, Response, Streams> {
46+
/// Keep carrying out the impulse chain up to here even if a downstream
47+
/// dependent was dropped.
48+
pub fn detach(self) -> Impulse<'w, 's, 'a, Response, Streams> {
4149
self.commands.add(Detach { session: self.session });
4250
self
4351
}
@@ -51,7 +59,7 @@ where
5159
TakenResponse::<Response>::new(response_sender),
5260
));
5361
let (bundle, stream_receivers) = Streams::make_receiver(self.session, self.commands);
54-
self.commands.entity(self.session).insert(bundle);
62+
self.commands.entity(self.source).insert(bundle);
5563

5664
Recipient {
5765
response: response_promise,
@@ -64,8 +72,23 @@ where
6472
pub fn then<P: Provider<Request = Response>>(
6573
self,
6674
provider: P,
67-
) -> Target<'w, 's, 'a, P::Response, P::Streams> {
68-
75+
) -> Impulse<'w, 's, 'a, P::Response, P::Streams> {
76+
let source = self.session;
77+
let session = self.commands.spawn((
78+
ImpulseProperties::new(),
79+
UnusedTarget,
80+
)).id();
81+
82+
// We should automatically delete the previous step in the chain once
83+
// this one is finished.
84+
self.commands.entity(source).set_parent(session);
85+
provider.connect(source, session, self.commands);
86+
Impulse {
87+
source,
88+
session,
89+
commands: self.commands,
90+
_ignore: Default::default(),
91+
}
6992
}
7093

7194
/// Apply a one-time callback whose input is the Response of the current
@@ -74,10 +97,11 @@ where
7497
///
7598
/// This takes in a regular blocking function, which means all systems will
7699
/// be blocked from running while the function gets executed.
100+
#[must_use]
77101
pub fn map_block<U>(
78102
self,
79103
f: impl FnOnce(Response) -> U + 'static + Send + Sync,
80-
) -> Target<'w, 's, 'a, U, ()>
104+
) -> Impulse<'w, 's, 'a, U, ()>
81105
where
82106
U: 'static + Send + Sync,
83107
{
@@ -89,10 +113,11 @@ where
89113
/// the Response of the returned target.
90114
///
91115
/// [1]: bevy::tasks::AsyncComputeTaskPool
116+
#[must_use]
92117
pub fn map_async<Task>(
93118
self,
94119
f: impl FnOnce(Response) -> Task + 'static + Send + Sync,
95-
) -> Target<'w, 's, 'a, Task::Output, ()>
120+
) -> Impulse<'w, 's, 'a, Task::Output, ()>
96121
where
97122
Task: Future + 'static + Send + Sync,
98123
Task::Output: 'static + Send + Sync,
@@ -105,20 +130,29 @@ where
105130
/// If the entity despawns then the request gets cancelled unless you used
106131
/// [`Self::detach`] before calling this.
107132
pub fn store(self, target: Entity) {
108-
133+
self.commands.add(AddOperation::new(
134+
self.session,
135+
StoreResponse::<Response>::new(target),
136+
));
109137
}
110138

111-
/// Append the response to the back of a [`Storage<Vec<T>>`] component in an
139+
/// Push the response to the back of a [`Storage<Vec<T>>`] component in an
112140
/// entity.
113141
///
114142
/// If the entity despawns then the request gets cancelled unless you used
115143
/// [`Self::detach`] before calling this.
116-
pub fn append(self, target: Entity) {
117-
144+
pub fn push(self, target: Entity) {
145+
self.commands.add(AddOperation::new(
146+
self.session,
147+
PushResponse::<Response>::new(target),
148+
));
118149
}
150+
151+
// TODO(@mxgrey): Offer an on_cancel method that lets users provide a
152+
// callback to be triggered when a cancellation happens.
119153
}
120154

121-
impl<'w, 's, 'a, Response, Streams> Target<'w, 's, 'a, Response, Streams>
155+
impl<'w, 's, 'a, Response, Streams> Impulse<'w, 's, 'a, Response, Streams>
122156
where
123157
Response: Bundle,
124158
{
@@ -135,11 +169,38 @@ where
135169
}
136170
}
137171

172+
impl<'w, 's, 'a, Response, Streams> Impulse<'w, 's, 'a, Response, Streams>
173+
where
174+
Response: Event,
175+
{
176+
/// Send the response out as an event once it is ready. Using this will also
177+
/// effectively [detach](Self::detach) the impulse.
178+
pub fn send(self) {
179+
180+
}
181+
}
182+
138183
pub struct Recipient<Response, Streams: StreamPack> {
139184
pub response: Promise<Response>,
140185
pub streams: Streams::Receiver,
141186
}
142187

143-
/// Used to store the response of a
144-
#[derive(Component, Deref, DerefMut)]
145-
pub struct Storage<T>(pub T);
188+
/// Used to store a response of an impulse as a component of an entity.
189+
#[derive(Component)]
190+
pub struct Storage<T> {
191+
pub data: T,
192+
pub session: Entity,
193+
}
194+
195+
/// Used to collect responses from multiple impulse chains into a container.
196+
#[derive(Component)]
197+
pub struct Collection<T> {
198+
/// The items that have been collected.
199+
pub items: Vec<Storage<T>>,
200+
}
201+
202+
impl<T> Default for Collection<T> {
203+
fn default() -> Self {
204+
Self { items: Default::default() }
205+
}
206+
}

src/impulse/traits.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::{
19+
prelude::{World, Entity},
20+
ecs::system::Command,
21+
};
22+
23+
use crate::{
24+
OperationSetup, OperationRequest, OperationResult, OperationCancel,
25+
OperationExecuteStorage, OperationError, Cancellable,
26+
};
27+
28+
pub trait Impulsive {
29+
fn setup(self, info: OperationSetup) -> OperationResult;
30+
31+
fn execute(request: OperationRequest) -> OperationResult;
32+
}
33+
34+
pub struct AddImpulse<I: Impulsive> {
35+
source: Entity,
36+
impulse: I,
37+
}
38+
39+
impl<I: Impulsive + 'static + Sync + Send> Command for AddImpulse<I> {
40+
fn apply(self, world: &mut World) {
41+
self.impulse.setup(OperationSetup { source: self.source, world });
42+
world.entity_mut(self.source).insert((
43+
OperationExecuteStorage(perform_impulse::<I>),
44+
Cancellable::new(cancel_impulse),
45+
));
46+
}
47+
}
48+
49+
fn perform_impulse<I: Impulsive>(
50+
OperationRequest { source, world, roster }: OperationRequest,
51+
) {
52+
match I::execute(OperationRequest { source, world, roster }) {
53+
Ok(()) => {
54+
// Do nothing
55+
}
56+
Err(OperationError::NotReady) => {
57+
// Do nothing
58+
}
59+
Err(OperationError::Broken(backtrace)) => {
60+
61+
}
62+
}
63+
}
64+
65+
fn cancel_impulse(
66+
OperationCancel { cancel, world, roster }: OperationCancel,
67+
) -> OperationResult {
68+
find the last impulse in the chain and then trigger the cancellation and
69+
recursively delete everything
70+
}

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ pub use flush::*;
3939
pub mod handler;
4040
pub use handler::*;
4141

42+
pub mod impulse;
43+
pub use impulse::*;
44+
4245
pub mod input;
4346
pub use input::*;
4447

@@ -66,9 +69,6 @@ pub use service::*;
6669
pub mod stream;
6770
pub use stream::*;
6871

69-
pub mod target;
70-
pub use target::*;
71-
7272
pub mod workflow;
7373
pub use workflow::*;
7474

0 commit comments

Comments
 (0)