Skip to content

Commit 3d5425d

Browse files
authored
Fix detachment (#40)
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
1 parent 0def9db commit 3d5425d

File tree

8 files changed

+215
-64
lines changed

8 files changed

+215
-64
lines changed

src/builder.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -959,11 +959,15 @@ mod tests {
959959
let mut promise = context.command(|commands| commands.request(5, workflow).take_response());
960960

961961
context.run_with_conditions(&mut promise, Duration::from_secs(2));
962+
assert!(
963+
context.no_unhandled_errors(),
964+
"{:#?}",
965+
context.get_unhandled_errors(),
966+
);
962967
assert!(promise.peek().is_cancelled());
963968
let channel_output = receiver.try_recv().unwrap();
964969
assert_eq!(channel_output, 5);
965970
assert!(receiver.try_recv().is_err());
966-
assert!(context.no_unhandled_errors());
967971
assert!(context.confirm_buffers_empty().is_ok());
968972

969973
let (cancel_sender, mut cancel_receiver) = unbounded_channel();

src/flush.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ fn flush_impulses_impl(
8787

8888
let mut loop_count = 0;
8989
while !roster.is_empty() {
90+
for e in roster.deferred_despawn.drain(..) {
91+
if let Some(e_mut) = world.get_entity_mut(e) {
92+
e_mut.despawn_recursive();
93+
}
94+
}
95+
9096
let parameters = world.get_resource_or_insert_with(FlushParameters::default);
9197
let flush_loop_limit = parameters.flush_loop_limit;
9298
let single_threaded_poll_limit = parameters.single_threaded_poll_limit;

src/impulse.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ where
131131
// this one is finished.
132132
self.commands
133133
.entity(source)
134-
.insert(Cancellable::new(cancel_impulse))
134+
.insert((Cancellable::new(cancel_impulse), ImpulseMarker))
135+
.remove::<UnusedTarget>()
135136
.set_parent(target);
136137
provider.connect(None, source, target, self.commands);
137138
Impulse {
@@ -484,6 +485,38 @@ mod tests {
484485
assert!(context.no_unhandled_errors());
485486
}
486487

488+
#[test]
489+
fn test_detach() {
490+
// This is a regression test that covers a bug which existed due to
491+
// an incorrect handling of detached impulses when giving input.
492+
let mut context = TestingContext::minimal_plugins();
493+
let service = context.spawn_delayed_map(Duration::from_millis(1), |n| n + 1);
494+
495+
context.command(|commands| {
496+
commands.provide(0).then(service).detach();
497+
});
498+
499+
let (sender, mut promise) = Promise::<()>::new();
500+
context.run_with_conditions(&mut promise, Duration::from_millis(5));
501+
assert!(
502+
context.no_unhandled_errors(),
503+
"Unhandled errors: {:#?}",
504+
context.get_unhandled_errors(),
505+
);
506+
507+
// The promise and sender only exist because run_with_conditions requires
508+
// them. Moreover we need to make sure that sender does not get dropped
509+
// prematurely by the compiler, otherwise the promise will have the run
510+
// exit prematurely. Therefore we call .send(()) here to guarantee the
511+
// compiler knows to keep it alive until the running is finished.
512+
//
513+
// We have observed that using `let (_, mut promise) = ` will cause the
514+
// sender to drop prematurely, so we don't want to risk that there are
515+
// other cases where that may happen. It is important for the run to
516+
// last multiple cycles.
517+
sender.send(()).ok();
518+
}
519+
487520
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
488521
struct UnitLabel;
489522

@@ -547,6 +580,20 @@ mod tests {
547580
);
548581

549582
verify_delivery_instruction_matrix(service, &mut context);
583+
584+
let async_service = service;
585+
let service = context.spawn_io_workflow(|scope, builder| {
586+
scope
587+
.input
588+
.chain(builder)
589+
.then(async_service)
590+
.connect(scope.terminate);
591+
});
592+
593+
verify_delivery_instruction_matrix(service, &mut context);
594+
595+
// We don't test blocking services because blocking services are always
596+
// serial no matter what, so delivery instructions have no effect for them.
550597
}
551598

552599
fn verify_delivery_instruction_matrix(

src/input.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use smallvec::SmallVec;
2626
use backtrace::Backtrace;
2727

2828
use crate::{
29-
Broken, BufferStorage, Cancel, Cancellation, CancellationCause, DeferredRoster, OperationError,
30-
OperationRoster, OrBroken, SessionStatus, UnusedTarget,
29+
Broken, BufferStorage, Cancel, Cancellation, CancellationCause, DeferredRoster, Detached,
30+
MiscellaneousFailure, OperationError, OperationRoster, OrBroken, SessionStatus,
31+
UnhandledErrors, UnusedTarget,
3132
};
3233

3334
/// This contains data that has been provided as input into an operation, along
@@ -69,15 +70,31 @@ impl<T> Default for InputStorage<T> {
6970
}
7071
}
7172

73+
/// Used to keep track of the expected input type for an operation
74+
#[derive(Component)]
75+
pub(crate) struct InputTypeIndicator {
76+
pub(crate) name: &'static str,
77+
}
78+
79+
impl InputTypeIndicator {
80+
fn new<T>() -> Self {
81+
Self {
82+
name: std::any::type_name::<T>(),
83+
}
84+
}
85+
}
86+
7287
#[derive(Bundle)]
7388
pub struct InputBundle<T: 'static + Send + Sync> {
7489
storage: InputStorage<T>,
90+
indicator: InputTypeIndicator,
7591
}
7692

7793
impl<T: 'static + Send + Sync> InputBundle<T> {
7894
pub fn new() -> Self {
7995
Self {
8096
storage: Default::default(),
97+
indicator: InputTypeIndicator::new::<T>(),
8198
}
8299
}
83100
}
@@ -125,6 +142,7 @@ pub trait ManageInput {
125142
session: Entity,
126143
data: T,
127144
only_if_active: bool,
145+
roster: &mut OperationRoster,
128146
) -> Result<bool, OperationError>;
129147

130148
/// Get an input that is ready to be taken, or else produce an error.
@@ -150,7 +168,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
150168
data: T,
151169
roster: &mut OperationRoster,
152170
) -> Result<(), OperationError> {
153-
if unsafe { self.sneak_input(session, data, true)? } {
171+
if unsafe { self.sneak_input(session, data, true, roster)? } {
154172
roster.queue(self.id());
155173
}
156174
Ok(())
@@ -162,7 +180,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
162180
data: T,
163181
roster: &mut OperationRoster,
164182
) -> Result<(), OperationError> {
165-
if unsafe { self.sneak_input(session, data, true)? } {
183+
if unsafe { self.sneak_input(session, data, true, roster)? } {
166184
roster.defer(self.id());
167185
}
168186
Ok(())
@@ -173,6 +191,7 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
173191
session: Entity,
174192
data: T,
175193
only_if_active: bool,
194+
roster: &mut OperationRoster,
176195
) -> Result<bool, OperationError> {
177196
if only_if_active {
178197
let active_session =
@@ -193,13 +212,40 @@ impl<'w> ManageInput for EntityWorldMut<'w> {
193212
if let Some(mut storage) = self.get_mut::<InputStorage<T>>() {
194213
storage.reverse_queue.insert(0, Input { session, data });
195214
} else if !self.contains::<UnusedTarget>() {
215+
let id = self.id();
216+
if let Some(detached) = self.get::<Detached>() {
217+
if detached.is_detached() {
218+
// The input is going to a detached impulse that will not
219+
// react any further. We need to tell that detached impulse
220+
// to despawn since it is no longer needed.
221+
roster.defer_despawn(id);
222+
223+
// No error occurred, but the caller should not queue the
224+
// operation into the roster because it is being despawned.
225+
return Ok(false);
226+
}
227+
}
228+
229+
let expected = self.get::<InputTypeIndicator>().map(|i| i.name);
196230
// If the input is being fed to an unused target then we can
197231
// generally ignore it, although it may indicate a bug in the user's
198232
// workflow because workflow branches that end in an unused target
199233
// will be spuriously dropped when the scope terminates.
200234

201235
// However in this case, the target is not unused but also does not
202236
// have the correct input storage type. This indicates
237+
self.world_mut()
238+
.get_resource_or_insert_with(|| UnhandledErrors::default())
239+
.miscellaneous
240+
.push(MiscellaneousFailure {
241+
error: std::sync::Arc::new(anyhow::anyhow!(
242+
"Incorrect input type for operation [{:?}]: received [{}], expected [{}]",
243+
id,
244+
std::any::type_name::<T>(),
245+
expected.unwrap_or("<null>"),
246+
)),
247+
backtrace: None,
248+
});
203249
None.or_broken()?;
204250
}
205251
Ok(true)

src/operation.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,9 @@ pub struct OperationRoster {
223223
pub(crate) disposed: Vec<DisposalNotice>,
224224
/// Tell a scope to attempt cleanup
225225
pub(crate) cleanup_finished: Vec<Cleanup>,
226+
/// Despawn these entities while no other operation is running. This is used
227+
/// to cleanup detached impulses that receive no input.
228+
pub(crate) deferred_despawn: Vec<Entity>,
226229
}
227230

228231
impl OperationRoster {
@@ -262,6 +265,10 @@ impl OperationRoster {
262265
self.cleanup_finished.push(cleanup);
263266
}
264267

268+
pub fn defer_despawn(&mut self, source: Entity) {
269+
self.deferred_despawn.push(source);
270+
}
271+
265272
pub fn is_empty(&self) -> bool {
266273
self.queue.is_empty()
267274
&& self.awake.is_empty()
@@ -270,6 +277,7 @@ impl OperationRoster {
270277
&& self.unblock.is_empty()
271278
&& self.disposed.is_empty()
272279
&& self.cleanup_finished.is_empty()
280+
&& self.deferred_despawn.is_empty()
273281
}
274282

275283
pub fn append(&mut self, other: &mut Self) {
@@ -319,6 +327,17 @@ pub(crate) struct Blocker {
319327
pub(crate) serve_next: fn(Blocker, &mut World, &mut OperationRoster),
320328
}
321329

330+
impl std::fmt::Debug for Blocker {
331+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332+
f.debug_struct("Blocker")
333+
.field("provider", &self.provider)
334+
.field("source", &self.source)
335+
.field("session", &self.session)
336+
.field("label", &self.label)
337+
.finish()
338+
}
339+
}
340+
322341
#[derive(Clone, Debug)]
323342
pub enum OperationError {
324343
Broken(Option<Backtrace>),

src/operation/injection.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,16 @@ where
120120
// roster to register the task as an operation. In fact it does not
121121
// implement Operation at all. It is just a temporary container for the
122122
// input and the stream targets.
123-
unsafe {
123+
let execute = unsafe {
124124
world
125125
.entity_mut(task)
126-
.sneak_input(session, request, false)?;
126+
.sneak_input(session, request, false, roster)?
127+
};
128+
129+
if !execute {
130+
// If giving the input failed then this workflow will not be able to
131+
// proceed. Therefore we should report that this is broken.
132+
None.or_broken()?;
127133
}
128134

129135
let mut storage = world.get_mut::<InjectionStorage>(source).or_broken()?;

0 commit comments

Comments
 (0)