Skip to content

Commit 1afe576

Browse files
committed
Update flush implementation
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 1b7417d commit 1afe576

File tree

8 files changed

+274
-78
lines changed

8 files changed

+274
-78
lines changed

src/errors.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub struct UnhandledErrors {
3232
pub disposals: Vec<DisposalFailure>,
3333
pub stop_tasks: Vec<StopTaskFailure>,
3434
pub broken: Vec<Broken>,
35+
pub unused_targets: Vec<UnusedTargetDrop>,
3536
pub miscellaneous: Vec<MiscellaneousFailure>,
3637
}
3738

@@ -70,6 +71,15 @@ pub struct StopTaskFailure {
7071
pub backtrace: Option<Backtrace>,
7172
}
7273

74+
/// An impulse chain was dropped because its final target was unused but `detach()`
75+
/// was not called on it. This is almost always a usage error, so we report it here.
76+
pub struct UnusedTargetDrop {
77+
/// Which target was dropped.
78+
pub unused_target: Entity,
79+
/// Which impulses were dropped as a consequence of the unused target.
80+
pub dropped_impulses: Vec<Entity>,
81+
}
82+
7383
/// Use this for any failures that are not covered by the other categories
7484
pub struct MiscellaneousFailure {
7585
pub error: Anyhow,

src/flush.rs

Lines changed: 166 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,92 @@
1818
use bevy::{
1919
prelude::{
2020
Entity, World, Query, QueryState, Added, With, Resource, Deref, DerefMut,
21+
Children, BuildWorldChildren, DespawnRecursiveExt,
2122
},
22-
ecs::system::SystemState,
23+
ecs::system::{SystemState, Command},
2324
};
2425

2526
use smallvec::SmallVec;
2627

2728
use crate::{
28-
ChannelQueue, WakeQueue, OperationRoster, ServiceHook, InputReady,
29-
Cancel, UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel,
30-
OperationRequest,
29+
ChannelQueue, WakeQueue, OperationRoster, ServiceHook, Detached,
30+
UnusedTarget, ServiceLifecycle, ServiceLifecycleChannel,
31+
OperationRequest, ImpulseLifecycleChannel, AddImpulse, Finished,
32+
UnhandledErrors, UnusedTargetDrop,
3133
execute_operation, dispose_for_despawned_service,
3234
};
3335

36+
#[derive(Resource, Default)]
37+
pub struct FlushParameters {
38+
/// By default, a flush will loop until the whole [`OperationRoster`] is empty.
39+
/// If there are loops of blocking services then it is possible for the flush
40+
/// to loop indefinitely, creating the appearance of a blockup in the
41+
/// application, or delaying other systems from running for a prolonged amount
42+
/// of time.
43+
///
44+
/// Use this limit to prevent the flush from blocking for too long in those
45+
/// scenarios. If the flush loops beyond this limit, anything remaining in
46+
/// the roster will be moved into the [`DeferredRoster`] to be processed
47+
/// during the next flush.
48+
///
49+
/// A value of `None` means the flush can loop indefinitely (this is the default).
50+
pub flush_loop_limit: Option<usize>,
51+
}
52+
3453
#[allow(private_interfaces)]
3554
pub fn flush_impulses(
3655
world: &mut World,
37-
input_ready_query: &mut QueryState<Entity, Added<InputReady>>,
3856
new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
3957
) {
4058
let mut roster = OperationRoster::new();
59+
collect_from_channels(new_service_query, world, &mut roster);
4160

42-
world.get_resource_or_insert_with(|| ServiceLifecycleChannel::new());
43-
world.resource_scope::<ServiceLifecycleChannel, ()>(|world, lifecycles| {
44-
// Clean up the dangling requests of any services that have been despawned.
45-
for removed_service in lifecycles.receiver.try_iter() {
46-
dispose_for_despawned_service(removed_service, world, &mut roster)
61+
let mut loop_count = 0;
62+
while !roster.is_empty() {
63+
if world.get_resource_or_insert_with(
64+
|| FlushParameters::default()
65+
).flush_loop_limit.is_some_and(
66+
|limit| limit <= loop_count
67+
) {
68+
// We have looped beyoond the limit, so we will defer anything that
69+
// remains in the roster and stop looping from here.
70+
world.get_resource_or_insert_with(|| DeferredRoster::default())
71+
.append(&mut roster);
72+
break;
4773
}
4874

49-
// Add a lifecycle tracker to any new services that might have shown up
50-
for (e, mut hook) in new_service_query.iter_mut(world) {
51-
hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
75+
garbage_cleanup(world, &mut roster);
76+
77+
while let Some(unblock) = roster.unblock.pop_front() {
78+
let serve_next = unblock.serve_next;
79+
serve_next(unblock, world, &mut roster);
80+
garbage_cleanup(world, &mut roster);
5281
}
53-
});
5482

83+
while let Some(source) = roster.queue.pop_front() {
84+
execute_operation(OperationRequest { source, world, roster: &mut roster });
85+
garbage_cleanup(world, &mut roster);
86+
}
87+
88+
collect_from_channels(new_service_query, world, &mut roster);
89+
}
90+
}
91+
92+
fn garbage_cleanup(world: &mut World, roster: &mut OperationRoster) {
93+
while let Some(cleanup) = roster.cleanup_finished.pop() {
94+
cleanup.trigger(world, roster);
95+
}
96+
97+
while let Some(cancel) = roster.cancel.pop_front() {
98+
cancel.trigger(world, roster);
99+
}
100+
}
101+
102+
fn collect_from_channels(
103+
new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
104+
world: &mut World,
105+
roster: &mut OperationRoster,
106+
) {
55107
// Get the receiver for async task commands
56108
let async_receiver = world.get_resource_or_insert_with(|| ChannelQueue::new()).receiver.clone();
57109

@@ -60,10 +112,24 @@ pub fn flush_impulses(
60112
(item)(world, &mut roster);
61113
}
62114

63-
// Queue any operations whose inputs are ready
64-
for e in input_ready_query.iter(world) {
65-
roster.queue(e);
66-
}
115+
world.get_resource_or_insert_with(|| ServiceLifecycleChannel::new());
116+
world.resource_scope(|world, lifecycles: ServiceLifecycleChannel| {
117+
// Clean up the dangling requests of any services that have been despawned.
118+
for removed_service in lifecycles.receiver.try_iter() {
119+
dispose_for_despawned_service(removed_service, world, &mut roster)
120+
}
121+
122+
// Add a lifecycle tracker to any new services that might have shown up
123+
// TODO(@mxgrey): Make sure this works for services which are spawned by
124+
// providers that are being flushed.
125+
for (e, mut hook) in new_service_query.iter_mut(world) {
126+
hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
127+
}
128+
});
129+
130+
// Queue any operations that needed to be deferred
131+
let mut deferred = world.get_resource_or_insert_with(|| DeferredRoster::default());
132+
roster.append(&mut deferred);
67133

68134
// Collect any tasks that are ready to be woken
69135
for wakeable in world
@@ -74,49 +140,103 @@ pub fn flush_impulses(
74140
roster.queue(wakeable);
75141
}
76142

77-
let mut unused_targets_state: SystemState<Query<Entity, With<UnusedTarget>>> =
143+
let mut unused_targets_state: SystemState<Query<(Entity, &Detached), With<UnusedTarget>>> =
78144
SystemState::new(world);
79-
let mut unused_targets: SmallVec<[_; 8]> = unused_targets_state.get(world).iter().collect();
80-
for target in unused_targets.drain(..) {
81-
roster.drop_dependency(Cancel::unused_target(target));
145+
146+
let mut add_finish: SmallVec<[_; 8]> = SmallVec::new();
147+
let mut drop_targets: SmallVec<[_; 8]> = SmallVec::new();
148+
for (e, detached) in unused_targets_state.get(world).iter() {
149+
if detached.is_detached() {
150+
add_finish.push(e);
151+
} else {
152+
drop_targets.push(e);
153+
}
154+
}
155+
156+
for e in add_finish {
157+
// Add a Finished impulse to the unused target of a detached impulse
158+
// chain.
159+
AddImpulse::new(e, Finished).apply(world);
160+
}
161+
162+
for target in drop_targets.drain(..) {
163+
drop_target(target, world, roster, true);
82164
}
83165

84-
unused_targets.extend(
166+
drop_targets.extend(
85167
world
86-
.get_resource_or_insert_with(|| DroppedPromiseQueue::new())
168+
.get_resource_or_insert_with(|| ImpulseLifecycleChannel::default())
87169
.receiver
88170
.try_iter()
89171
);
90-
for target in unused_targets.drain(..) {
91-
roster.drop_dependency(Cancel::dropped(target))
92-
}
93172

94-
while !roster.is_empty() {
95-
while let Some(unblock) = roster.unblock.pop_front() {
96-
let serve_next = unblock.serve_next;
97-
serve_next(unblock, world, &mut roster);
98-
99-
while let Some(cleanup) = roster.cleanup_finished.pop() {
100-
cleanup.trigger(world, &mut roster);
101-
}
173+
for target in drop_targets.drain(..) {
174+
drop_target(target, world, roster, false);
175+
}
176+
}
102177

103-
while let Some(cancel) = roster.cancel.pop_front() {
104-
cancel.trigger(world, &mut roster);
178+
fn drop_target(
179+
target: Entity,
180+
world: &mut World,
181+
roster: &mut OperationRoster,
182+
unused: bool,
183+
) {
184+
roster.purge(target);
185+
let mut dropped_impulses = Vec::new();
186+
let mut detached_impulse = None;
187+
188+
let mut impulse = target;
189+
let mut search_state: SystemState<(
190+
Query<&Children>,
191+
Query<&Detached>,
192+
)> = SystemState::new(world);
193+
194+
let (q_children, q_detached) = search_state.get(world);
195+
loop {
196+
if let Ok(children) = q_children.get(impulse) {
197+
for child in children {
198+
let Ok(detached) = q_detached.get(*child) else {
199+
continue;
200+
};
201+
if detached.is_detached() {
202+
// This child is detached so we will not include it in the
203+
// dropped impulses. We need to de-parent it so that it does
204+
// not get despawned with the rest of the impulses that we
205+
// are dropping.
206+
detached_impulse = Some(*child);
207+
break;
208+
} else {
209+
// This child is not detached, so we will include it in our
210+
// dropped impulses, and crawl towards one of it children.
211+
if unused {
212+
dropped_impulses.push(impulse);
213+
}
214+
roster.purge(impulse);
215+
impulse = *child;
216+
continue;
217+
}
105218
}
106219
}
107220

108-
while let Some(source) = roster.queue.pop_front() {
109-
execute_operation(OperationRequest { source, world, roster: &mut roster });
110-
111-
while let Some(cleanup) = roster.cleanup_finished.pop() {
112-
cleanup.trigger(world, &mut roster);
113-
}
221+
// There is nothing further to include in the drop
222+
break;
223+
}
114224

115-
while let Some(cancel) = roster.cancel.pop_front() {
116-
cancel.trigger(world, &mut roster);
117-
}
225+
if let Some(detached_impulse) = detached_impulse {
226+
if let Some(mut detached_impulse_mut) = world.get_entity_mut(detached_impulse) {
227+
detached_impulse_mut.remove_parent();
118228
}
119229
}
230+
231+
if let Some(mut unused_target_mut) = world.get_entity_mut(target) {
232+
unused_target_mut.despawn_recursive();
233+
}
234+
235+
if unused {
236+
world.get_resource_or_insert_with(|| UnhandledErrors::default())
237+
.unused_targets
238+
.push(UnusedTargetDrop { unused_target: target, dropped_impulses });
239+
}
120240
}
121241

122242
/// This resource is used to queue up operations in the roster in situations

src/impulse.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ use crate::{
2929
mod detach;
3030
pub(crate) use detach::*;
3131

32+
mod finished;
33+
pub(crate) use finished::*;
34+
3235
mod insert;
3336
pub(crate) use insert::*;
3437

@@ -63,10 +66,20 @@ where
6366
Response: 'static + Send + Sync,
6467
Streams: StreamPack,
6568
{
66-
/// Keep carrying out the impulse chain up to here even if a downstream
67-
/// dependent was dropped.
69+
/// Keep executing out the impulse chain up to here even if a downstream
70+
/// dependent was dropped. If you continue building the chain from this
71+
/// point then the later impulses will not be affected by this use of
72+
/// detached.
73+
///
74+
/// Downstream dependencies get dropped in the following situations:
75+
/// - [`Self::take`] or [`Self::take_response`]: The promise containing the response is dropped.
76+
/// - [`Self::store`], [`Self::push`], or [`Self::insert`]: The target entity of the operation is despawned.
77+
/// - [`Self::send_event`]: This will never be dropped, effectively making it detached automatically.
78+
/// - Not using any of the above: The dependency will immediately be dropped during a flush.
79+
/// If you do not use detach in this scenario, then the chain will be immediately dropped
80+
/// without being run at all. This will also push an error into [`UnhandledErrors`](crate::UnhandledErrors).
6881
pub fn detach(self) -> Impulse<'w, 's, 'a, Response, Streams> {
69-
self.commands.add(Detach { session: self.target });
82+
self.commands.add(Detach { target: self.target });
7083
self
7184
}
7285

@@ -278,6 +291,7 @@ where
278291
}
279292
}
280293

294+
/// Contains the final response and streams produced at the end of an impulse chain.
281295
pub struct Recipient<Response, Streams: StreamPack> {
282296
pub response: Promise<Response>,
283297
pub streams: Streams::Receiver,

src/impulse/detach.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,20 @@ impl Default for Detached {
3535
}
3636
}
3737

38+
impl Detached {
39+
pub fn is_detached(&self) {
40+
self.0
41+
}
42+
}
43+
3844
pub(crate) struct Detach {
39-
pub(crate) session: Entity,
45+
pub(crate) target: Entity,
4046
}
4147

4248
impl Command for Detach {
4349
fn apply(self, world: &mut World) {
4450
let backtrace;
45-
if let Some(mut session_mut) = world.get_entity_mut(self.session) {
51+
if let Some(mut session_mut) = world.get_entity_mut(self.target) {
4652
if let Some(mut detached) = session_mut.get_mut::<Detached>() {
4753
detached.0 = true;
4854
session_mut.remove::<UnusedTarget>();
@@ -59,7 +65,7 @@ impl Command for Detach {
5965
}
6066

6167
let failure = MiscellaneousFailure {
62-
error: anyhow!("Unable to detach target {:?}", self.session),
68+
error: anyhow!("Unable to detach target {:?}", self.target),
6369
backtrace: Some(backtrace),
6470
};
6571
world.get_resource_or_insert_with(|| UnhandledErrors::default())

0 commit comments

Comments
 (0)