Skip to content

Commit 46edd1f

Browse files
petrosagg authored and dtolnay committed
merge_batcher: use Rust iterators and VecDeque in place of VecQueue (TimelyDataflow#380)
The standard library offers `VecDeque`, which also has cheap (i.e., non-allocating) conversions to and from `Vec`, so we can use it directly and reduce the number of unsafe calls in the project.

Note that while all existing uses of the `VecQueue` API were correct, its methods were marked as safe even though they could accidentally cause undefined behavior:

```rust
let mut queue = VecQueue::new();
queue.pop(); // UB, length isn't checked
```

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 7bc5338 commit 46edd1f

File tree

2 files changed

+22
-78
lines changed

2 files changed

+22
-78
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ serde = "1.0"
2929
serde_derive = "1.0"
3030
abomonation = "0.7"
3131
abomonation_derive = "0.5"
32-
#timely = { version = "0.12", default-features = false }
33-
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
32+
timely = { version = "0.12", default-features = false }
33+
#timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
3434
#timely = { path = "../timely-dataflow/timely/", default-features = false }
3535
fnv="1.0.2"
3636

src/trace/implementations/merge_batcher.rs

Lines changed: 20 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! A general purpose `Batcher` implementation based on radix sort.
22
3+
use std::collections::VecDeque;
4+
35
use timely::progress::frontier::Antichain;
46

57
use ::difference::Semigroup;
@@ -109,60 +111,6 @@ where
109111
}
110112

111113

112-
use std::slice::{from_raw_parts};
113-
114-
pub struct VecQueue<T> {
115-
list: Vec<T>,
116-
head: usize,
117-
tail: usize,
118-
}
119-
120-
impl<T> VecQueue<T> {
121-
#[inline]
122-
pub fn new() -> Self { VecQueue::from(Vec::new()) }
123-
#[inline]
124-
pub fn pop(&mut self) -> T {
125-
debug_assert!(self.head < self.tail);
126-
self.head += 1;
127-
unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) }
128-
}
129-
#[inline]
130-
pub fn peek(&self) -> &T {
131-
debug_assert!(self.head < self.tail);
132-
unsafe { self.list.get_unchecked(self.head) }
133-
}
134-
#[inline]
135-
pub fn _peek_tail(&self) -> &T {
136-
debug_assert!(self.head < self.tail);
137-
unsafe { self.list.get_unchecked(self.tail-1) }
138-
}
139-
#[inline]
140-
pub fn _slice(&self) -> &[T] {
141-
debug_assert!(self.head < self.tail);
142-
unsafe { from_raw_parts(self.list.get_unchecked(self.head), self.tail - self.head) }
143-
}
144-
#[inline]
145-
pub fn from(mut list: Vec<T>) -> Self {
146-
let tail = list.len();
147-
unsafe { list.set_len(0); }
148-
VecQueue {
149-
list: list,
150-
head: 0,
151-
tail: tail,
152-
}
153-
}
154-
// could leak, if self.head != self.tail.
155-
#[inline]
156-
pub fn done(self) -> Vec<T> {
157-
debug_assert!(self.head == self.tail);
158-
self.list
159-
}
160-
#[inline]
161-
pub fn len(&self) -> usize { self.tail - self.head }
162-
#[inline]
163-
pub fn is_empty(&self) -> bool { self.head == self.tail }
164-
}
165-
166114
#[inline]
167115
unsafe fn push_unchecked<T>(vec: &mut Vec<T>, element: T) {
168116
debug_assert!(vec.len() < vec.capacity());
@@ -253,28 +201,28 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
253201
let mut output = Vec::with_capacity(list1.len() + list2.len());
254202
let mut result = self.stash.pop().unwrap_or_else(|| Vec::with_capacity(1024));
255203

256-
let mut list1 = VecQueue::from(list1);
257-
let mut list2 = VecQueue::from(list2);
204+
let mut list1 = list1.into_iter();
205+
let mut list2 = list2.into_iter();
258206

259-
let mut head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
260-
let mut head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
207+
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
208+
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());
261209

262210
// while we have valid data in each input, merge.
263211
while !head1.is_empty() && !head2.is_empty() {
264212

265213
while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 {
266214

267215
let cmp = {
268-
let x = head1.peek();
269-
let y = head2.peek();
216+
let x = head1.front().unwrap();
217+
let y = head2.front().unwrap();
270218
(&x.0, &x.1).cmp(&(&y.0, &y.1))
271219
};
272220
match cmp {
273-
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop()); } }
274-
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop()); } }
221+
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } }
222+
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } }
275223
Ordering::Equal => {
276-
let (data1, time1, mut diff1) = head1.pop();
277-
let (_data2, _time2, diff2) = head2.pop();
224+
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
225+
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
278226
diff1 += &diff2;
279227
if !diff1.is_zero() {
280228
unsafe { push_unchecked(&mut result, (data1, time1, diff1)); }
@@ -289,14 +237,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
289237
}
290238

291239
if head1.is_empty() {
292-
let done1 = head1.done();
240+
let done1 = Vec::from(head1);
293241
if done1.capacity() == 1024 { self.stash.push(done1); }
294-
head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
242+
head1 = VecDeque::from(list1.next().unwrap_or_default());
295243
}
296244
if head2.is_empty() {
297-
let done2 = head2.done();
245+
let done2 = Vec::from(head2);
298246
if done2.capacity() == 1024 { self.stash.push(done2); }
299-
head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
247+
head2 = VecDeque::from(list2.next().unwrap_or_default());
300248
}
301249
}
302250

@@ -305,21 +253,17 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
305253

306254
if !head1.is_empty() {
307255
let mut result = self.stash.pop().unwrap_or_else(|| Vec::with_capacity(1024));
308-
for _ in 0 .. head1.len() { result.push(head1.pop()); }
256+
for item1 in head1 { result.push(item1); }
309257
output.push(result);
310258
}
311-
while !list1.is_empty() {
312-
output.push(list1.pop());
313-
}
259+
output.extend(list1);
314260

315261
if !head2.is_empty() {
316262
let mut result = self.stash.pop().unwrap_or_else(|| Vec::with_capacity(1024));
317-
for _ in 0 .. head2.len() { result.push(head2.pop()); }
263+
for item2 in head2 { result.push(item2); }
318264
output.push(result);
319265
}
320-
while !list2.is_empty() {
321-
output.push(list2.pop());
322-
}
266+
output.extend(list2);
323267

324268
output
325269
}

0 commit comments

Comments
 (0)