Skip to content

Commit f91972a

Browse files
committed
Reorganizing impulse chain implementation
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 885329d commit f91972a

File tree

17 files changed

+783
-412
lines changed

17 files changed

+783
-412
lines changed

src/cancel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use backtrace::Backtrace;
2525
use std::sync::Arc;
2626

2727
use crate::{
28-
Disposal, Filtered, OperationError, ScopeStorage, OrBroken, CancelFailure,
28+
Disposal, Filtered, OperationError, ScopeStorage, CancelFailure,
2929
OperationResult, OperationRoster, Supplanted, UnhandledErrors,
3030
};
3131

src/channel.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl InnerChannel {
9898
world: &World,
9999
) -> Result<Channel<Streams>, OperationError> {
100100
let inner = Arc::new(self);
101-
let streams = Streams::make_channel(&inner, world)?;
101+
let streams = Streams::make_channel(&inner, world);
102102
Ok(Channel { inner, streams, _ignore: Default::default() })
103103
}
104104
}
@@ -126,7 +126,7 @@ impl Default for ChannelQueue {
126126

127127
/// Use this channel to stream data using the [`StreamChannel::send`] method.
128128
pub struct StreamChannel<T> {
129-
target: Entity,
129+
target: Option<Entity>,
130130
inner: Arc<InnerChannel>,
131131
_ignore: std::marker::PhantomData<T>,
132132
}
@@ -144,7 +144,7 @@ impl<T: Stream> StreamChannel<T> {
144144
));
145145
}
146146

147-
pub(crate) fn new(target: Entity, inner: Arc<InnerChannel>) -> Self {
147+
pub(crate) fn new(target: Option<Entity>, inner: Arc<InnerChannel>) -> Self {
148148
Self { target, inner, _ignore: Default::default() }
149149
}
150150
}

src/flush.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use smallvec::SmallVec;
2626

