Skip to content

Commit 1b7417d

Browse files
committed
Introduce ProvideOnce and support one-shot maps
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent f8dbfaf commit 1b7417d

File tree

14 files changed

+506
-139
lines changed

14 files changed

+506
-139
lines changed

src/chain.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
184184
Chain::new(source, target, self.commands)
185185
}
186186

187-
/// Apply a one-time callback whose input is a [`BlockingMap`](crate::BlockingMap)
187+
/// Apply a one-time map whose input is a [`BlockingMap`](crate::BlockingMap)
188188
/// or an [`AsyncMap`](crate::AsyncMap).
189189
pub fn map<M, F: AsMap<M>>(
190190
self,
@@ -198,28 +198,28 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
198198
self.then(f.as_map())
199199
}
200200

201-
/// Apply a one-time callback whose input is the Response of the current
202-
/// Chain. The output of the map will be the Response of the returned Chain.
201+
/// Apply a map whose input is the Response of the current Chain. The
202+
/// output of the map will be the Response of the returned Chain.
203203
///
204204
/// This takes in a regular blocking function rather than an async function,
205205
/// so while the function is executing, it will block all systems from
206206
/// running, similar to how [`Commands`] are flushed.
207207
pub fn map_block<U>(
208208
self,
209-
f: impl FnOnce(Response) -> U + 'static + Send + Sync,
209+
f: impl FnMut(Response) -> U + 'static + Send + Sync,
210210
) -> Chain<'w, 's, 'a, U, (), ModifiersUnset>
211211
where
212212
U: 'static + Send + Sync,
213213
{
214214
self.then(f.into_blocking_map())
215215
}
216216

217-
/// Apply a one-time callback whose output is a Future that will be run in
218-
/// the [`AsyncComputeTaskPool`](bevy::tasks::AsyncComputeTaskPool). The
217+
/// Apply a map whose output is a Future that will be run in the
218+
/// [`AsyncComputeTaskPool`](bevy::tasks::AsyncComputeTaskPool). The
219219
/// output of the Future will be the Response of the returned Chain.
220220
pub fn map_async<Task>(
221221
self,
222-
f: impl FnOnce(Response) -> Task + 'static + Send + Sync,
222+
f: impl FnMut(Response) -> Task + 'static + Send + Sync,
223223
) -> Chain<'w, 's, 'a, Task::Output, (), ModifiersUnset>
224224
where
225225
Task: Future + 'static + Send + Sync,
@@ -266,7 +266,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
266266

267267
/// When the response is delivered, we will make a clone of it and
268268
/// simultaneously pass that clone along two different impulse chains: one
269-
/// determined by the `build` callback provided to this function and the
269+
/// determined by the `build` map provided to this function and the
270270
/// other determined by the [`Chain`] that gets returned by this function.
271271
///
272272
/// This can only be applied when `Response` can be cloned.

src/handler.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::{
1919
BlockingHandler, AsyncHandler, Channel, InnerChannel, ChannelQueue,
20-
OperationRoster, StreamPack, Input, Provider,
20+
OperationRoster, StreamPack, Input, Provider, ProvideOnce,
2121
AddOperation, OperateHandler, ManageInput, OperationError,
2222
OrBroken, OperateTask, Operation, OperationSetup,
2323
};
@@ -105,9 +105,10 @@ impl<'a> HandleRequest<'a> {
105105
where
106106
Task::Output: Send + Sync,
107107
{
108+
let sender = self.world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
108109
let task = AsyncComputeTaskPool::get().spawn(task);
109110
let task_id = self.world.spawn(()).id();
110-
OperateTask::new(session, self.source, self.target, task, None)
111+
OperateTask::new(task_id, session, self.source, self.target, task, None, sender)
111112
.setup(OperationSetup { source: task_id, world: self.world });
112113
self.roster.queue(task_id);
113114
Ok(())
@@ -393,7 +394,7 @@ where
393394
}
394395
}
395396

396-
impl<Request, Response, Streams> Provider for Handler<Request, Response, Streams>
397+
impl<Request, Response, Streams> ProvideOnce for Handler<Request, Response, Streams>
397398
where
398399
Request: 'static + Send + Sync,
399400
Response: 'static + Send + Sync,
@@ -407,3 +408,12 @@ where
407408
commands.add(AddOperation::new(source, OperateHandler::new(self, target)));
408409
}
409410
}
411+
412+
impl<Request, Response, Streams> Provider for Handler<Request, Response, Streams>
413+
where
414+
Request: 'static + Send + Sync,
415+
Response: 'static + Send + Sync,
416+
Streams: StreamPack,
417+
{
418+
419+
}

