Skip to content

Commit 4d4258f

Browse files
committed
Fix bug where events where not being dropped
1 parent b592a72 commit 4d4258f

File tree

3 files changed

+108
-13
lines changed

3 files changed

+108
-13
lines changed

crates/bevy_app/src/app.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{First, Main, MainSchedulePlugin, Plugin, Plugins, StateTransition};
1+
use crate::{First, FixedPostUpdate, Main, MainSchedulePlugin, Plugin, Plugins, StateTransition};
22
pub use bevy_derive::AppLabel;
33
use bevy_ecs::{
44
prelude::*,
@@ -213,6 +213,7 @@ pub enum PluginsState {
213213

214214
// Dummy plugin used to temporary hold the place in the plugin registry
215215
struct PlaceholderPlugin;
216+
216217
impl Plugin for PlaceholderPlugin {
217218
fn build(&self, _app: &mut App) {}
218219
}
@@ -507,6 +508,11 @@ impl App {
507508
bevy_ecs::event::event_update_system::<T>
508509
.run_if(bevy_ecs::event::event_update_condition::<T>),
509510
);
511+
self.init_resource::<bevy_ecs::event::EventUpdateSignal<T>>()
512+
.add_systems(
513+
FixedPostUpdate,
514+
bevy_ecs::event::event_queue_update_system::<T>,
515+
);
510516
}
511517
self
512518
}

