Skip to content

merge_batcher: use Rust iterators and VecDeque in place of VecQueue #380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 20 additions & 76 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A general purpose `Batcher` implementation based on radix sort.

use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::progress::frontier::Antichain;

Expand Down Expand Up @@ -120,60 +122,6 @@ where
}


use std::slice::{from_raw_parts};

pub struct VecQueue<T> {
list: Vec<T>,
head: usize,
tail: usize,
}

impl<T> VecQueue<T> {
#[inline]
pub fn new() -> Self { VecQueue::from(Vec::new()) }
#[inline]
pub fn pop(&mut self) -> T {
debug_assert!(self.head < self.tail);
self.head += 1;
unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) }
}
#[inline]
pub fn peek(&self) -> &T {
debug_assert!(self.head < self.tail);
unsafe { self.list.get_unchecked(self.head) }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This original code is UB since it is using get_unchecked to index out of bounds of the vector's contents. The vector this is indexing into has length 0 due to the set_len below.

Since rust-lang/rust#116915 (Rust 1.76+, nightly-2023-12-05+), this UB is going to cause pretty much anything calling this code in differential-dataflow 0.12.0 to miscompile. I found this by way of a miscompile of the cargo-tally crate.

Copy link
Member

@frankmcsherry frankmcsherry Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the subtext here that it would help to ship 0.13.0? I think 0.12.0 is multiple years old at this point, and perhaps it's worth a new, utterly incompatible version that has less unsafe in it? (I/we mostly use the repo head rather than anything crates.io; you might be one of the few who do).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not blocked on an official 0.13.0 being cut since I grabbed the code from master yesterday and published it to https://crates.io/crates/differential-dataflow-master myself. Cargo-tally can use it like this: dtolnay/cargo-tally@24a1946. For my purposes, this is completely fine — it lets you folks keep developing against head and lets me use snapshots of head in my crate whenever I like.

If you do 0.13.0 it would mostly be for the benefit of anyone else using differential-dataflow from crates.io. Right now 0.12.0 is effectively unusable, I think, and that was worth calling your attention to (also because the PR description says "all uses of the API were correct" and did not identify that this code is really problematic). But it does not need to translate into urgency to publish 0.13.0.

I had originally looked into whether backporting this single fix onto 0.12.0 as a 0.12.1 release would fix the miscompiles. I first confirmed that the commit in which this PR merged (6ae61ad) does not have the miscompile whereas its parent commit (cc88469) still does, so this is definitely the relevant fix. I backported it onto 7bc5338 (the v0.12.0 tag) as dtolnay-contrib@46edd1f. However that still miscompiled. There must be some other invalid unsafe code elsewhere that got removed earlier than this PR — you mentioned overall there is less unsafe now compared to 3 years ago. I figured finding all the rest of what would need to be backported was not worthwhile so this is when I switched to the approach of publishing the code from master.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We merged #413 recently, which also removes some unsafe code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also because the PR description says "all uses of the API were correct" and did not identify that this code is really problematic

Erm, it seems less problematic to be totally honest; certainly at the time. Afaict it matches the PPYP pattern, and the main thing that has changed is what Rust views as UB (to be fair, afaiu little enough is defined in the first place). The Rust 1.76 change certainly seems to mess up the PPYP pattern; is there something else that Rust folks recommend instead now?

More generally, any recommendations for where to follow and perhaps litigate that certain "optimizations" around unsafe are potentially harmful? It's a recurring anxiety for anyone who wants solid performance, that Rust might just eventually deem any use of unsafe UB and misoptimize it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There must be some other invalid unsafe code elsewhere that got removed earlier than this PR

Yeah, pretty much all use of unsafe is invalid, so this checks out. :D I think there are two uses of unsafe at the moment to work around LLVM not optimizing split_at_mut very well (in consolidation.rs). Everything else has been removed.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm here because I was sifting through backlinks)

the main thing that has changed is what Rust views as UB

Out-of-bounds indexing via get_unchecked has been documented to be UB since Rust 1.38.0, which was published in late November 2019. Not that I really expect anyone to be aware that documentation has been updated, but there was no recent change here.

