Skip to content

Commit eab1ab6

Browse files
committed
Fixed task cleanup race conditions
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 16b6ffa commit eab1ab6

File tree

5 files changed

+61
-14
lines changed

5 files changed

+61
-14
lines changed

src/flush.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel, MiscellaneousFailure,
3737
OperationRequest, ImpulseLifecycleChannel, AddImpulse, Finished, OperationCleanup,
3838
UnhandledErrors, UnusedTargetDrop, ValidateScopeReachability, OperationError,
39-
execute_operation, dispose_for_despawned_service,
39+
execute_operation, awaken_task, dispose_for_despawned_service,
4040
};
4141

4242
#[derive(Resource, Default)]
@@ -93,6 +93,11 @@ pub fn flush_impulses(
9393
garbage_cleanup(world, &mut roster);
9494
}
9595

96+
while let Some(source) = roster.awake.pop_front() {
97+
awaken_task(OperationRequest { source, world, roster: &mut roster });
98+
garbage_cleanup(world, &mut roster);
99+
}
100+
96101
collect_from_channels(new_service_query, world, &mut roster);
97102
}
98103
}
@@ -152,7 +157,7 @@ fn collect_from_channels(
152157
.receiver
153158
.try_iter()
154159
{
155-
roster.queue(wakeable);
160+
roster.awake(wakeable);
156161
}
157162

158163
let mut unused_targets_state: SystemState<Query<(Entity, &Detached), With<UnusedTarget>>> =

src/operation.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ pub(crate) struct UnusedTarget;
182182
pub struct OperationRoster {
183183
/// Operation sources that should be triggered
184184
pub(crate) queue: VecDeque<Entity>,
185+
/// Tasks that should be awoken. If the task is already despawned, then
186+
/// it should not be considered an error.
187+
pub(crate) awake: VecDeque<Entity>,
185188
/// Operation sources that should be cancelled
186189
pub(crate) cancel: VecDeque<Cancel>,
187190
/// Async services that should pull their next item
@@ -201,6 +204,10 @@ impl OperationRoster {
201204
self.queue.push_back(source);
202205
}
203206

207+
pub fn awake(&mut self, source: Entity) {
208+
self.awake.push_back(source);
209+
}
210+
204211
pub fn cancel(&mut self, source: Cancel) {
205212
self.cancel.push_back(source);
206213
}
@@ -218,8 +225,11 @@ impl OperationRoster {
218225
}
219226

220227
pub fn is_empty(&self) -> bool {
221-
self.queue.is_empty() && self.cancel.is_empty()
222-
&& self.unblock.is_empty() && self.disposed.is_empty()
228+
self.queue.is_empty()
229+
&& self.awake.is_empty()
230+
&& self.cancel.is_empty()
231+
&& self.unblock.is_empty()
232+
&& self.disposed.is_empty()
223233
&& self.cleanup_finished.is_empty()
224234
}
225235

@@ -626,6 +636,15 @@ pub fn execute_operation(request: OperationRequest) {
626636
operator(request);
627637
}
628638

639+
pub fn awaken_task(request: OperationRequest) {
640+
let Some(operator) = request.world.get::<OperationExecuteStorage>(request.source) else {
641+
// If the task is not available, we just accept that it has despawned.
642+
return;
643+
};
644+
let operator = operator.0;
645+
operator(request);
646+
}
647+
629648
fn perform_operation<Op: Operation>(
630649
OperationRequest { source, world, roster }: OperationRequest
631650
) {

src/operation/operate_task.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,8 @@ impl<Response: 'static + Send + Sync> OperateTask<Response> {
123123

124124
impl<Response: 'static + Send + Sync> Drop for OperateTask<Response> {
125125
fn drop(&mut self) {
126-
println!(" ==== DROPPING TASK: {:?}:\n{:?}", self.source, backtrace::Backtrace::new());
127126
if self.finished_normally {
128127
// The task finished normally so no special action needs to be taken
129-
dbg!(self.source);
130128
return;
131129
}
132130

@@ -187,8 +185,20 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
187185
fn execute(
188186
OperationRequest { source, world, roster }: OperationRequest
189187
) -> OperationResult {
190-
let mut source_mut = world.get_entity_mut(source).or_broken()?;
191-
let mut operation = source_mut.get_mut::<OperateTask<Response>>().or_broken()?;
188+
// It's possible for a task to get into the roster after it has despawned
189+
// so we'll just exit early when that happens. However this should not
190+
// actually be possible because the OperationExecuteStorage must still be
191+
// accessible in order to be inside this function.
192+
let mut source_mut = world.get_entity_mut(source).or_not_ready()?;
193+
// If the task has been stopped / cancelled then OperateTask will have
194+
// been removed, even if it has not despawned yet.
195+
let mut operation = source_mut.get_mut::<OperateTask<Response>>().or_not_ready()?;
196+
if operation.being_cleaned {
197+
// The operation is being cleaned up, so the task will not be
198+
// available and there will be nothing for us to do here. We should
199+
// simply return immediately.
200+
return Ok(());
201+
}
192202
let mut task = operation.task.take().or_broken()?;
193203
let target = operation.target;
194204
let session = operation.session;
@@ -215,8 +225,9 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
215225
) {
216226
Poll::Ready(result) => {
217227
// Task has finished
218-
world.get_mut::<OperateTask<Response>>(source).or_broken()?.finished_normally = true;
228+
println!(" == READY: {source:?}");
219229
let r = world.entity_mut(target).give_input(session, result, roster);
230+
world.get_mut::<OperateTask<Response>>(source).or_broken()?.finished_normally = true;
220231
cleanup_task::<Response>(session, source, node, unblock, being_cleaned, world, roster);
221232
r?;
222233
}
@@ -264,7 +275,9 @@ impl<Response: 'static + Send + Sync> Operation for OperateTask<Response> {
264275
dbg!(source, node);
265276
AsyncComputeTaskPool::get().spawn(async move {
266277
if let Some(task) = task {
278+
dbg!(source, node);
267279
task.cancel().await;
280+
dbg!(source, node);
268281
}
269282
if let Err(err) = sender.send(Box::new(move |world: &mut World, roster: &mut OperationRoster| {
270283
cleanup_task::<Response>(session, source, node, unblock, true, world, roster);
@@ -316,6 +329,7 @@ fn cleanup_task<Response>(
316329
if being_cleaned && cleanup_ready {
317330
// We are notifying about the cleanup on behalf of the node that
318331
// created this task, so we initialize the cleanup with source: node
332+
dbg!(source, node);
319333
let mut cleanup = OperationCleanup {
320334
source: node, session, world, roster
321335
};
@@ -331,6 +345,8 @@ fn cleanup_task<Response>(
331345
if world.get_entity(source).is_some() {
332346
world.despawn(source);
333347
}
348+
349+
roster.purge(source);
334350
}
335351

336352
#[derive(Component, Clone, Copy)]

src/testing.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub use bevy::{
2424
ecs::system::CommandQueue,
2525
};
2626

27-
pub use std::time::Duration;
27+
pub use std::time::{Duration, Instant};
2828

2929
use crate::{
3030
Promise, Service, InAsyncService, InBlockingService, UnhandledErrors,
@@ -262,8 +262,15 @@ pub struct WaitRequest<Value> {
262262
#[cfg(test)]
263263
pub async fn wait<Value>(request: WaitRequest<Value>) -> Value {
264264
use async_std::future;
265-
let never = future::pending::<()>();
266-
let _ = future::timeout(request.duration, never);
265+
let start = Instant::now();
266+
let mut elapsed = start.elapsed();
267+
while elapsed < request.duration {
268+
let never = future::pending::<()>();
269+
let timeout = request.duration - elapsed;
270+
dbg!(request.duration, elapsed, timeout);
271+
let _ = future::timeout(timeout, never).await;
272+
elapsed = start.elapsed();
273+
}
267274
request.value
268275
}
269276

src/workflow.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,14 +343,14 @@ mod tests {
343343
|chain: Chain<f64>| chain
344344
.map_block(|t| WaitRequest { duration: Duration::from_secs_f64(10.0*t), value: 10.0*t })
345345
.map(|r: AsyncMap<WaitRequest<f64>>| {
346-
dbg!(r.source);
346+
dbg!(r.source, r.request.value);
347347
wait(r.request)
348348
})
349349
.connect(scope.terminate),
350350
|chain: Chain<f64>| chain
351351
.map_block(|t| WaitRequest { duration: Duration::from_secs_f64(t/100.0), value: t/100.0 })
352352
.map(|r: AsyncMap<WaitRequest<f64>>| {
353-
dbg!(r.source);
353+
dbg!(r.source, r.request.value);
354354
wait(r.request)
355355
})
356356
.connect(scope.terminate),

0 commit comments

Comments
 (0)