crates/bevy_ecs/src/event.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate as bevy_ecs;
44
use crate::system::{Local, Res, ResMut, Resource, SystemParam};
55
pub use bevy_ecs_macros::Event;
66
use bevy_utils::detailed_trace;
7+
use bevy_utils::tracing::error;
78
use std::ops::{Deref, DerefMut};
89
use std::{
910
cmp::Ordering,
@@ -13,6 +14,7 @@ use std::{
1314
marker::PhantomData,
1415
slice::Iter,
1516
};
17+
1618
/// A type that can be stored in an [`Events<E>`] resource
1719
/// You can conveniently access events using the [`EventReader`] and [`EventWriter`] system parameter.
1820
///
@@ -33,6 +35,7 @@ pub struct EventId<E: Event> {
3335
}
3436

3537
impl<E: Event> Copy for EventId<E> {}
38+
3639
impl<E: Event> Clone for EventId<E> {
3740
fn clone(&self) -> Self {
3841
*self
@@ -747,27 +750,51 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
747750
}
748751
}
749752

753+
#[doc(hidden)]
754+
#[derive(Resource)]
755+
pub struct EventUpdateSignal<T: Event> {
756+
signal: bool,
757+
_marker: PhantomData<fn() -> T>,
758+
}
759+
760+
impl<T: Event> Default for EventUpdateSignal<T> {
761+
fn default() -> Self {
762+
Self {
763+
signal: false,
764+
_marker: PhantomData,
765+
}
766+
}
767+
}
768+
750769
#[doc(hidden)]
751770
#[derive(Resource, Default)]
752-
pub struct EventUpdateSignal(bool);
771+
pub struct EventUpdateShouldWaitForFixedUpdate;
753772

754773
/// A system that queues a call to [`Events::update`].
755-
pub fn event_queue_update_system(signal: Option<ResMut<EventUpdateSignal>>) {
774+
pub fn event_queue_update_system<T: Event>(signal: Option<ResMut<EventUpdateSignal<T>>>) {
756775
if let Some(mut s) = signal {
757-
s.0 = true;
776+
s.signal = true;
758777
}
759778
}
760779

761780
/// A system that calls [`Events::update`].
762781
pub fn event_update_system<T: Event>(
763-
signal: Option<ResMut<EventUpdateSignal>>,
782+
should_wait: Option<Res<EventUpdateShouldWaitForFixedUpdate>>,
783+
should_update: Option<ResMut<EventUpdateSignal<T>>>,
764784
mut events: ResMut<Events<T>>,
765785
) {
766-
if let Some(mut s) = signal {
767-
// If we haven't got a signal to update the events, but we *could* get such a signal
768-
// return early and update the events later.
769-
if !std::mem::replace(&mut s.0, false) {
770-
return;
786+
if should_wait.is_some() {
787+
match should_update {
788+
Some(mut should_update) => {
789+
// If we haven't got a signal to update the events, but we *could* get such a signal
790+
// return early and update the events later.
791+
if !std::mem::replace(&mut should_update.signal, false) {
792+
return;
793+
}
794+
}
795+
None => {
796+
error!("EventUpdateSignal<{0}> resource not found but fixed update systems are active. Please add EventUpdateSignal<{0}> as a resource", std::any::type_name::<T>());
797+
}
771798
}
772799
}
773800

crates/bevy_time/src/lib.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub mod prelude {
2525
}
2626

2727
use bevy_app::{prelude::*, RunFixedMainLoop};
28-
use bevy_ecs::event::{event_queue_update_system, EventUpdateSignal};
28+
use bevy_ecs::event::EventUpdateShouldWaitForFixedUpdate;
2929
use bevy_ecs::prelude::*;
3030
use bevy_utils::{tracing::warn, Duration, Instant};
3131
pub use crossbeam_channel::TrySendError;
@@ -60,8 +60,7 @@ impl Plugin for TimePlugin {
6060
.add_systems(RunFixedMainLoop, run_fixed_main_schedule);
6161

6262
// ensure the events are not dropped until `FixedMain` systems can observe them
63-
app.init_resource::<EventUpdateSignal>()
64-
.add_systems(FixedPostUpdate, event_queue_update_system);
63+
app.init_resource::<EventUpdateShouldWaitForFixedUpdate>();
6564

6665
#[cfg(feature = "bevy_ci_testing")]
6766
if let Some(ci_testing_config) = app
@@ -142,3 +141,66 @@ fn time_system(
142141
TimeUpdateStrategy::ManualDuration(duration) => time.update_with_duration(*duration),
143142
}
144143
}
144+
145+
#[cfg(test)]
146+
mod tests {
147+
use crate::{Fixed, Time, TimePlugin, TimeUpdateStrategy};
148+
use bevy_app::{App, Startup, Update};
149+
use bevy_ecs::event::{Event, EventReader, EventWriter};
150+
use std::error::Error;
151+
use std::time::Duration;
152+
153+
#[derive(Event)]
154+
struct TestEvent<T: Default> {
155+
sender: std::sync::mpsc::Sender<T>,
156+
}
157+
158+
impl<T: Default> Drop for TestEvent<T> {
159+
fn drop(&mut self) {
160+
self.sender
161+
.send(T::default())
162+
.expect("Failed to send drop signal");
163+
}
164+
}
165+
166+
#[test]
167+
fn events_get_dropped_regression_test_11528() -> Result<(), impl Error> {
168+
let (tx1, rx1) = std::sync::mpsc::channel();
169+
let (tx2, rx2) = std::sync::mpsc::channel();
170+
let mut app = App::new();
171+
app.add_plugins(TimePlugin)
172+
.add_event::<TestEvent<i32>>()
173+
.add_event::<TestEvent<()>>()
174+
.add_systems(Startup, move |mut ev2: EventWriter<TestEvent<()>>| {
175+
ev2.send(TestEvent {
176+
sender: tx2.clone(),
177+
});
178+
})
179+
.add_systems(Update, move |mut ev1: EventWriter<TestEvent<i32>>| {
180+
// Keep adding events so this event type is processed every update
181+
ev1.send(TestEvent {
182+
sender: tx1.clone(),
183+
});
184+
})
185+
.add_systems(
186+
Update,
187+
|mut ev1: EventReader<TestEvent<i32>>, mut ev2: EventReader<TestEvent<()>>| {
188+
// Read events so they can be dropped
189+
for _ in ev1.read() {}
190+
for _ in ev2.read() {}
191+
},
192+
)
193+
.insert_resource(TimeUpdateStrategy::ManualDuration(
194+
Time::<Fixed>::default().timestep(),
195+
));
196+
197+
for _ in 0..10 {
198+
app.update();
199+
}
200+
201+
// Check event type 1 as been dropped at least once
202+
let _drop_signal = rx1.recv_timeout(Duration::from_millis(1000));
203+
// Check event type 2 has been dropped
204+
rx2.recv_timeout(Duration::from_millis(1000))
205+
}
206+
}

0 commit comments

Comments
 (0)