Skip to content

Commit 23354cd

Browse files
committed
Fix unreachability detection for streams out of blocking nodes
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 128b509 commit 23354cd

File tree

12 files changed

+163
-34
lines changed

12 files changed

+163
-34
lines changed

src/callback.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
BlockingCallback, AsyncCallback, Channel, ChannelQueue,
2020
OperationRoster, StreamPack, Input, Provider, ProvideOnce,
2121
AddOperation, OperateCallback, ManageInput, OperationError,
22-
OrBroken, OperateTask,
22+
OrBroken, OperateTask, UnusedStreams, ManageDisposal,
2323
};
2424

2525
use bevy::{
@@ -115,10 +115,17 @@ impl<'a> CallbackRequest<'a> {
115115
&mut self,
116116
session: Entity,
117117
response: Response,
118+
unused_streams: UnusedStreams,
118119
) -> Result<(), OperationError> {
120+
if !unused_streams.streams.is_empty() {
121+
self.world.get_entity_mut(self.source).or_broken()?
122+
.emit_disposal(session, unused_streams.into(), self.roster);
123+
}
124+
119125
self.world
120126
.get_entity_mut(self.target).or_broken()?
121127
.give_input(session, response, self.roster)?;
128+
122129
Ok(())
123130
}
124131

@@ -200,9 +207,12 @@ where
200207
}, input.world);
201208
self.system.apply_deferred(&mut input.world);
202209

203-
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
210+
let mut unused_streams = UnusedStreams::new(input.source);
211+
Streams::process_buffer(
212+
streams, input.source, session, &mut unused_streams, input.world, input.roster
213+
)?;
204214

205-
input.give_response(session, response)
215+
input.give_response(session, response, unused_streams)
206216
}
207217
}
208218

@@ -321,8 +331,12 @@ where
321331
let response = (self)(BlockingCallback {
322332
request, streams: streams.clone(), source: input.source, session,
323333
});
324-
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
325-
input.give_response(session, response)
334+
335+
let mut unused_streams = UnusedStreams::new(input.source);
336+
Streams::process_buffer(
337+
streams, input.source, session, &mut unused_streams, input.world, input.roster
338+
)?;
339+
input.give_response(session, response, unused_streams)
326340
};
327341
Callback::new(MapCallback { callback })
328342
}

src/chain.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::error::Error;
2323

2424
use crate::{
2525
UnusedTarget, AddOperation, Node, InputSlot, Builder,
26-
StreamPack, Provider, ProvideOnce, Scope,
26+
StreamPack, Provider, ProvideOnce, Scope, StreamOf,
2727
AsMap, IntoBlockingMap, IntoAsyncMap, Output, Noop,
2828
ForkTargetStorage, StreamTargetMap, ScopeSettings, CreateCancelFilter,
2929
CreateDisposalFilter,
@@ -417,6 +417,17 @@ impl<'w, 's, 'a, 'b, T: 'static + Send + Sync> Chain<'w, 's, 'a, 'b, T> {
417417
}
418418
}
419419

420+
impl<'w, 's, 'a, 'b, T> Chain<'w, 's, 'a, 'b, StreamOf<T>>
421+
where
422+
T: 'static + Send + Sync,
423+
{
424+
/// When the output value is wrapped in a [`StreamOf`] container, this will
425+
/// strip it out of that wrapper.
426+
pub fn inner(self) -> Chain<'w, 's, 'a, 'b, T> {
427+
self.map_block(|v| v.0)
428+
}
429+
}
430+
420431
impl<'w, 's, 'a, 'b, T, E> Chain<'w, 's, 'a, 'b, Result<T, E>>
421432
where
422433
T: 'static + Send + Sync,

src/disposal.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::collections::HashMap;
2828

2929
use crate::{
3030
OperationRoster, operation::ScopeStorage, Cancellation, UnhandledErrors,
31-
DisposalFailure,
31+
DisposalFailure, ImpulseMarker,
3232
};
3333

3434
#[derive(Debug, Clone)]
@@ -100,6 +100,10 @@ pub enum DisposalCause {
100100

101101
/// A scope was cancelled so its output has been disposed.
102102
Scope(Cancellation),
103+
104+
/// One or more streams from a node never emitted any signal. This can lead
105+
/// to unexpected
106+
UnusedStreams(UnusedStreams)
103107
}
104108

105109
/// A variant of [`DisposalCause`]
@@ -196,7 +200,6 @@ impl From<TaskDespawned> for DisposalCause {
196200
Self::TaskDespawned(value)
197201
}
198202
}
199-
200203
/// A variant of [`DisposalCause`]
201204
#[derive(Debug)]
202205
pub struct PoisonedMutexDisposal {
@@ -210,6 +213,26 @@ impl From<PoisonedMutexDisposal> for DisposalCause {
210213
}
211214
}
212215