2727
use crate::{
2828
ChannelQueue, WakeQueue, OperationRoster, ServiceHook, InputReady,
29-
Cancel, DroppedPromiseQueue, UnusedTarget, ServiceLifecycle, ServiceLifecycleQueue,
29+
Cancel, DroppedPromiseQueue, UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel,
3030
OperationRequest,
3131
execute_operation, dispose_for_despawned_service,
3232
};
@@ -39,8 +39,8 @@ pub fn flush_impulses(
3939
) {
4040
let mut roster = OperationRoster::new();
4141

42-
world.get_resource_or_insert_with(|| ServiceLifecycleQueue::new());
43-
world.resource_scope::<ServiceLifecycleQueue, ()>(|world, lifecycles| {
42+
world.get_resource_or_insert_with(|| ServiceLifecycleChannel::new());
43+
world.resource_scope::<ServiceLifecycleChannel, ()>(|world, lifecycles| {
4444
// Clean up the dangling requests of any services that have been despawned.
4545
for removed_service in lifecycles.receiver.try_iter() {
4646
dispose_for_despawned_service(removed_service, world, &mut roster)

src/impulse.rs

Lines changed: 89 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,26 @@ use bevy::prelude::{
2222
use std::future::Future;
2323

2424
use crate::{
25-
Promise, Provider, StreamPack, Detach, TakenResponse, AddOperation,
26-
IntoBlockingMap, IntoAsyncMap, ImpulseProperties, UnusedTarget, StoreResponse,
27-
PushResponse,
25+
Promise, Provider, StreamPack, IntoBlockingMap, IntoAsyncMap, UnusedTarget,
2826
};
2927

30-
pub mod traits;
31-
pub use traits::*;
28+
mod detach;
29+
pub(crate) use detach::*;
30+
31+
mod insert;
32+
pub(crate) use insert::*;
33+
34+
mod internal;
35+
pub(crate) use internal::*;
36+
37+
mod push;
38+
pub(crate) use push::*;
39+
40+
mod store;
41+
pub(crate) use store::*;
42+
43+
mod taken;
44+
pub(crate) use taken::*;
3245

3346
/// Impulses can be chained as a simple sequence of [providers](Provider).
3447
pub struct Impulse<'w, 's, 'a, Response, Streams> {
@@ -50,15 +63,16 @@ where
5063
self
5164
}
5265

53-
/// Take the data that comes out of the request.
66+
/// Take the data that comes out of the request, including both the response
67+
/// and the streams.
5468
#[must_use]
5569
pub fn take(self) -> Recipient<Response, Streams> {
5670
let (response_sender, response_promise) = Promise::<Response>::new();
57-
self.commands.add(AddOperation::new(
71+
self.commands.add(AddImpulse::new(
5872
self.session,
5973
TakenResponse::<Response>::new(response_sender),
6074
));
61-
let (bundle, stream_receivers) = Streams::make_receiver(self.session, self.commands);
75+
let (bundle, stream_receivers) = Streams::take_streams(self.session, self.commands);
6276
self.commands.entity(self.source).insert(bundle);
6377

6478
Recipient {
@@ -67,6 +81,17 @@ where
6781
}
6882
}
6983

84+
/// Take only the response data that comes out of the request.
85+
#[must_use]
86+
pub fn take_response(self) -> Promise<Response> {
87+
let (response_sender, response_promise) = Promise::<Response>::new();
88+
self.commands.add(AddImpulse::new(
89+
self.session,
90+
TakenResponse::<Response>::new(response_sender),
91+
));
92+
response_promise
93+
}
94+
7095
/// Pass the outcome of the request to another provider.
7196
#[must_use]
7297
pub fn then<P: Provider<Request = Response>>(
@@ -75,7 +100,7 @@ where
75100
) -> Impulse<'w, 's, 'a, P::Response, P::Streams> {
76101
let source = self.session;
77102
let session = self.commands.spawn((
78-
ImpulseProperties::new(),
103+
Detached::default(),
79104
UnusedTarget,
80105
)).id();
81106

@@ -127,36 +152,75 @@ where
127152

128153
/// Store the response in a [`Storage`] component in the specified entity.
129154
///
155+
/// Each stream will be collected into [`Collection`] components in the
156+
/// specified entity, one for each stream type. To store the streams in a
157+
/// different entity, call [`Self::collect_streams`] before this.
158+
///
130159
/// If the entity despawns then the request gets cancelled unless you used
131160
/// [`Self::detach`] before calling this.
132161
pub fn store(self, target: Entity) {
133-
self.commands.add(AddOperation::new(
162+
self.commands.add(AddImpulse::new(
134163
self.session,
135164
StoreResponse::<Response>::new(target),
136165
));
166+
167+
let stream_targets = Streams::collect_streams(
168+
self.source, target, self.commands,
169+
);
170+
self.commands.entity(self.source).insert(stream_targets);
137171
}
138172

139-
/// Push the response to the back of a [`Storage<Vec<T>>`] component in an
173+
/// Collect the stream data into [`Collection<T>`] components in the
174+
/// specified target, one collection for each stream data type. You must
175+
/// still decide what to do with the final response data.
176+
#[must_use]
177+
pub fn collect_streams(self, target: Entity) -> Impulse<'w, 's, 'a, Response, ()> {
178+
let stream_targets = Streams::collect_streams(
179+
self.source, target, self.commands,
180+
);
181+
self.commands.entity(self.source).insert(stream_targets);
182+
183+
Impulse {
184+
source: self.source,
185+
session: self.session,
186+
commands: self.commands,
187+
_ignore: Default::default(),
188+
}
189+
}
190+
191+
/// Push the response to the back of a [`Collection<T>`] component in an
192+
/// entity.
193+
///
194+
/// Similar to [`Self::store`] this will also collect streams into this
140195
/// entity.
141196
///
142197
/// If the entity despawns then the request gets cancelled unless you used
143198
/// [`Self::detach`] before calling this.
144199
pub fn push(self, target: Entity) {
145-
self.commands.add(AddOperation::new(
200+
self.commands.add(AddImpulse::new(
146201
self.session,
147-
PushResponse::<Response>::new(target),
202+
PushResponse::<Response>::new(target, false),
148203
));
204+
205+
let stream_targets = Streams::collect_streams(
206+
self.source, target, self.commands,
207+
);
208+
self.commands.entity(self.source).insert(stream_targets);
149209
}
150210

151-
// TODO(@mxgrey): Offer an on_cancel method that lets users provide a
152-
// callback to be triggered when a cancellation happens.
211+
// TODO(@mxgrey): Consider offering ways for users to respond to cancellations.
212+
// For example, offer an on_cancel method that lets users provide a callback
213+
// to be triggered when a cancellation happens. Or focus on terminal impulses,
214+
// like offer store_or_else(~), push_or_else(~) etc which accept a callback
215+
// that will be triggered after a cancellation.
153216
}
154217

155218
impl<'w, 's, 'a, Response, Streams> Impulse<'w, 's, 'a, Response, Streams>
156219
where
157220
Response: Bundle,
158221
{
159-
/// Insert the response as a bundle in the specified entity.
222+
/// Insert the response as a bundle in the specified entity. Stream data
223+
/// will be dropped unless you use [`Self::collect_streams`] before this.
160224
///
161225
/// If the entity despawns then the request gets cancelled unless you used
162226
/// [`Self::detach`] before calling this.
@@ -173,9 +237,11 @@ impl<'w, 's, 'a, Response, Streams> Impulse<'w, 's, 'a, Response, Streams>
173237
where
174238
Response: Event,
175239
{
176-
/// Send the response out as an event once it is ready. Using this will also
177-
/// effectively [detach](Self::detach) the impulse.
178-
pub fn send(self) {
240+
/// Send the response out as an event once it is ready. Stream data will be
241+
/// dropped unless you use [`Self::collect_streams`] before this.
242+
///
243+
/// Using this will also effectively [detach](Self::detach) the impulse.
244+
pub fn send_event(self) {
179245

180246
}
181247
}
@@ -192,7 +258,10 @@ pub struct Storage<T> {
192258
pub session: Entity,
193259
}
194260

195-
/// Used to collect responses from multiple impulse chains into a container.
261+
/// Used to collect responses from multiple impulse chains into a container
262+
/// attached to an entity.
263+
//
264+
// TODO(@mxgrey): Consider allowing the user to choose the container type.
196265
#[derive(Component)]
197266
pub struct Collection<T> {
198267
/// The items that have been collected.

src/impulse/detach.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::{
19+
prelude::{Entity, Component, World},
20+
ecs::system::Command,
21+
};
22+
23+
use anyhow::anyhow;
24+
25+
use backtrace::Backtrace;
26+
27+
use crate::{MiscellaneousFailure, UnusedTarget, UnhandledErrors};
28+
29+
#[derive(Component)]
30+
pub(crate) struct Detached(bool);
31+
32+
impl Default for Detached {
33+
fn default() -> Self {
34+
Detached(false)
35+
}
36+
}
37+
38+
pub(crate) struct Detach {
39+
pub(crate) session: Entity,
40+
}
41+
42+
impl Command for Detach {
43+
fn apply(self, world: &mut World) {
44+
let backtrace;
45+
if let Some(mut session_mut) = world.get_entity_mut(self.session) {
46+
if let Some(mut detached) = session_mut.get_mut::<Detached>() {
47+
detached.0 = true;
48+
session_mut.remove::<UnusedTarget>();
49+
return;
50+
} else {
51+
// The session is missing the target properties that it's
52+
// supposed to have
53+
backtrace = Backtrace::new();
54+
}
55+
} else {
56+
// The session has despawned before we could manage to use it, or it
57+
// never existed in the first place.
58+
backtrace = Backtrace::new();
59+
}
60+
61+
let failure = MiscellaneousFailure {
62+
error: anyhow!("Unable to detach target {:?}", self.session),
63+
backtrace: Some(backtrace),
64+
};
65+
world.get_resource_or_insert_with(|| UnhandledErrors::default())
66+
.miscellaneous
67+
.push(failure);
68+
}
69+
}

src/impulse/insert.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::{Entity, Component};
19+
20+
use crate::{
21+
Impulsive, OperationSetup, OperationRequest, Storage,
22+
OperationResult, OrBroken, Input, ManageInput,
23+
Collection, InputBundle,
24+
add_lifecycle_dependency,
25+
};
26+
27+
#[derive(Component)]
28+
pub(crate) struct InsertResponse<T> {
29+
target: Entity,
30+
_ignore: std::marker::PhantomData<T>,
31+
}
32+
33+
impl<T> InsertResponse<T> {
34+
pub(crate) fn new(target: Entity) -> Self {
35+
Self { target, _ignore: Default::default() }
36+
}
37+
}
38+
39+
impl<T: 'static + Send + Sync> Impulsive for InsertResponse<T> {
40+
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
41+
world.entity_mut(source).insert((
42+
InputBundle::<T>::new(),
43+
self,
44+
));
45+
Ok(())
46+
}
47+
48+
fn execute(
49+
OperationRequest { source, world, roster }: OperationRequest,
50+
) -> OperationResult {
51+
let mut source_mut = world.get_entity_mut(source).or_broken()?;
52+
53+
}
54+
}

0 commit comments

Comments
 (0)