From 032a4e3d2e82b37b217eb6b50c7bf50c3175bc9a Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 23 Feb 2023 17:20:31 +0100 Subject: [PATCH] merge_batcher: use Rust iterators and VecDeque in place of VecQueue The standard library offers `VecDeque` which also has cheap (i.e non allocating) conversions to and from `Vec`s so we can use that directly and reduce the number of unsafe calls in the project. Note that while all uses of the API of `VecQueue` were correct its methods were marked as safe but you could accidentally cause undefined behavior: ```rust let queue = VecQueue::new(); queue.pop(); // UB, length isn't checked ``` Signed-off-by: Petros Angelatos --- src/trace/implementations/merge_batcher.rs | 96 +++++----------------- 1 file changed, 20 insertions(+), 76 deletions(-) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 83f4f63f9..f0d96beff 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,5 +1,7 @@ //! A general purpose `Batcher` implementation based on radix sort. +use std::collections::VecDeque; + use timely::communication::message::RefOrMut; use timely::progress::frontier::Antichain; @@ -120,60 +122,6 @@ where } -use std::slice::{from_raw_parts}; - -pub struct VecQueue { - list: Vec, - head: usize, - tail: usize, -} - -impl VecQueue { - #[inline] - pub fn new() -> Self { VecQueue::from(Vec::new()) } - #[inline] - pub fn pop(&mut self) -> T { - debug_assert!(self.head < self.tail); - self.head += 1; - unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) } - } - #[inline] - pub fn peek(&self) -> &T { - debug_assert!(self.head < self.tail); - unsafe { self.list.get_unchecked(self.head) } - } - #[inline] - pub fn _peek_tail(&self) -> &T { - debug_assert!(self.head < self.tail); - unsafe { self.list.get_unchecked(self.tail-1) } - } - #[inline] - pub fn _slice(&self) -> &[T] { - debug_assert!(self.head < self.tail); - unsafe { from_raw_parts(self.list.get_unchecked(self.head), self.tail - self.head) } - } - #[inline] - pub fn from(mut list: Vec) -> Self { - let tail = list.len(); - unsafe { list.set_len(0); } - VecQueue { - list: list, - head: 0, - tail: tail, - } - } - // could leak, if self.head != self.tail. - #[inline] - pub fn done(self) -> Vec { - debug_assert!(self.head == self.tail); - self.list - } - #[inline] - pub fn len(&self) -> usize { self.tail - self.head } - #[inline] - pub fn is_empty(&self) -> bool { self.head == self.tail } -} - #[inline] unsafe fn push_unchecked(vec: &mut Vec, element: T) { debug_assert!(vec.len() < vec.capacity()); @@ -277,11 +225,11 @@ impl MergeSorter { let mut output = Vec::with_capacity(list1.len() + list2.len()); let mut result = self.empty(); - let mut list1 = VecQueue::from(list1); - let mut list2 = VecQueue::from(list2); + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); - let mut head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() }; - let mut head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() }; + let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); + let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { @@ -289,16 +237,16 @@ impl MergeSorter { while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 { let cmp = { - let x = head1.peek(); - let y = head2.peek(); + let x = head1.front().unwrap(); + let y = head2.front().unwrap(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop()); } } - Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop()); } } + Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } } + Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } } Ordering::Equal => { - let (data1, time1, mut diff1) = head1.pop(); - let (_data2, _time2, diff2) = head2.pop(); + let (data1, time1, mut diff1) = head1.pop_front().unwrap(); + let (_data2, _time2, diff2) = head2.pop_front().unwrap(); diff1.plus_equals(&diff2); if !diff1.is_zero() { unsafe { push_unchecked(&mut result, (data1, time1, diff1)); } @@ -313,14 +261,14 @@ impl MergeSorter { } if head1.is_empty() { - let done1 = head1.done(); + let done1 = Vec::from(head1); if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } - head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() }; + head1 = VecDeque::from(list1.next().unwrap_or_default()); } if head2.is_empty() { - let done2 = head2.done(); + let done2 = Vec::from(head2); if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } - head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() }; + head2 = VecDeque::from(list2.next().unwrap_or_default()); } } @@ -329,21 +277,17 @@ impl MergeSorter { if !head1.is_empty() { let mut result = self.empty(); - for _ in 0 .. head1.len() { result.push(head1.pop()); } + for item1 in head1 { result.push(item1); } output.push(result); } - while !list1.is_empty() { - output.push(list1.pop()); - } + output.extend(list1); if !head2.is_empty() { let mut result = self.empty(); - for _ in 0 .. head2.len() { result.push(head2.pop()); } + for item2 in head2 { result.push(item2); } output.push(result); } - while !list2.is_empty() { - output.push(list2.pop()); - } + output.extend(list2); output }