Skip to content

Commit f8dbfaf

Browse files
committed
Fixing implementation details of maps and tasks
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent d93c183 commit f8dbfaf

File tree

7 files changed

+164
-105
lines changed

7 files changed

+164
-105
lines changed

src/channel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl InnerChannel {
103103
}
104104
}
105105

106-
type ChannelItem = Box<dyn FnOnce(&mut World, &mut OperationRoster) + Send>;
106+
pub(crate) type ChannelItem = Box<dyn FnOnce(&mut World, &mut OperationRoster) + Send>;
107107

108108
#[derive(Resource)]
109109
pub(crate) struct ChannelQueue {

src/disposal.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ impl Disposal {
4747
ServiceUnavailable { service, for_node }.into()
4848
}
4949

50+
pub fn task_despawned(task: Entity, node: Entity) -> Disposal {
51+
TaskDespawned { task, node }.into()
52+
}
53+
5054
pub fn branching(
5155
branched_at_node: Entity,
5256
disposed_for_target: Entity,
@@ -85,6 +89,11 @@ pub enum DisposalCause {
8589
/// is the unavailable service.
8690
ServiceUnavailable(ServiceUnavailable),
8791

92+
/// An entity that was managing the execution of a task was despawned,
93+
/// causing the task to be cancelled and making it impossible to deliver a
94+
/// response.
95+
TaskDespawned(TaskDespawned),
96+
8897
/// An output was disposed because a mutex was poisoned.
8998
PoisonedMutex(PoisonedMutexDisposal),
9099

@@ -187,6 +196,21 @@ impl From<ServiceUnavailable> for DisposalCause {
187196
}
188197
}
189198

199+
/// A variant of [`DisposalCause`]
200+
#[derive(Debug)]
201+
pub struct TaskDespawned {
202+
/// The entity that was managing the task
203+
pub task: Entity,
204+
/// The node that the task was spawned by
205+
pub node: Entity,
206+
}
207+
208+
impl From<TaskDespawned> for DisposalCause {
209+
fn from(value: TaskDespawned) -> Self {
210+
Self::TaskDespawned(value)
211+
}
212+
}
213+
190214
/// A variant of [`DisposalCause`]
191215
#[derive(Debug)]
192216
pub struct PoisonedMutexDisposal {

src/flush.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use smallvec::SmallVec;
2626

2727
use crate::{
2828
ChannelQueue, WakeQueue, OperationRoster, ServiceHook, InputReady,
29-
Cancel, DroppedPromiseQueue, UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel,
29+
Cancel, UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel,
3030
OperationRequest,
3131
execute_operation, dispose_for_despawned_service,
3232
};

src/map.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@ pub trait AsMap<M> {
3636

3737
/// A trait that all different ways of defining a Blocking Map must funnel into.
3838
pub(crate) trait CallBlockingMap<Request, Response> {
39-
fn call(self, input: BlockingMap<Request>) -> Response;
39+
fn call(&mut self, input: BlockingMap<Request>) -> Response;
4040
}
4141

4242
impl<F, Request, Response> CallBlockingMap<Request, Response> for MapDef<F>
4343
where
44-
F: FnOnce(BlockingMap<Request>) -> Response + 'static + Send + Sync,
44+
F: FnMut(BlockingMap<Request>) -> Response + 'static + Send + Sync,
4545
Request: 'static + Send + Sync,
4646
Response: 'static + Send + Sync,
4747
{
48-
fn call(self, request: BlockingMap<Request>) -> Response {
48+
fn call(&mut self, request: BlockingMap<Request>) -> Response {
4949
(self.0)(request)
5050
}
5151
}
@@ -111,25 +111,25 @@ pub struct BlockingMapAdapter<F>(F);
111111

112112
impl<F, Request, Response> CallBlockingMap<Request, Response> for BlockingMapAdapter<F>
113113
where
114-
F: FnOnce(Request) -> Response,
114+
F: FnMut(Request) -> Response,
115115
{
116-
fn call(self, BlockingMap{ request }: BlockingMap<Request>) -> Response {
116+
fn call(&mut self, BlockingMap{ request }: BlockingMap<Request>) -> Response {
117117
(self.0)(request)
118118
}
119119
}
120120

121121
pub(crate) trait CallAsyncMap<Request, Task, Streams: StreamPack> {
122-
fn call(self, input: AsyncMap<Request, Streams>) -> Task;
122+
fn call(&mut self, input: AsyncMap<Request, Streams>) -> Task;
123123
}
124124

125125
impl<F, Request, Task, Streams> CallAsyncMap<Request, Task, Streams> for MapDef<F>
126126
where
127-
F: FnOnce(AsyncMap<Request, Streams>) -> Task + 'static + Send + Sync,
127+
F: FnMut(AsyncMap<Request, Streams>) -> Task + 'static + Send + Sync,
128128
Request: 'static + Send + Sync,
129129
Task: 'static + Send + Sync,
130130
Streams: StreamPack,
131131
{
132-
fn call(self, input: AsyncMap<Request, Streams>) -> Task {
132+
fn call(&mut self, input: AsyncMap<Request, Streams>) -> Task {
133133
(self.0)(input)
134134
}
135135
}
@@ -198,10 +198,10 @@ pub struct AsyncMapAdapter<F>(F);
198198

199199
impl<F, Request, Task> CallAsyncMap<Request, Task, ()> for AsyncMapAdapter<F>
200200
where
201-
F: FnOnce(Request) -> Task + 'static + Send + Sync,
201+
F: FnMut(Request) -> Task + 'static + Send + Sync,
202202
Task: Future + 'static + Send + Sync,
203203
{
204-
fn call(self, AsyncMap{ request, .. }: AsyncMap<Request, ()>) -> Task {
204+
fn call(&mut self, AsyncMap{ request, .. }: AsyncMap<Request, ()>) -> Task {
205205
(self.0)(request)
206206
}
207207
}

src/operation.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ pub(crate) use operate_service::*;
6161
mod operate_task;
6262
pub(crate) use operate_task::*;
6363

64-
mod receive;
65-
pub(crate) use receive::*;
66-
6764
mod scope;
6865
pub use scope::*;
6966

@@ -265,9 +262,6 @@ pub(crate) struct Blocker {
265262
pub(crate) serve_next: fn(Blocker, &mut World, &mut OperationRoster),
266263
}
267264

268-
#[derive(Component)]
269-
pub(crate) struct BlockerStorage(pub(crate) Option<Blocker>);
270-
271265
pub enum OperationError {
272266
Broken(Option<Backtrace>),
273267
NotReady,

src/operation/operate_map.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ where
5050
pub(crate) fn new(target: Entity, f: F) -> Self {
5151
Self {
5252
storage: BlockingMapStorage {
53-
f,
53+
f: Some(f),
5454
_ignore: Default::default(),
5555
},
5656
target: SingleTargetStorage::new(target),
@@ -60,7 +60,7 @@ where
6060

6161
#[derive(Component)]
6262
struct BlockingMapStorage<F, Request, Response> {
63-
f: F,
63+
f: Option<F>,
6464
_ignore: std::marker::PhantomData<(Request, Response)>,
6565
}
6666

@@ -87,11 +87,13 @@ where
8787
let mut source_mut = world.get_entity_mut(source).or_broken()?;
8888
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.0;
8989
let Input { session, data: request } = source_mut.take_input::<Request>()?;
90-
let map = source_mut.take::<BlockingMapStorage<F, Request, Response>>().or_broken()?.f;
91-
let mut target_mut = world.get_entity_mut(target).or_broken()?;
90+
let mut map = source_mut.take::<BlockingMapStorage<F, Request, Response>>().or_broken()?;
91+
let mut f = map.f.take().or_broken()?;
9292

93-
let response = map.call(BlockingMap { request });
94-
target_mut.give_input(session, response, roster)?;
93+
let response = f.call(BlockingMap { request });
94+
map.f = Some(f);
95+
96+
world.get_entity_mut(target).or_broken()?.give_input(session, response, roster)?;
9597
Ok(())
9698
}
9799

@@ -130,7 +132,7 @@ where
130132
pub(crate) fn new(target: Entity, f: F) -> Self {
131133
Self {
132134
storage: AsyncMapStorage {
133-
f,
135+
f: Some(f),
134136
_ignore: Default::default(),
135137
},
136138
target: SingleTargetStorage::new(target),
@@ -140,7 +142,7 @@ where
140142

141143
#[derive(Component)]
142144
struct AsyncMapStorage<F, Request, Task, Streams> {
143-
f: F,
145+
f: Option<F>,
144146
_ignore: std::marker::PhantomData<(Request, Task, Streams)>,
145147
}
146148

@@ -171,15 +173,19 @@ where
171173
let mut source_mut = world.get_entity_mut(source).or_broken()?;
172174
let Input { session, data: request } = source_mut.take_input::<Request>()?;
173175
let target = source_mut.get::<SingleTargetStorage>().or_broken()?.0;
174-
let map = source_mut.take::<AsyncMapStorage<F, Request, Task, Streams>>().or_broken()?.f;
176+
let mut f = source_mut.get_mut::<AsyncMapStorage<F, Request, Task, Streams>>().or_broken()?
177+
.f.take().or_broken()?;
175178

176-
let channel = InnerChannel::new(source, session, sender);
179+
let channel = InnerChannel::new(source, session, sender.clone());
177180
let channel = channel.into_specific(&world)?;
178181

179-
let task = AsyncComputeTaskPool::get().spawn(map.call(AsyncMap { request, channel }));
182+
let task = AsyncComputeTaskPool::get().spawn(f.call(AsyncMap { request, channel }));
183+
world.get_entity_mut(source).or_broken()?
184+
.get_mut::<AsyncMapStorage<F, Request, Task, Streams>>().or_broken()?
185+
.f = Some(f);
180186

181187
let task_source = world.spawn(()).id();
182-
OperateTask::new(session, source, target, task, None)
188+
OperateTask::new(task_source, session, source, target, task, None, sender)
183189
.setup(OperationSetup { source: task_source, world });
184190
roster.queue(task_source);
185191
Ok(())

0 commit comments

Comments
 (0)