Skip to content

Commit d93c183

Browse files
committed
Finished implementation of impulses
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent f91972a commit d93c183

File tree

8 files changed

+111
-151
lines changed

8 files changed

+111
-151
lines changed

src/impulse.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub(crate) use internal::*;
3737
mod push;
3838
pub(crate) use push::*;
3939

40+
mod send_event;
41+
pub(crate) use send_event::*;
42+
4043
mod store;
4144
pub(crate) use store::*;
4245

@@ -46,7 +49,7 @@ pub(crate) use taken::*;
4649
/// Impulses can be chained as a simple sequence of [providers](Provider).
4750
pub struct Impulse<'w, 's, 'a, Response, Streams> {
4851
pub(crate) source: Entity,
49-
pub(crate) session: Entity,
52+
pub(crate) target: Entity,
5053
pub(crate) commands: &'a mut Commands<'w, 's>,
5154
pub(crate) _ignore: std::marker::PhantomData<(Response, Streams)>
5255
}
@@ -59,7 +62,7 @@ where
5962
/// Keep carrying out the impulse chain up to here even if a downstream
6063
/// dependent was dropped.
6164
pub fn detach(self) -> Impulse<'w, 's, 'a, Response, Streams> {
62-
self.commands.add(Detach { session: self.session });
65+
self.commands.add(Detach { session: self.target });
6366
self
6467
}
6568