src/impulse.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use bevy::prelude::{
2222
use std::future::Future;
2323

2424
use crate::{
25-
Promise, Provider, StreamPack, IntoBlockingMap, IntoAsyncMap, UnusedTarget,
25+
Promise, ProvideOnce, StreamPack, IntoBlockingMapOnce, IntoAsyncMapOnce,
26+
AsMapOnce, UnusedTarget,
2627
};
2728

2829
mod detach;
@@ -34,6 +35,9 @@ pub(crate) use insert::*;
3435
mod internal;
3536
pub(crate) use internal::*;
3637

38+
mod map;
39+
pub(crate) use map::*;
40+
3741
mod push;
3842
pub(crate) use push::*;
3943

@@ -97,28 +101,29 @@ where
97101

98102
/// Pass the outcome of the request to another provider.
99103
#[must_use]
100-
pub fn then<P: Provider<Request = Response>>(
104+
pub fn then<P: ProvideOnce<Request = Response>>(
101105
self,
102106
provider: P,
103107
) -> Impulse<'w, 's, 'a, P::Response, P::Streams> {
104108
let source = self.target;
105-
let session = self.commands.spawn((
109+
let target = self.commands.spawn((
106110
Detached::default(),
107111
UnusedTarget,
108112
)).id();
109113

110114
// We should automatically delete the previous step in the chain once
111115
// this one is finished.
112-
self.commands.entity(source).set_parent(session);
113-
provider.connect(source, session, self.commands);
116+
self.commands.entity(source).set_parent(target);
117+
provider.connect(source, target, self.commands);
114118
Impulse {
115119
source,
116-
target: session,
120+
target,
117121
commands: self.commands,
118122
_ignore: Default::default(),
119123
}
120124
}
121125

126+
122127
/// Apply a one-time callback whose input is the Response of the current
123128
/// target. The output of the map will become the Response of the returned
124129
/// target.
@@ -133,7 +138,7 @@ where
133138
where
134139
U: 'static + Send + Sync,
135140
{
136-
self.then(f.into_blocking_map())
141+
self.then(f.into_blocking_map_once())
137142
}
138143

139144
/// Apply a one-time callback whose output is a [`Future`] that will be run
@@ -150,7 +155,25 @@ where
150155
Task: Future + 'static + Send + Sync,
151156
Task::Output: 'static + Send + Sync,
152157
{
153-
self.then(f.into_async_map())
158+
self.then(f.into_async_map_once())
159+
}
160+
161+
/// Apply a one-time map that implements one of
162+
/// - [`FnOnce(BlockingMap<Request, Streams>) -> Response`](crate::BlockingMap)
163+
/// - [`FnOnce(AsyncMap<Request, Streams>) -> impl Future<Response>`](crate::AsyncMap)
164+
///
165+
/// If you do not care about providing streams then you can use
166+
/// [`Self::map_block`] or [`Self::map_async`] instead.
167+
pub fn map<M, F: AsMapOnce<M>>(
168+
self,
169+
f: F,
170+
) -> Impulse<'w, 's, 'a, <F::MapType as ProvideOnce>::Response, <F::MapType as ProvideOnce>::Streams>
171+
where
172+
F::MapType: ProvideOnce<Request = Response>,
173+
<F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
174+
<F::MapType as ProvideOnce>::Streams: StreamPack,
175+
{
176+
self.then(f.as_map_once())
154177
}
155178

156179
/// Store the response in a [`Storage`] component in the specified entity.

src/impulse/map.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::{
19+
prelude::{Entity, Component, Bundle},
20+
tasks::AsyncComputeTaskPool,
21+
};
22+
23+
use std::future::Future;
24+
25+
use crate::{
26+
Impulsive, OperationSetup, OperationRequest, SingleTargetStorage, StreamPack,
27+
InputBundle, OperationResult, OrBroken, Input, ManageInput,
28+
ChannelQueue, BlockingMap, AsyncMap, InnerChannel, OperateTask, ActiveTasksStorage,
29+
CallBlockingMapOnce, CallAsyncMapOnce, Operation,
30+
};
31+
32+
/// The key difference between this and [`crate::OperateBlockingMap`] is that
33+
/// this supports FnOnce since it's used for impulse chains which are not
34+
/// reusable, whereas [`crate::OperateBlockingMap`] is used in workflows which
35+
/// need to be reusable, so it can only support FnMut.
36+
#[derive(Bundle)]
37+
pub(crate) struct ImpulseBlockingMap<F, Request, Response>
38+
where
39+
F: 'static + Send + Sync,
40+
Request: 'static + Send + Sync,
41+
Response: 'static + Send + Sync,
42+
{
43+
f: BlockingMapOnceStorage<F>,
44+
target: SingleTargetStorage,
45+
#[bundle(ignore)]
46+
_ignore: std::marker::PhantomData<(Request, Response)>,
47+
}
48+
49+
impl<F, Request, Response> ImpulseBlockingMap<F, Request, Response>
50+
where
51+
F: 'static + Send + Sync,
52+
Request: 'static + Send + Sync,
53+
Response: 'static + Send + Sync,
54+
{
55+
pub(crate) fn new(target: Entity, f: F) -> Self {
56+
Self {
57+
f: BlockingMapOnceStorage { f },
58+
target: SingleTargetStorage::new(target),
59+
_ignore: Default::default(),
60+
}
61+
}
62+
}
63+
64+
#[derive(Component)]
65+
struct BlockingMapOnceStorage<F> {
66+
f: F,
67+
}
68+
69+
impl<F, Request, Response> Impulsive for ImpulseBlockingMap<F, Request, Response>
70+
where
71+
Request: 'static + Send + Sync,
72+
Response: 'static + Send + Sync,
73+
F: CallBlockingMapOnce<Request, Response> + 'static + Send + Sync,
74+
{
75+
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
76+
world.entity_mut(source).insert((
77+
self,
78+
InputBundle::<Request>::new(),
79+
));
80+
Ok(())
81+
}
82+
83+
fn execute(
84+
OperationRequest { source, world, roster }: OperationRequest,
85+
) -> OperationResult {
86+
let mut source_mut = world.get_entity_mut(source).or_broken()?;
87+
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
88+
let Input { session, data: request } = source_mut.take_input::<Request>()?;
89+
let f = source_mut.take::<BlockingMapOnceStorage<F>>().or_broken()?.f;
90+
91+
let response = f.call(BlockingMap { request });
92+
93+
world.get_entity_mut(target).or_broken()?.give_input(session, response, roster)?;
94+
Ok(())
95+
}
96+
}
97+
98+
99+
// impl
100+
101+
/// The key difference between this and [`crate::OperateAsyncMap`] is that
102+
/// this supports FnOnce since it's used for impulse chains which are not
103+
/// reusable, whereas [`crate::OperateAsyncMap`] is used in workflows which
104+
/// need to be reusable, so it can only support FnMut.
105+
#[derive(Bundle)]
106+
pub(crate) struct ImpulseAsyncMap<F, Request, Task, Streams>
107+
where
108+
F: 'static + Send + Sync,
109+
Request: 'static + Send + Sync,
110+
Task: 'static + Send + Sync,
111+
Streams: 'static + Send + Sync,
112+
{
113+
f: AsyncMapOnceStorage<F>,
114+
target: SingleTargetStorage,
115+
#[bundle(ignore)]
116+
_ignore: std::marker::PhantomData<(Request, Task, Streams)>,
117+
}
118+
119+
impl<F, Request, Task, Streams> ImpulseAsyncMap<F, Request, Task, Streams>
120+
where
121+
F: 'static + Send + Sync,
122+
Request: 'static + Send + Sync,
123+
Task: 'static + Send + Sync,
124+
Streams: 'static + Send + Sync,
125+
{
126+
pub(crate) fn new(target: Entity, f: F) -> Self {
127+
Self {
128+
f: AsyncMapOnceStorage { f },
129+
target: SingleTargetStorage::new(target),
130+
_ignore: Default::default(),
131+
}
132+
}
133+
}
134+
135+
#[derive(Component)]
136+
struct AsyncMapOnceStorage<F> {
137+
f: F,
138+
}
139+
140+
impl<F, Request, Task, Streams> Impulsive for ImpulseAsyncMap<F, Request, Task, Streams>
141+
where
142+
Request: 'static + Send + Sync,
143+
Task: Future + 'static + Send + Sync,
144+
Task::Output: 'static + Send + Sync,
145+
Streams: StreamPack,
146+
F: CallAsyncMapOnce<Request, Task, Streams> + 'static + Send + Sync,
147+
{
148+
fn setup(self, OperationSetup { source, world }: OperationSetup) -> OperationResult {
149+
world.entity_mut(source).insert((
150+
self,
151+
InputBundle::<Request>::new(),
152+
ActiveTasksStorage::default(),
153+
));
154+
Ok(())
155+
}
156+
157+
fn execute(
158+
OperationRequest { source, world, roster }: OperationRequest,
159+
) -> OperationResult {
160+
let sender = world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
161+
let mut source_mut = world.get_entity_mut(source).or_broken()?;
162+
let Input { session, data: request } = source_mut.take_input::<Request>()?;
163+
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.get();
164+
let f = source_mut.take::<AsyncMapOnceStorage<F>>().or_broken()?.f;
165+
166+
let channel = InnerChannel::new(source, session, sender.clone());
167+
let channel = channel.into_specific(&world)?;
168+
169+
let task = AsyncComputeTaskPool::get().spawn(f.call(AsyncMap { request, channel }));
170+
171+
let task_source = world.spawn(()).id();
172+
OperateTask::new(task_source, session, source, target, task, None, sender)
173+
.setup(OperationSetup { source: task_source, world });
174+
roster.queue(task_source);
175+
Ok(())
176+
}
177+
}

src/impulse/push.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use crate::{
2424
add_lifecycle_dependency,
2525
};
2626

27-
2827
#[derive(Component)]
2928
pub(crate) struct Push<T> {
3029
target: Entity,

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ pub use input::*;
4848
pub mod map;
4949
pub use map::*;
5050

51+
pub mod map_once;
52+
pub use map_once::*;
53+
5154
pub mod node;
5255
pub use node::*;
5356

0 commit comments

Comments
 (0)