Skip to content

Commit 3d625d2

Browse files
committed
reduce: reuse the output buffer allocation
This patch brings back the benefits of re-using the output buffer allocation at the cost of some unsafe code reasoning. Since the buffer is always cleared after each use we can simply store its type erased pointer and temporarily bring it back to its Vec form whenever there is some processing to do. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent f0dc750 commit 3d625d2

File tree

1 file changed

+47
-4
lines changed

1 file changed

+47
-4
lines changed

src/operators/reduce.rs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ where
683683
/// Implementation based on replaying historical and new updates together.
684684
mod history_replay {
685685

686+
use std::mem::ManuallyDrop;
686687
use ::difference::Semigroup;
687688
use lattice::Lattice;
688689
use trace::Cursor;
@@ -691,6 +692,13 @@ mod history_replay {
691692

692693
use super::{PerKeyCompute, sort_dedup};
693694

695+
/// Clears and type erases a vector
696+
fn vec_to_parts<T>(v: Vec<T>) -> (*mut (), usize) {
697+
let mut v = ManuallyDrop::new(v);
698+
v.clear();
699+
(v.as_mut_ptr() as *mut (), v.capacity())
700+
}
701+
694702
/// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
695703
/// time order, maintaining consolidated representations of updates with respect to future interesting times.
696704
pub struct HistoryReplayer<'a, V1, V2, T, R1, R2>
@@ -705,7 +713,11 @@ mod history_replay {
705713
input_history: ValueHistory<'a, V1, T, R1>,
706714
output_history: ValueHistory<'a, V2, T, R2>,
707715
input_buffer: Vec<(&'a V1, R1)>,
708-
output_buffer_capacity: usize,
716+
// A type erased pointer and capacity for the temporary output buffer passed to `logic`.
717+
// During `compute` the vector contains references to self which will get invalid as soon
718+
// as compute returns. For this reason the temporary vector is always cleared before
719+
// decomposing it back into its type erased parts
720+
output_buffer_parts: (*mut (), usize),
709721
update_buffer: Vec<(V2, R2)>,
710722
output_produced: Vec<((V2, T), R2)>,
711723
synth_times: Vec<T>,
@@ -728,7 +740,7 @@ mod history_replay {
728740
input_history: ValueHistory::new(),
729741
output_history: ValueHistory::new(),
730742
input_buffer: Vec::new(),
731-
output_buffer_capacity: 0,
743+
output_buffer_parts: vec_to_parts(Vec::<(&V2, R2)>::new()),
732744
update_buffer: Vec::new(),
733745
output_produced: Vec::new(),
734746
synth_times: Vec::new(),
@@ -913,7 +925,16 @@ mod history_replay {
913925
}
914926
crate::consolidation::consolidate(&mut self.input_buffer);
915927

916-
let mut output_buffer = Vec::with_capacity(self.output_buffer_capacity);
928+
let (ptr, cap) = self.output_buffer_parts;
929+
// SAFETY:
930+
// * `ptr` is valid because is has been previously allocated by a Vec
931+
// constructor parameterized with the same type argument
932+
// * `len` and `cap` are valid because the vector is converted to parts
933+
// only through `vec_to_parts` which clears the vector and gets its
934+
// capacity
935+
let mut output_buffer = unsafe {
936+
Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap)
937+
};
917938
meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
918939
for &((value, ref time), ref diff) in output_replay.buffer().iter() {
919940
if time.less_equal(&next_time) {
@@ -937,8 +958,8 @@ mod history_replay {
937958
if self.input_buffer.len() > 0 || output_buffer.len() > 0 {
938959
logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer);
939960
self.input_buffer.clear();
940-
self.output_buffer_capacity = output_buffer.capacity();
941961
}
962+
self.output_buffer_parts = vec_to_parts(output_buffer);
942963

943964
// output_replay.advance_buffer_by(&meet);
944965
// for &((ref value, ref time), diff) in output_replay.buffer().iter() {
@@ -1077,6 +1098,28 @@ mod history_replay {
10771098
}
10781099
}
10791100

1101+
impl<'a, V1, V2, T, R1, R2> Drop for HistoryReplayer<'a, V1, V2, T, R1, R2>
1102+
where
1103+
V1: Ord+Clone+'a,
1104+
V2: Ord+Clone+'a,
1105+
T: Lattice+Ord+Clone,
1106+
R1: Semigroup,
1107+
R2: Semigroup,
1108+
{
1109+
fn drop(&mut self) {
1110+
let (ptr, cap) = self.output_buffer_parts;
1111+
// SAFETY:
1112+
// * `ptr` is valid because is has been previously allocated by a Vec
1113+
// constructor parameterized with the same type argument
1114+
// * `len` and `cap` are valid because the vector is converted to parts
1115+
// only through `vec_to_parts` which clears the vector and gets its
1116+
// capacity
1117+
unsafe {
1118+
Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap);
1119+
}
1120+
}
1121+
}
1122+
10801123
/// Updates an optional meet by an optional time.
10811124
fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
10821125
if let Some(time) = other {

0 commit comments

Comments
 (0)