@@ -69,10 +72,10 @@ where
6972
pub fn take(self) -> Recipient<Response, Streams> {
7073
let (response_sender, response_promise) = Promise::<Response>::new();
7174
self.commands.add(AddImpulse::new(
72-
self.session,
75+
self.target,
7376
TakenResponse::<Response>::new(response_sender),
7477
));
75-
let (bundle, stream_receivers) = Streams::take_streams(self.session, self.commands);
78+
let (bundle, stream_receivers) = Streams::take_streams(self.target, self.commands);
7679
self.commands.entity(self.source).insert(bundle);
7780

7881
Recipient {
@@ -86,7 +89,7 @@ where
8689
pub fn take_response(self) -> Promise<Response> {
8790
let (response_sender, response_promise) = Promise::<Response>::new();
8891
self.commands.add(AddImpulse::new(
89-
self.session,
92+
self.target,
9093
TakenResponse::<Response>::new(response_sender),
9194
));
9295
response_promise
@@ -98,7 +101,7 @@ where
98101
self,
99102
provider: P,
100103
) -> Impulse<'w, 's, 'a, P::Response, P::Streams> {
101-
let source = self.session;
104+
let source = self.target;
102105
let session = self.commands.spawn((
103106
Detached::default(),
104107
UnusedTarget,
@@ -110,7 +113,7 @@ where
110113
provider.connect(source, session, self.commands);
111114
Impulse {
112115
source,
113-
session,
116+
target: session,
114117
commands: self.commands,
115118
_ignore: Default::default(),
116119
}
@@ -135,7 +138,7 @@ where
135138

136139
/// Apply a one-time callback whose output is a [`Future`] that will be run
137140
/// in the [`AsyncComputeTaskPool`][1]. The output of the [`Future`] will be
138-
/// the Response of the returned target.
141+
/// the Response of the returned Impulse.
139142
///
140143
/// [1]: bevy::tasks::AsyncComputeTaskPool
141144
#[must_use]
@@ -160,8 +163,8 @@ where
160163
/// [`Self::detach`] before calling this.
161164
pub fn store(self, target: Entity) {
162165
self.commands.add(AddImpulse::new(
163-
self.session,
164-
StoreResponse::<Response>::new(target),
166+
self.target,
167+
Store::<Response>::new(target),
165168
));
166169

167170
let stream_targets = Streams::collect_streams(
@@ -182,7 +185,7 @@ where
182185

183186
Impulse {
184187
source: self.source,
185-
session: self.session,
188+
target: self.target,
186189
commands: self.commands,
187190
_ignore: Default::default(),
188191
}
@@ -198,8 +201,8 @@ where
198201
/// [`Self::detach`] before calling this.
199202
pub fn push(self, target: Entity) {
200203
self.commands.add(AddImpulse::new(
201-
self.session,
202-
PushResponse::<Response>::new(target, false),
204+
self.target,
205+
Push::<Response>::new(target, false),
203206
));
204207

205208
let stream_targets = Streams::collect_streams(
@@ -229,7 +232,10 @@ where
229232
/// [`Self::store`] or [`Self::append`]. Alternatively you can transform it
230233
/// into a bundle using [`Self::map_block`] or [`Self::map_async`].
231234
pub fn insert(self, target: Entity) {
232-
235+
self.commands.add(AddImpulse::new(
236+
self.target,
237+
Insert::<Response>::new(target),
238+
));
233239
}
234240
}
235241

@@ -242,7 +248,10 @@ where
242248
///
243249
/// Using this will also effectively [detach](Self::detach) the impulse.
244250
pub fn send_event(self) {
245-
251+
self.commands.add(AddImpulse::new(
252+
self.target,
253+
SendEvent::<Response>::new(),
254+
));
246255
}
247256
}
248257

src/impulse/insert.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,29 @@
1515
*
1616
*/
1717

18-
use bevy::prelude::{Entity, Component};
18+
use bevy::prelude::{Entity, Component, Bundle, DespawnRecursiveExt};
1919

2020
use crate::{
21-
Impulsive, OperationSetup, OperationRequest, Storage,
22-
OperationResult, OrBroken, Input, ManageInput,
23-
Collection, InputBundle,
21+
Impulsive, OperationSetup, OperationRequest, OperationResult, OrBroken,
22+
Input, ManageInput, InputBundle,
2423
add_lifecycle_dependency,
2524
};
2625

2726
#[derive(Component)]
28-
pub(crate) struct InsertResponse<T> {
27+
pub(crate) struct Insert<T> {
2928
target: Entity,
3029
_ignore: std::marker::PhantomData<T>,
3130
}
3231

33-
impl<T> InsertResponse<T> {
32+
impl<T> Insert<T> {
3433
pub(crate) fn new(target: Entity) -> Self {
3534
Self { target, _ignore: Default::default() }
3635
}
3736
}
3837

39-
impl<T: 'static + Send + Sync> Impulsive for InsertResponse<T> {
38+
impl<T: 'static + Send + Sync + Bundle> Impulsive for Insert<T> {
4039
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
40+
add_lifecycle_dependency(source, self.target, world);
4141
world.entity_mut(source).insert((
4242
InputBundle::<T>::new(),
4343
self,
@@ -46,9 +46,15 @@ impl<T: 'static + Send + Sync> Impulsive for InsertResponse<T> {
4646
}
4747

4848
fn execute(
49-
OperationRequest { source, world, roster }: OperationRequest,
49+
OperationRequest { source, world, .. }: OperationRequest,
5050
) -> OperationResult {
5151
let mut source_mut = world.get_entity_mut(source).or_broken()?;
52-
52+
let Input { data, .. } = source_mut.take_input::<T>()?;
53+
let target = source_mut.get::<Insert<T>>().or_broken()?.target;
54+
if let Some(mut target_mut) = world.get_entity_mut(target) {
55+
target_mut.insert(data);
56+
}
57+
world.entity_mut(source).despawn_recursive();
58+
Ok(())
5359
}
5460
}

src/impulse/push.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,25 @@ use crate::{
2626

2727

2828
#[derive(Component)]
29-
pub(crate) struct PushResponse<Response> {
29+
pub(crate) struct Push<T> {
3030
target: Entity,
3131
is_stream: bool,
32-
_ignore: std::marker::PhantomData<Response>,
32+
_ignore: std::marker::PhantomData<T>,
3333
}
3434

35-
impl<Response> PushResponse<Response> {
35+
impl<T> Push<T> {
3636
pub(crate) fn new(target: Entity, is_stream: bool) -> Self {
3737
Self { target, is_stream, _ignore: Default::default() }
3838
}
3939
}
4040

41-
impl<Response: 'static + Send + Sync> Impulsive for PushResponse<Response> {
41+
impl<T: 'static + Send + Sync> Impulsive for Push<T> {
4242
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
4343
if !self.is_stream {
4444
add_lifecycle_dependency(source, self.target, world);
4545
}
4646
world.entity_mut(source).insert((
47-
InputBundle::<Response>::new(),
47+
InputBundle::<T>::new(),
4848
self
4949
));
5050
Ok(())
@@ -54,10 +54,10 @@ impl<Response: 'static + Send + Sync> Impulsive for PushResponse<Response> {
5454
OperationRequest { source, world, .. }: OperationRequest,
5555
) -> OperationResult {
5656
let mut source_mut = world.get_entity_mut(source).or_broken()?;
57-
let Input { session, data } = source_mut.take_input::<Response>()?;
58-
let target = source_mut.get::<PushResponse<Response>>().or_broken()?.target;
57+
let Input { session, data } = source_mut.take_input::<T>()?;
58+
let target = source_mut.get::<Push<T>>().or_broken()?.target;
5959
let mut target_mut = world.get_entity_mut(target).or_broken()?;
60-
if let Some(mut collection) = target_mut.get_mut::<Collection<Response>>() {
60+
if let Some(mut collection) = target_mut.get_mut::<Collection<T>>() {
6161
collection.items.push(Storage { session, data });
6262
} else {
6363
let mut collection = Collection::default();

src/impulse/send_event.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::{DespawnRecursiveExt, Event};
19+
20+
use crate::{
21+
Impulsive, OperationSetup, OperationRequest, OperationResult, OrBroken,
22+
Input, ManageInput, InputBundle,
23+
};
24+
25+
pub(crate) struct SendEvent<T> {
26+
_ignore: std::marker::PhantomData<T>,
27+
}
28+
29+
impl<T> SendEvent<T> {
30+
pub(crate) fn new() -> Self {
31+
Self { _ignore: Default::default() }
32+
}
33+
}
34+
35+
impl<T: 'static + Send + Sync + Event> Impulsive for SendEvent<T> {
36+
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
37+
world.entity_mut(source).insert(InputBundle::<T>::new());
38+
Ok(())
39+
}
40+
41+
fn execute(
42+
OperationRequest { source, world, .. }: OperationRequest,
43+
) -> OperationResult {
44+
let mut source_mut = world.get_entity_mut(source).or_broken()?;
45+
let Input { data, .. } = source_mut.take_input::<T>()?;
46+
source_mut.despawn_recursive();
47+
world.send_event(data);
48+
Ok(())
49+
}
50+
}

src/impulse/store.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,22 @@ use crate::{
2424
};
2525

2626
#[derive(Component)]
27-
pub(crate) struct StoreResponse<Response> {
27+
pub(crate) struct Store<T> {
2828
target: Entity,
29-
_ignore: std::marker::PhantomData<Response>,
29+
_ignore: std::marker::PhantomData<T>,
3030
}
3131

32-
impl<Response> StoreResponse<Response> {
32+
impl<T> Store<T> {
3333
pub(crate) fn new(target: Entity) -> Self {
3434
Self { target, _ignore: Default::default() }
3535
}
3636
}
3737

38-
impl<Response: 'static + Send + Sync> Impulsive for StoreResponse<Response> {
38+
impl<T: 'static + Send + Sync> Impulsive for Store<T> {
3939
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
4040
add_lifecycle_dependency(source, self.target, world);
4141
world.entity_mut(source).insert((
42-
InputBundle::<Response>::new(),
42+
InputBundle::<T>::new(),
4343
self
4444
));
4545
Ok(())
@@ -49,9 +49,11 @@ impl<Response: 'static + Send + Sync> Impulsive for StoreResponse<Response> {
4949
OperationRequest { source, world, .. }: OperationRequest,
5050
) -> OperationResult {
5151
let mut source_mut = world.get_entity_mut(source).or_broken()?;
52-
let Input { session, data } = source_mut.take_input::<Response>()?;
53-
let target = source_mut.get::<StoreResponse<Response>>().or_broken()?.target;
54-
world.get_entity_mut(target).or_broken()?.insert(Storage { data, session });
52+
let Input { session, data } = source_mut.take_input::<T>()?;
53+
let target = source_mut.get::<Store<T>>().or_broken()?.target;
54+
if let Some(mut target_mut) = world.get_entity_mut(target) {
55+
target_mut.insert(Storage { data, session });
56+
}
5557
world.entity_mut(source).despawn_recursive();
5658
Ok(())
5759
}

0 commit comments

Comments
 (0)