
Commit e9be54b

Authored by yyogo, james7132, and pablo-lua
Parallel event reader (#12554)

# Objective

Allow parallel iteration over events; resolves #10766.

## Solution

- Add `EventParIter`, which works similarly to `QueryParIter`, implementing a `for_each{_with_id}` operator. I chose not to mirror `EventIteratorWithId` and instead implemented both operations on a single struct.
- Reuse `BatchingStrategy` from `QueryParIter`.

## Changelog

- `EventReader` now supports parallel event iteration using `par_read().for_each(|event| ...)`.

---------

Co-authored-by: James Liu <contact@jamessliu.com>
Co-authored-by: Pablo Reinhardt <126117294+pablo-lua@users.noreply.github.com>
1 parent 4735776 commit e9be54b
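For orientation, here is a minimal sketch of the new API from the caller's side; the event type and system below are illustrative, not part of the commit:

```rust
use bevy_ecs::prelude::*;

// Hypothetical event type, for illustration only.
#[derive(Event)]
struct Damage {
    amount: usize,
}

// Each batch of unread events runs as a task on the ComputeTaskPool, so
// the closure must be safe to call concurrently and cannot assume any
// particular event order.
fn apply_damage(mut events: EventReader<Damage>) {
    events.par_read().for_each(|Damage { amount }| {
        // ... handle each event; aggregate via atomics or channels ...
        let _ = amount;
    });
}
```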

File tree

7 files changed: +341 -121 lines


crates/bevy_ecs/src/batching.rs (new file)

Lines changed: 108 additions & 0 deletions
```rust
//! Types for controlling batching behavior during parallel processing.

use std::ops::Range;

/// Dictates how a parallel operation chunks up large quantities
/// during iteration.
///
/// A parallel query will chunk up large tables and archetypes into
/// chunks of at most a certain batch size. Similarly, a parallel event
/// reader will chunk up the remaining events.
///
/// By default, this batch size is automatically determined by dividing
/// the size of the largest matched archetype by the number
/// of threads (rounded up). This attempts to minimize the overhead of scheduling
/// tasks onto multiple threads, but assumes each entity has roughly the
/// same amount of work to be done, which may not hold true in every
/// workload.
///
/// See [`Query::par_iter`], [`EventReader::par_read`] for more information.
///
/// [`Query::par_iter`]: crate::system::Query::par_iter
/// [`EventReader::par_read`]: crate::event::EventReader::par_read
#[derive(Clone, Debug)]
pub struct BatchingStrategy {
    /// The upper and lower limits for a batch of entities.
    ///
    /// Setting the bounds to the same value will result in a fixed
    /// batch size.
    ///
    /// Defaults to `[1, usize::MAX]`.
    pub batch_size_limits: Range<usize>,
    /// The number of batches per thread in the [`ComputeTaskPool`].
    /// Increasing this value will decrease the batch size, which may
    /// increase the scheduling overhead for the iteration.
    ///
    /// Defaults to 1.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub batches_per_thread: usize,
}

impl Default for BatchingStrategy {
    fn default() -> Self {
        Self::new()
    }
}

impl BatchingStrategy {
    /// Creates a new unconstrained default batching strategy.
    pub const fn new() -> Self {
        Self {
            batch_size_limits: 1..usize::MAX,
            batches_per_thread: 1,
        }
    }

    /// Declares a batching strategy with a fixed batch size.
    pub const fn fixed(batch_size: usize) -> Self {
        Self {
            batch_size_limits: batch_size..batch_size,
            batches_per_thread: 1,
        }
    }

    /// Configures the minimum allowed batch size of this instance.
    pub const fn min_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.start = batch_size;
        self
    }

    /// Configures the maximum allowed batch size of this instance.
    pub const fn max_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size_limits.end = batch_size;
        self
    }

    /// Configures the number of batches to assign to each thread for this instance.
    pub fn batches_per_thread(mut self, batches_per_thread: usize) -> Self {
        assert!(
            batches_per_thread > 0,
            "The number of batches per thread must be non-zero."
        );
        self.batches_per_thread = batches_per_thread;
        self
    }

    /// Calculate the batch size according to the given thread count and max item count.
    /// The count is provided as a closure so that it can be calculated only if needed.
    ///
    /// # Panics
    ///
    /// Panics if `thread_count` is 0.
    #[inline]
    pub fn calc_batch_size(&self, max_items: impl FnOnce() -> usize, thread_count: usize) -> usize {
        if self.batch_size_limits.is_empty() {
            return self.batch_size_limits.start;
        }
        assert!(
            thread_count > 0,
            "Attempted to run parallel iteration with an empty TaskPool"
        );
        let batches = thread_count * self.batches_per_thread;
        // Round up to the nearest batch size.
        let batch_size = (max_items() + batches - 1) / batches;
        batch_size.clamp(self.batch_size_limits.start, self.batch_size_limits.end)
    }
}
```
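To make the rounding arithmetic in `calc_batch_size` concrete, a short sketch, assuming `bevy_ecs::batching` is public (as the `lib.rs` change below makes it); each value follows from `ceil(max_items / (thread_count * batches_per_thread))` clamped to the configured limits:

```rust
use bevy_ecs::batching::BatchingStrategy;

fn main() {
    // Default strategy: ceil(100 / 4) = 25, clamped to [1, usize::MAX].
    assert_eq!(BatchingStrategy::new().calc_batch_size(|| 100, 4), 25);

    // Rounding up: ceil(100 / 3) = 34.
    assert_eq!(BatchingStrategy::new().calc_batch_size(|| 100, 3), 34);

    // `fixed` makes `batch_size_limits` an empty range (16..16), so
    // `calc_batch_size` short-circuits and returns 16 regardless of counts.
    assert_eq!(BatchingStrategy::fixed(16).calc_batch_size(|| 1_000, 8), 16);

    // Two batches per thread: 4 threads * 2 = 8 batches, ceil(100 / 8) = 13.
    let fine = BatchingStrategy::new().batches_per_thread(2);
    assert_eq!(fine.calc_batch_size(|| 100, 4), 13);
}
```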

crates/bevy_ecs/src/event.rs

Lines changed: 204 additions & 1 deletion
```diff
@@ -1,6 +1,7 @@
 //! Event handling types.
 
 use crate as bevy_ecs;
+use crate::batching::BatchingStrategy;
 use crate::change_detection::MutUntyped;
 use crate::{
     change_detection::{DetectChangesMut, Mut},
```
```diff
@@ -30,7 +31,7 @@ pub trait Event: Send + Sync + 'static {}
 /// An `EventId` uniquely identifies an event stored in a specific [`World`].
 ///
 /// An `EventId` can among other things be used to trace the flow of an event from the point it was
-/// sent to the point it was processed.
+/// sent to the point it was processed. `EventId`s increase monotonically by send order.
 ///
 /// [`World`]: crate::world::World
 pub struct EventId<E: Event> {
```
```diff
@@ -446,6 +447,46 @@ impl<'w, 's, E: Event> EventReader<'w, 's, E> {
         self.reader.read_with_id(&self.events)
     }
 
+    /// Returns a parallel iterator over the events this [`EventReader`] has not seen yet.
+    /// See also [`for_each`](EventParIter::for_each).
+    ///
+    /// # Example
+    /// ```
+    /// # use bevy_ecs::prelude::*;
+    /// # use std::sync::atomic::{AtomicUsize, Ordering};
+    ///
+    /// #[derive(Event)]
+    /// struct MyEvent {
+    ///     value: usize,
+    /// }
+    ///
+    /// #[derive(Resource, Default)]
+    /// struct Counter(AtomicUsize);
+    ///
+    /// // setup
+    /// let mut world = World::new();
+    /// world.init_resource::<Events<MyEvent>>();
+    /// world.insert_resource(Counter::default());
+    ///
+    /// let mut schedule = Schedule::default();
+    /// schedule.add_systems(|mut events: EventReader<MyEvent>, counter: Res<Counter>| {
+    ///     events.par_read().for_each(|MyEvent { value }| {
+    ///         counter.0.fetch_add(*value, Ordering::Relaxed);
+    ///     });
+    /// });
+    /// for value in 0..100 {
+    ///     world.send_event(MyEvent { value });
+    /// }
+    /// schedule.run(&mut world);
+    /// let Counter(counter) = world.remove_resource::<Counter>().unwrap();
+    /// // all events were processed
+    /// assert_eq!(counter.into_inner(), 4950);
+    /// ```
+    pub fn par_read(&mut self) -> EventParIter<'_, E> {
+        self.reader.par_read(&self.events)
+    }
+
     /// Determines the number of events available to be read from this [`EventReader`] without consuming any.
     pub fn len(&self) -> usize {
         self.reader.len(&self.events)
```
```diff
@@ -647,6 +688,11 @@ impl<E: Event> ManualEventReader<E> {
         EventIteratorWithId::new(self, events)
     }
 
+    /// See [`EventReader::par_read`]
+    pub fn par_read<'a>(&'a mut self, events: &'a Events<E>) -> EventParIter<'a, E> {
+        EventParIter::new(self, events)
+    }
+
     /// See [`EventReader::len`]
     pub fn len(&self, events: &Events<E>) -> usize {
         // The number of events in this reader is the difference between the most recent event
```
```diff
@@ -810,6 +856,135 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
     }
 }
 
+/// A parallel iterator over `Event`s.
+#[derive(Debug)]
+pub struct EventParIter<'a, E: Event> {
+    reader: &'a mut ManualEventReader<E>,
+    slices: [&'a [EventInstance<E>]; 2],
+    batching_strategy: BatchingStrategy,
+}
+
+impl<'a, E: Event> EventParIter<'a, E> {
+    /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
+    pub fn new(reader: &'a mut ManualEventReader<E>, events: &'a Events<E>) -> Self {
+        let a_index = reader
+            .last_event_count
+            .saturating_sub(events.events_a.start_event_count);
+        let b_index = reader
+            .last_event_count
+            .saturating_sub(events.events_b.start_event_count);
+        let a = events.events_a.get(a_index..).unwrap_or_default();
+        let b = events.events_b.get(b_index..).unwrap_or_default();
+
+        let unread_count = a.len() + b.len();
+        // Ensure `len` is implemented correctly
+        debug_assert_eq!(unread_count, reader.len(events));
+        reader.last_event_count = events.event_count - unread_count;
+
+        Self {
+            reader,
+            slices: [a, b],
+            batching_strategy: BatchingStrategy::default(),
+        }
+    }
+
+    /// Changes the batching strategy used when iterating.
+    ///
+    /// For more information on how this affects the resultant iteration, see
+    /// [`BatchingStrategy`].
+    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
+        self.batching_strategy = strategy;
+        self
+    }
+
+    /// Runs the provided closure for each unread event in parallel.
+    ///
+    /// Unlike normal iteration, the event order is not guaranteed in any form.
+    ///
+    /// # Panics
+    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
+    /// initialized and run from the ECS scheduler, this should never panic.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
+        self.for_each_with_id(move |e, _| func(e));
+    }
+
+    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
+    /// but additionally provides the `EventId` to the closure.
+    ///
+    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
+    ///
+    /// # Panics
+    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
+    /// initialized and run from the ECS scheduler, this should never panic.
+    ///
+    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
+    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(self, func: FN) {
+        #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))]
+        {
+            self.into_iter().for_each(|(e, i)| func(e, i));
+        }
+
+        #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))]
+        {
+            let pool = bevy_tasks::ComputeTaskPool::get();
+            let thread_count = pool.thread_num();
+            if thread_count <= 1 {
+                return self.into_iter().for_each(|(e, i)| func(e, i));
+            }
+
+            let batch_size = self
+                .batching_strategy
+                .calc_batch_size(|| self.len(), thread_count);
+            let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
+            let remainders = chunks.each_ref().map(|c| c.remainder());
+
+            pool.scope(|scope| {
+                for batch in chunks.into_iter().flatten().chain(remainders) {
+                    let func = func.clone();
+                    scope.spawn(async move {
+                        for event in batch {
+                            func(&event.event, event.event_id);
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    /// Returns the number of [`Event`]s to be iterated.
+    pub fn len(&self) -> usize {
+        self.slices.iter().map(|s| s.len()).sum()
+    }
+
+    /// Returns [`true`] if there are no events remaining in this iterator.
+    pub fn is_empty(&self) -> bool {
+        self.slices.iter().all(|x| x.is_empty())
+    }
+}
+
+impl<'a, E: Event> IntoIterator for EventParIter<'a, E> {
+    type IntoIter = EventIteratorWithId<'a, E>;
+    type Item = <Self::IntoIter as Iterator>::Item;
+
+    fn into_iter(self) -> Self::IntoIter {
+        let EventParIter {
+            reader,
+            slices: [a, b],
+            ..
+        } = self;
+        let unread = a.len() + b.len();
+        let chain = a.iter().chain(b);
+        EventIteratorWithId {
+            reader,
+            chain,
+            unread,
+        }
+    }
+}
+
+#[doc(hidden)]
 struct RegisteredEvent {
     component_id: ComponentId,
     // Required to flush the secondary buffer and drop events even if left unchanged.
```
```diff
@@ -1326,4 +1501,32 @@ mod tests {
             "Only sent two events; got more than two IDs"
         );
     }
+
+    #[cfg(feature = "multi-threaded")]
+    #[test]
+    fn test_events_par_iter() {
+        use std::{collections::HashSet, sync::mpsc};
+
+        use crate::prelude::*;
+
+        let mut world = World::new();
+        world.init_resource::<Events<TestEvent>>();
+        for i in 0..100 {
+            world.send_event(TestEvent { i });
+        }
+
+        let mut schedule = Schedule::default();
+
+        schedule.add_systems(|mut events: EventReader<TestEvent>| {
+            let (tx, rx) = mpsc::channel();
+            events.par_read().for_each(|event| {
+                tx.send(event.i).unwrap();
+            });
+            drop(tx);
+
+            let observed: HashSet<_> = rx.into_iter().collect();
+            assert_eq!(observed, HashSet::from_iter(0..100));
+        });
+        schedule.run(&mut world);
+    }
 }
```
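As a usage sketch tying the two pieces together, an `EventParIter` can be tuned with a custom `BatchingStrategy`, and `for_each_with_id` additionally exposes the monotonically increasing `EventId`; the event type and system here are illustrative, not from the commit:

```rust
use bevy_ecs::batching::BatchingStrategy;
use bevy_ecs::prelude::*;

// Hypothetical event type, for illustration only.
#[derive(Event)]
struct Sample {
    value: f32,
}

fn log_samples(mut events: EventReader<Sample>) {
    events
        .par_read()
        // Process in fixed chunks of 32 events per spawned task.
        .batching_strategy(BatchingStrategy::fixed(32))
        // Iteration order is unspecified, but each `EventId` still
        // reflects the order in which its event was sent.
        .for_each_with_id(|event, id| {
            println!("{id:?} carried value {}", event.value);
        });
}
```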

crates/bevy_ecs/src/lib.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -12,6 +12,7 @@
 compile_error!("bevy_ecs cannot safely compile for a 16-bit platform.");
 
 pub mod archetype;
+pub mod batching;
 pub mod bundle;
 pub mod change_detection;
 pub mod component;
```