The UB here has also been reliably detected by Miri for a long time, and if it is not possible to run Miri, the standard library has robust debug assertions that you can exercise today with cargo-careful or with -Zbuild-std. Some of those checks should soon be on by default without such tools.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the heads up! Unfortunately the referenced code predates 1.38, back when the method had no documented UB. =/

Fwiw, we've run Miri a fair amount, in particular to find a miscompilation induced by another crate, and it did not detect anything. I don't know if that is surprising.

Copy link

@saethlin saethlin Feb 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it did not detect anything. I don't know if that is surprising.

Unfortunately, it's not. In the past, out-of-bounds slice::get_unchecked was detected almost by accident by Stacked Borrows, so if you pass -Zmiri-disable-stacked-borrows or -Zmiri-tree-borrows, as I suspect some number of codebases need to in order to execute much of anything, it's wasn't checked at all. Miri understands the optimization hint that landed in 1.76 though. So this isn't a problem anymore.

The other problem is just the nature of dynamic checkers. If well-formed/non-malicious inputs never cause indexing out of bounds... well you'll never get a report from Miri. I've been meaning to write a helper to run Miri on a minimized fuzzing corpus to assist with this scenario. Still not a soundness proof though.

}
#[inline]
pub fn _peek_tail(&self) -> &T {
debug_assert!(self.head < self.tail);
unsafe { self.list.get_unchecked(self.tail-1) }
}
#[inline]
pub fn _slice(&self) -> &[T] {
debug_assert!(self.head < self.tail);
unsafe { from_raw_parts(self.list.get_unchecked(self.head), self.tail - self.head) }
}
#[inline]
pub fn from(mut list: Vec<T>) -> Self {
let tail = list.len();
unsafe { list.set_len(0); }
VecQueue {
list: list,
head: 0,
tail: tail,
}
}
// could leak, if self.head != self.tail.
#[inline]
pub fn done(self) -> Vec<T> {
debug_assert!(self.head == self.tail);
self.list
}
#[inline]
pub fn len(&self) -> usize { self.tail - self.head }
#[inline]
pub fn is_empty(&self) -> bool { self.head == self.tail }
}

#[inline]
unsafe fn push_unchecked<T>(vec: &mut Vec<T>, element: T) {
debug_assert!(vec.len() < vec.capacity());
Expand Down Expand Up @@ -277,28 +225,28 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
let mut output = Vec::with_capacity(list1.len() + list2.len());
let mut result = self.empty();

let mut list1 = VecQueue::from(list1);
let mut list2 = VecQueue::from(list2);
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
let mut head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {

while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 {

let cmp = {
let x = head1.peek();
let y = head2.peek();
let x = head1.front().unwrap();
let y = head2.front().unwrap();
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
match cmp {
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop()); } }
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop()); } }
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } }
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } }
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop();
let (_data2, _time2, diff2) = head2.pop();
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
unsafe { push_unchecked(&mut result, (data1, time1, diff1)); }
Expand All @@ -313,14 +261,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
}

if head1.is_empty() {
let done1 = head1.done();
let done1 = Vec::from(head1);
if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
head1 = VecDeque::from(list1.next().unwrap_or_default());
}
if head2.is_empty() {
let done2 = head2.done();
let done2 = Vec::from(head2);
if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
head2 = VecDeque::from(list2.next().unwrap_or_default());
}
}

Expand All @@ -329,21 +277,17 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {

if !head1.is_empty() {
let mut result = self.empty();
for _ in 0 .. head1.len() { result.push(head1.pop()); }
for item1 in head1 { result.push(item1); }
output.push(result);
}
while !list1.is_empty() {
output.push(list1.pop());
}
output.extend(list1);

if !head2.is_empty() {
let mut result = self.empty();
for _ in 0 .. head2.len() { result.push(head2.pop()); }
for item2 in head2 { result.push(item2); }
output.push(result);
}
while !list2.is_empty() {
output.push(list2.pop());
}
output.extend(list2);

output
}
Expand Down