Skip to content

Commit d15bf32

Browse files
Remove BatchContainer::borrow_as() (#628)
* BatchContainer::push_own takes a reference to owned data * Add BatchContainer::clear() * Remove use of borrow_as() * Remove BatchContainer::borrow_as()
1 parent 31e1c69 commit d15bf32

File tree

9 files changed

+123
-104
lines changed

9 files changed

+123
-104
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -579,8 +579,6 @@ pub mod dd_builder {
579579

580580
use columnar::Columnar;
581581

582-
use timely::container::PushInto;
583-
584582
use differential_dataflow::trace::Builder;
585583
use differential_dataflow::trace::Description;
586584
use differential_dataflow::trace::implementations::Layout;
@@ -614,9 +612,6 @@ pub mod dd_builder {
614612
layout::Val<L>: Columnar,
615613
layout::Time<L>: Columnar,
616614
layout::Diff<L>: Columnar,
617-
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
618-
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
619-
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
620615
{
621616
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
622617
type Time = layout::Time<L>;
@@ -643,6 +638,9 @@ pub mod dd_builder {
643638
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
644639
// and we need to correctly cache last for reasons of correctness, not just performance.
645640

641+
let mut key_con = L::KeyContainer::with_capacity(1);
642+
let mut val_con = L::ValContainer::with_capacity(1);
643+
646644
for ((key,val),time,diff) in chunk.drain() {
647645
// It would be great to avoid.
648646
let key = <layout::Key<L> as Columnar>::into_owned(key);
@@ -651,30 +649,33 @@ pub mod dd_builder {
651649
let time = <layout::Time<L> as Columnar>::into_owned(time);
652650
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
653651

652+
key_con.clear(); key_con.push_own(&key);
653+
val_con.clear(); val_con.push_own(&val);
654+
654655
// Pre-load the first update.
655656
if self.result.keys.is_empty() {
656-
self.result.vals.vals.push_into(&val);
657-
self.result.keys.push_into(&key);
657+
self.result.vals.vals.push_own(&val);
658+
self.result.keys.push_own(&key);
658659
self.staging.push(time, diff);
659660
}
660661
// Perhaps this is a continuation of an already received key.
661-
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
662+
else if self.result.keys.last() == key_con.get(0) {
662663
// Perhaps this is a continuation of an already received value.
663-
if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) {
664+
if self.result.vals.vals.last() == val_con.get(0) {
664665
self.staging.push(time, diff);
665666
} else {
666667
// New value; complete representation of prior value.
667668
self.staging.seal(&mut self.result.upds);
668669
self.staging.push(time, diff);
669-
self.result.vals.vals.push_into(&val);
670+
self.result.vals.vals.push_own(&val);
670671
}
671672
} else {
672673
// New key; complete representation of prior key.
673674
self.staging.seal(&mut self.result.upds);
674675
self.staging.push(time, diff);
675676
self.result.vals.offs.push_ref(self.result.vals.len());
676-
self.result.vals.vals.push_into(&val);
677-
self.result.keys.push_into(&key);
677+
self.result.vals.vals.push_own(&val);
678+
self.result.keys.push_own(&key);
678679
}
679680
}
680681
}
@@ -719,9 +720,6 @@ pub mod dd_builder {
719720
layout::Val<L>: Columnar,
720721
layout::Time<L>: Columnar,
721722
layout::Diff<L>: Columnar,
722-
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
723-
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
724-
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
725723
{
726724
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
727725
type Time = layout::Time<L>;
@@ -747,26 +745,30 @@ pub mod dd_builder {
747745
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
748746
// and we need to correctly cache last for reasons of correctness, not just performance.
749747

748+
let mut key_con = L::KeyContainer::with_capacity(1);
749+
750750
for ((key,_val),time,diff) in chunk.drain() {
751751
// It would be great to avoid.
752752
let key = <layout::Key<L> as Columnar>::into_owned(key);
753753
// These feel fine (wrt the other versions)
754754
let time = <layout::Time<L> as Columnar>::into_owned(time);
755755
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
756756

757+
key_con.clear(); key_con.push_own(&key);
758+
757759
// Pre-load the first update.
758760
if self.result.keys.is_empty() {
759-
self.result.keys.push_into(&key);
761+
self.result.keys.push_own(&key);
760762
self.staging.push(time, diff);
761763
}
762764
// Perhaps this is a continuation of an already received key.
763-
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
765+
else if self.result.keys.last() == key_con.get(0) {
764766
self.staging.push(time, diff);
765767
} else {
766768
// New key; complete representation of prior key.
767769
self.staging.seal(&mut self.result.upds);
768770
self.staging.push(time, diff);
769-
self.result.keys.push_into(&key);
771+
self.result.keys.push_own(&key);
770772
}
771773
}
772774
}

differential-dataflow/src/operators/arrange/upsert.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,17 @@ where
233233
// new stuff that we add.
234234
let (mut trace_cursor, trace_storage) = reader_local.cursor();
235235
let mut builder = Bu::new();
236+
let mut key_con = Tr::KeyContainer::with_capacity(1);
236237
for (key, mut list) in to_process {
237238

239+
key_con.clear(); key_con.push_own(&key);
240+
238241
// The prior value associated with the key.
239242
let mut prev_value: Option<Tr::ValOwn> = None;
240243

241244
// Attempt to find the key in the trace.
242-
trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));
243-
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) {
245+
trace_cursor.seek_key(&trace_storage, key_con.index(0));
246+
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) {
244247
// Determine the prior value associated with the key.
245248
while let Some(val) = trace_cursor.get_val(&trace_storage) {
246249
let mut count = 0;

differential-dataflow/src/operators/reduce.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -431,11 +431,16 @@ where
431431
// We first extract those times from this list that lie in the interval we will process.
432432
sort_dedup(&mut interesting);
433433
// `exposed` contains interesting (key, time)s now below `upper_limit`
434-
let exposed = {
435-
let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time));
436-
interesting = new_interesting;
437-
exposed
438-
};
434+
let mut exposed_keys = T1::KeyContainer::with_capacity(0);
435+
let mut exposed_time = T1::TimeContainer::with_capacity(0);
436+
// Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
437+
interesting.retain(|(key, time)| {
438+
if upper_limit.less_equal(time) { true } else {
439+
exposed_keys.push_own(key);
440+
exposed_time.push_own(time);
441+
false
442+
}
443+
});
439444

440445
// Prepare an output buffer and builder for each capability.
441446
//
@@ -471,12 +476,10 @@ where
471476
// indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
472477
// There could perhaps be a less provocative variable name.
473478
let mut exposed_position = 0;
474-
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {
475-
476-
use std::borrow::Borrow;
479+
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {
477480

478481
// Determine the next key we will work on; could be synthetic, could be from a batch.
479-
let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0));
482+
let key1 = exposed_keys.get(exposed_position);
480483
let key2 = batch_cursor.get_key(batch_storage);
481484
let key = match (key1, key2) {
482485
(Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
@@ -492,8 +495,8 @@ where
492495
interesting_times.clear();
493496

494497
// Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
495-
while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) {
496-
interesting_times.push(exposed[exposed_position].1.clone());
498+
while exposed_keys.get(exposed_position) == Some(key) {
499+
interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
497500
exposed_position += 1;
498501
}
499502

differential-dataflow/src/trace/implementations/huffman_container.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,16 @@ impl<B: Ord + Clone> HuffmanContainer<B> {
3030
}
3131
}
3232

33-
impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
34-
fn push_into(&mut self, item: Vec<B>) {
33+
impl<'a, B: Ord + Clone + 'static> PushInto<&'a Vec<B>> for HuffmanContainer<B> {
34+
fn push_into(&mut self, item: &'a Vec<B>) {
3535
for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
3636
match &mut self.inner {
3737
Ok((huffman, bytes)) => {
3838
bytes.extend(huffman.encode(item.iter()));
3939
self.offsets.push(bytes.len());
4040
},
4141
Err(raw) => {
42-
raw.extend(item);
42+
raw.extend(item.iter().cloned());
4343
self.offsets.push(raw.len());
4444
}
4545
}
@@ -95,12 +95,12 @@ impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
9595
Err(bytes) => other.extend_from_slice(bytes),
9696
}
9797
}
98-
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { Self::ReadItem { inner: Err(&owned[..]) } }
99-
10098
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
10199

102100
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
103-
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
101+
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
102+
103+
fn clear(&mut self) { *self = Self::default(); }
104104

105105
fn with_capacity(size: usize) -> Self {
106106
let mut offsets = OffsetList::with_capacity(size + 1);

differential-dataflow/src/trace/implementations/mod.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,12 @@ impl BatchContainer for OffsetList {
353353
#[inline(always)]
354354
fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
355355
#[inline(always)]
356-
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { *owned }
357-
#[inline(always)]
358356
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
359357

360358
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
361-
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
359+
fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
360+
361+
fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
362362

363363
fn with_capacity(size: usize) -> Self {
364364
Self::with_capacity(size)
@@ -536,15 +536,14 @@ pub mod containers {
536536
fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
537537
*other = Self::into_owned(item);
538538
}
539-
/// Borrows an owned instance as oneself.
540-
#[must_use]
541-
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a>;
542-
543539

544540
/// Push an item into this container
545541
fn push_ref(&mut self, item: Self::ReadItem<'_>);
546542
/// Push an item into this container
547-
fn push_own(&mut self, item: Self::Owned);
543+
fn push_own(&mut self, item: &Self::Owned);
544+
545+
/// Clears the container. May not release resources.
546+
fn clear(&mut self);
548547

549548
/// Creates a new container with sufficient capacity.
550549
fn with_capacity(size: usize) -> Self;
@@ -632,12 +631,13 @@ pub mod containers {
632631

633632
#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
634633
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
635-
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned }
636634

637635
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
638636

639637
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
640-
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
638+
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
639+
640+
fn clear(&mut self) { self.clear() }
641641

642642
fn with_capacity(size: usize) -> Self {
643643
Vec::with_capacity(size)
@@ -664,12 +664,13 @@ pub mod containers {
664664

665665
#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
666666
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
667-
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned }
668667

669668
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
670669

671670
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
672-
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
671+
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
672+
673+
fn clear(&mut self) { self.clear() }
673674

674675
fn with_capacity(size: usize) -> Self {
675676
Self::with_capacity(size)
@@ -713,15 +714,6 @@ pub mod containers {
713714
}
714715
}
715716

716-
impl<B> PushInto<Vec<B>> for SliceContainer<B> {
717-
fn push_into(&mut self, item: Vec<B>) {
718-
for x in item.into_iter() {
719-
self.inner.push(x);
720-
}
721-
self.offsets.push(self.inner.len());
722-
}
723-
}
724-
725717
impl<B> BatchContainer for SliceContainer<B>
726718
where
727719
B: Ord + Clone + Sized + 'static,
@@ -731,12 +723,17 @@ pub mod containers {
731723

732724
#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
733725
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
734-
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { &owned[..] }
735726

736727
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
737728

738729
fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
739-
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
730+
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
731+
732+
fn clear(&mut self) {
733+
self.offsets.clear();
734+
self.offsets.push(0);
735+
self.inner.clear();
736+
}
740737

741738
fn with_capacity(size: usize) -> Self {
742739
let mut offsets = Vec::with_capacity(size + 1);

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,15 @@ pub mod layers {
207207
///
208208
/// Tracked independently to account for duplicate compression.
209209
total: usize,
210+
211+
/// Time container to stage singleton times for evaluation.
212+
time_con: T,
213+
/// Diff container to stage singleton times for evaluation.
214+
diff_con: D,
210215
}
211216

212217
impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
213-
fn default() -> Self { Self { stash: Vec::default(), total: 0, } }
218+
fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
214219
}
215220

216221

@@ -230,34 +235,28 @@ pub mod layers {
230235
pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
231236
use crate::consolidation;
232237
consolidation::consolidate(&mut self.stash);
233-
if !self.stash.is_empty() {
234-
// If there is a single element, equal to a just-prior recorded update,
235-
// we push nothing and report an unincremented offset to encode this case.
236-
let time_diff = upds.times.last().zip(upds.diffs.last());
237-
let last_eq = self.stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
238-
let t1 = T::borrow_as(t1);
239-
let d1 = D::borrow_as(d1);
240-
t1.eq(&t2) && d1.eq(&d2)
241-
});
242-
if self.stash.len() == 1 && last_eq.unwrap_or(false) {
243-
// Just clear out the stash, as we won't drain it here.
238+
// If everything consolidates away, return false.
239+
if self.stash.is_empty() { return false; }
240+
// If there is a singleton, we may be able to optimize.
241+
if self.stash.len() == 1 {
242+
let (time, diff) = self.stash.last().unwrap();
243+
self.time_con.clear(); self.time_con.push_own(time);
244+
self.diff_con.clear(); self.diff_con.push_own(diff);
245+
if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
244246
self.total += 1;
245247
self.stash.clear();
246248
upds.offs.push_ref(upds.times.len());
249+
return true;
247250
}
248-
else {
249-
// Conventional; move `stash` into `updates`.
250-
self.total += self.stash.len();
251-
for (time, diff) in self.stash.drain(..) {
252-
upds.times.push_own(time);
253-
upds.diffs.push_own(diff);
254-
}
255-
upds.offs.push_ref(upds.times.len());
256-
}
257-
true
258-
} else {
259-
false
260251
}
252+
// Conventional; move `stash` into `updates`.
253+
self.total += self.stash.len();
254+
for (time, diff) in self.stash.drain(..) {
255+
upds.times.push_own(&time);
256+
upds.diffs.push_own(&diff);
257+
}
258+
upds.offs.push_ref(upds.times.len());
259+
true
261260
}
262261

263262
/// Completes the building and returns the total updates sealed.

0 commit comments

Comments
 (0)