Skip to content

Commit 61d3c9b

Browse files
committed
reduce: reuse the output buffer allocation
This patch brings back the benefits of re-using the output buffer allocation at the cost of some unsafe code reasoning. Since the buffer is always cleared after each use we can simply store its type erased pointer and temporarily bring it back to its Vec form whenever there is some processing to do. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 4be0b44 commit 61d3c9b

File tree

1 file changed

+25
-2
lines changed

1 file changed

+25
-2
lines changed

src/operators/reduce.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ where
683683
/// Implementation based on replaying historical and new updates together.
684684
mod history_replay {
685685

686+
use std::mem::ManuallyDrop;
686687
use ::difference::Semigroup;
687688
use lattice::Lattice;
688689
use trace::Cursor;
@@ -691,6 +692,13 @@ mod history_replay {
691692

692693
use super::{PerKeyCompute, sort_dedup};
693694

695+
/// Clears `v` and decomposes it into a type-erased pointer and its capacity.
///
/// All elements are dropped by the `clear` call, but the backing allocation is not
/// freed: the vector is wrapped in `ManuallyDrop`, so it is never dropped here. The
/// returned `(pointer, capacity)` pair is what the caller later passes back to
/// `Vec::from_raw_parts` with a length of zero to recover the buffer.
696+
fn vec_to_parts<T>(v: Vec<T>) -> (*mut (), usize) {
697+
// `ManuallyDrop` prevents the allocation from being released when `v` goes out of scope.
let mut v = ManuallyDrop::new(v);
698+
// Empty the vector so the parts can always be reassembled with length zero.
v.clear();
699+
// Erase the element type from the pointer; the capacity is returned alongside it.
(v.as_mut_ptr() as *mut (), v.capacity())
700+
}
701+
694702
/// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
695703
/// time order, maintaining consolidated representations of updates with respect to future interesting times.
696704
pub struct HistoryReplayer<'a, V1, V2, T, R1, R2>
@@ -705,6 +713,11 @@ mod history_replay {
705713
input_history: ValueHistory<'a, V1, T, R1>,
706714
output_history: ValueHistory<'a, V2, T, R2>,
707715
input_buffer: Vec<(&'a V1, R1)>,
716+
// A type erased pointer and capacity for the temporary output buffer passed to `logic`.
717+
// During `compute` the vector contains references to self which will become invalid as soon
718+
// as compute returns. For this reason the temporary vector is always cleared before
719+
// decomposing it back into its type erased parts
720+
output_buffer_parts: (*mut (), usize),
708721
update_buffer: Vec<(V2, R2)>,
709722
output_produced: Vec<((V2, T), R2)>,
710723
synth_times: Vec<T>,
@@ -727,6 +740,7 @@ mod history_replay {
727740
input_history: ValueHistory::new(),
728741
output_history: ValueHistory::new(),
729742
input_buffer: Vec::new(),
743+
output_buffer_parts: vec_to_parts(Vec::<(&V2, R2)>::new()),
730744
update_buffer: Vec::new(),
731745
output_produced: Vec::new(),
732746
synth_times: Vec::new(),
@@ -911,7 +925,16 @@ mod history_replay {
911925
}
912926
crate::consolidation::consolidate(&mut self.input_buffer);
913927

914-
let mut output_buffer: Vec<(&V2, R2)> = Vec::new();
928+
let (ptr, cap) = self.output_buffer_parts;
929+
// SAFETY:
930+
// * `ptr` is valid because it has been previously allocated by a Vec
931+
// constructor parameterized with the same type argument
932+
// * `len` and `cap` are valid because the vector is converted to parts
933+
// only through `vec_to_parts` which clears the vector and gets its
934+
// capacity
935+
let mut output_buffer = unsafe {
936+
Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap)
937+
};
915938
meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
916939
for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
917940
if time.less_equal(&next_time) {
@@ -936,7 +959,7 @@ mod history_replay {
936959
logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer);
937960
self.input_buffer.clear();
938961
}
939-
drop(output_buffer);
962+
self.output_buffer_parts = vec_to_parts(output_buffer);
940963

941964
// output_replay.advance_buffer_by(&meet);
942965
// for &((ref value, ref time), diff) in output_replay.buffer().iter() {

0 commit comments

Comments
 (0)