216+
#[derive(Debug)]
217+
pub struct UnusedStreams {
218+
/// The node which did not use all its streams
219+
pub node: Entity,
220+
/// The streams which went unused.
221+
pub streams: Vec<&'static str>,
222+
}
223+
224+
impl UnusedStreams {
225+
pub fn new(node: Entity) -> Self {
226+
Self { node, streams: Default::default() }
227+
}
228+
}
229+
230+
impl From<UnusedStreams> for DisposalCause {
231+
fn from(value: UnusedStreams) -> Self {
232+
Self::UnusedStreams(value)
233+
}
234+
}
235+
213236
pub trait ManageDisposal {
214237
fn emit_disposal(
215238
&mut self,
@@ -233,15 +256,19 @@ impl<'w> ManageDisposal for EntityMut<'w> {
233256
roster: &mut OperationRoster,
234257
) {
235258
let Some(scope) = self.get::<ScopeStorage>() else {
236-
let broken_node = self.id();
237-
self.world_scope(|world| {
238-
world
239-
.get_resource_or_insert_with(|| UnhandledErrors::default())
240-
.disposals
241-
.push(DisposalFailure {
242-
disposal, broken_node, backtrace: Some(Backtrace::new())
259+
if !self.contains::<ImpulseMarker>() {
260+
// If the emitting node does not have a scope as not part of an
261+
// impulse chain, then something is broken.
262+
let broken_node = self.id();
263+
self.world_scope(|world| {
264+
world
265+
.get_resource_or_insert_with(|| UnhandledErrors::default())
266+
.disposals
267+
.push(DisposalFailure {
268+
disposal, broken_node, backtrace: Some(Backtrace::new())
269+
});
243270
});
244-
});
271+
}
245272
return;
246273
};
247274
let scope = scope.get();

src/impulse.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ where
124124
let target = self.commands.spawn((
125125
Detached::default(),
126126
UnusedTarget,
127+
ImpulseMarker,
127128
)).id();
128129

129130
// We should automatically delete the previous step in the chain once

src/impulse/internal.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ pub(crate) trait Impulsive {
4242
fn execute(request: OperationRequest) -> OperationResult;
4343
}
4444

45+
#[derive(Component)]
46+
pub(crate) struct ImpulseMarker;
47+
4548
pub(crate) struct AddImpulse<I: Impulsive> {
4649
source: Entity,
4750
impulse: I,
@@ -64,6 +67,7 @@ impl<I: Impulsive + 'static + Sync + Send> Command for AddImpulse<I> {
6467
.insert((
6568
OperationExecuteStorage(perform_impulse::<I>),
6669
Cancellable::new(cancel_impulse),
70+
ImpulseMarker,
6771
))
6872
.remove::<UnusedTarget>();
6973
}

src/impulse/map.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
Impulsive, OperationSetup, OperationRequest, SingleTargetStorage, StreamPack,
2727
InputBundle, OperationResult, OrBroken, Input, ManageInput,
2828
ChannelQueue, BlockingMap, AsyncMap, Channel, OperateTask, ActiveTasksStorage,
29-
CallBlockingMapOnce, CallAsyncMapOnce,
29+
CallBlockingMapOnce, CallAsyncMapOnce, UnusedStreams,
3030
};
3131

3232
/// The key difference between this and [`crate::OperateBlockingMap`] is that
@@ -94,7 +94,12 @@ where
9494

9595
let response = f.call(BlockingMap { request, streams: streams.clone(), source, session });
9696

97-
Streams::process_buffer(streams, source, session, world, roster)?;
97+
let mut unused_streams = UnusedStreams::new(source);
98+
Streams::process_buffer(
99+
streams, source, session, &mut unused_streams, world, roster
100+
)?;
101+
// Note: We do not need to emit a disposal for any unused streams since
102+
// this is only used for impulses, not workflows.
98103

99104
world.get_entity_mut(target).or_broken()?.give_input(session, response, roster)?;
100105
Ok(())

src/input.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use backtrace::Backtrace;
3232
use crate::{
3333
OperationRoster, OperationError, OrBroken,
3434
DeferredRoster, Cancel, Cancellation, CancellationCause, Broken,
35-
BufferSettings, RetentionPolicy, ForkTargetStorage,
35+
BufferSettings, RetentionPolicy, ForkTargetStorage, UnusedTarget,
3636
};
3737

3838
/// Typical container for input data accompanied by its session information.
@@ -210,8 +210,18 @@ impl<'w> ManageInput for EntityMut<'w> {
210210
session: Entity,
211211
data: T,
212212
) -> Result<(), OperationError> {
213-
let mut storage = self.get_mut::<InputStorage<T>>().or_broken()?;
214-
storage.reverse_queue.insert(0, Input { session, data });
213+
if let Some(mut storage) = self.get_mut::<InputStorage<T>>() {
214+
storage.reverse_queue.insert(0, Input { session, data });
215+
} else if !self.contains::<UnusedTarget>() {
216+
// If the input is being fed to an unused target then we can
217+
// generally ignore it, although it may indicate a bug in the user's
218+
// workflow because workflow branches that end in an unused target
219+
// will be spuriously dropped when the scope terminates.
220+
221+
// However in this case, the target is not unused but also does not
222+
// have the correct input storage type. This indicates
223+
None.or_broken()?;
224+
}
215225
Ok(())
216226
}
217227

src/operation/operate_map.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ use crate::{
2727
SingleTargetStorage, StreamPack, Input, ManageInput, OperationCleanup,
2828
CallBlockingMap, CallAsyncMap, SingleInputStorage, OperationResult,
2929
OrBroken, OperationSetup, OperationRequest, OperateTask, ActiveTasksStorage,
30-
OperationReachability, ReachabilityResult, InputBundle,
30+
OperationReachability, ReachabilityResult, InputBundle, UnusedStreams,
31+
ManageDisposal,
3132
};
3233

3334
#[derive(Bundle)]
@@ -96,7 +97,14 @@ where
9697
let response = f.call(BlockingMap { request, streams: streams.clone(), source, session });
9798
map.f = Some(f);
9899

99-
Streams::process_buffer(streams, source, session, world, roster)?;
100+
let mut unused_streams = UnusedStreams::new(source);
101+
Streams::process_buffer(
102+
streams, source, session, &mut unused_streams, world, roster
103+
)?;
104+
if !unused_streams.streams.is_empty() {
105+
world.get_entity_mut(source).or_broken()?
106+
.emit_disposal(session, unused_streams.into(), roster);
107+
}
100108

101109
world.get_entity_mut(target).or_broken()?.give_input(session, response, roster)?;
102110
Ok(())

src/request.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::future::Future;
2121

2222
use crate::{
2323
UnusedTarget, StreamPack, ProvideOnce, IntoBlockingMapOnce, IntoAsyncMap,
24-
Impulse, Detached, InputCommand, Cancellable, cancel_impulse,
24+
Impulse, Detached, InputCommand, Cancellable, ImpulseMarker, cancel_impulse,
2525
};
2626

2727
/// Extensions for creating impulse chains by making a request to a provider or
@@ -88,10 +88,14 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
8888
let target = self.spawn((
8989
Detached::default(),
9090
UnusedTarget,
91+
ImpulseMarker,
9192
)).id();
9293

9394
let source = self
94-
.spawn(Cancellable::new(cancel_impulse))
95+
.spawn((
96+
Cancellable::new(cancel_impulse),
97+
ImpulseMarker,
98+
))
9599
// We set the parent of this source to the target so that when the
96100
// target gets despawned, this will also be despawned.
97101
.set_parent(target)

src/service/blocking.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use crate::{
1919
BlockingService, BlockingServiceInput, IntoService, ServiceTrait, ServiceRequest,
2020
Input, ManageInput, ServiceBundle, OperationRequest, OperationError, StreamPack,
21-
OrBroken, dispose_for_despawned_service,
21+
UnusedStreams, ManageDisposal, OrBroken, dispose_for_despawned_service,
2222
service::builder::BlockingChosen,
2323
};
2424

@@ -115,7 +115,9 @@ where
115115
request, streams: streams.clone(), provider, source, session,
116116
}, world);
117117
service.apply_deferred(world);
118-
Streams::process_buffer(streams, source, session, world, roster)?;
118+
119+
let mut unused_streams = UnusedStreams::new(source);
120+
Streams::process_buffer(streams, source, session, &mut unused_streams, world, roster)?;
119121

120122
if let Some(mut provider_mut) = world.get_entity_mut(provider) {
121123
if let Some(mut storage) = provider_mut.get_mut::<BlockingServiceStorage<Request, Response, Streams>>() {
@@ -131,6 +133,11 @@ where
131133
// not consider this to be cancelled.
132134
}
133135

136+
if !unused_streams.streams.is_empty() {
137+
world.get_entity_mut(source).or_broken()?
138+
.emit_disposal(session, unused_streams.into(), roster);
139+
}
140+
134141
world.get_entity_mut(target).or_broken()?
135142
.give_input(session, response, roster)?;
136143
Ok(())

0 commit comments

Comments
 (0)