Skip to content

Commit f9e9798

Browse files
Remove IntoOwned (phase 1) (#624)
* Add IntoOwned methods to BatchContainer * Liberate ConsolidateLayout * Add batch ownership to Layout * Remove Update; replace with type aliases * Migrate a substantial amount of into_owned into containers * Remove DiffGat: IntoOwned constraint * Move many IntoOwned::borrow_as to the BatchContainer version * Have consolidate require a reify function * Remove TimeGat: IntoOwned constraint * Remove BatchContainer::Owned : IntoOwned constraint * Further tidying * De-generalize CountTotal * Layout comments * More use of Layout aliases * Make BatchContainer push impls explicit * Layout prep work * Add Cursor::clone_time_onto and use it lightly * Remove IntoOwned use from cursor/mod.rs * Document consolidate_named arguments
1 parent c3ceb82 commit f9e9798

File tree

30 files changed

+575
-449
lines changed

30 files changed

+575
-449
lines changed

differential-dataflow/examples/columnar.rs

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -581,11 +581,10 @@ pub mod dd_builder {
581581

582582
use timely::container::PushInto;
583583

584-
use differential_dataflow::IntoOwned;
585584
use differential_dataflow::trace::Builder;
586585
use differential_dataflow::trace::Description;
587586
use differential_dataflow::trace::implementations::Layout;
588-
use differential_dataflow::trace::implementations::Update;
587+
use differential_dataflow::trace::implementations::layout;
589588
use differential_dataflow::trace::implementations::BatchContainer;
590589
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder};
591590
use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage;
@@ -611,18 +610,16 @@ pub mod dd_builder {
611610
impl<L> Builder for OrdValBuilder<L>
612611
where
613612
L: Layout,
614-
<L::KeyContainer as BatchContainer>::Owned: Columnar,
615-
<L::ValContainer as BatchContainer>::Owned: Columnar,
616-
<L::TimeContainer as BatchContainer>::Owned: Columnar,
617-
<L::DiffContainer as BatchContainer>::Owned: Columnar,
613+
layout::Key<L>: Columnar,
614+
layout::Val<L>: Columnar,
615+
layout::Time<L>: Columnar,
616+
layout::Diff<L>: Columnar,
618617
// It seems these two constraints could potentially be replaced by `Columnar::Ref<'a>`.
619-
for<'a> L::KeyContainer: PushInto<&'a <L::KeyContainer as BatchContainer>::Owned>,
620-
for<'a> L::ValContainer: PushInto<&'a <L::ValContainer as BatchContainer>::Owned>,
621-
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
622-
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
618+
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
619+
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
623620
{
624-
type Input = Column<((<L::KeyContainer as BatchContainer>::Owned,<L::ValContainer as BatchContainer>::Owned),<L::TimeContainer as BatchContainer>::Owned,<L::DiffContainer as BatchContainer>::Owned)>;
625-
type Time = <L::Target as Update>::Time;
621+
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
622+
type Time = layout::Time<L>;
626623
type Output = OrdValBatch<L>;
627624

628625
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
@@ -648,44 +645,44 @@ pub mod dd_builder {
648645

649646
for ((key,val),time,diff) in chunk.drain() {
650647
// It would be great to avoid.
651-
let key = <<L::KeyContainer as BatchContainer>::Owned as Columnar>::into_owned(key);
652-
let val = <<L::ValContainer as BatchContainer>::Owned as Columnar>::into_owned(val);
648+
let key = <layout::Key<L> as Columnar>::into_owned(key);
649+
let val = <layout::Val<L> as Columnar>::into_owned(val);
653650
// These feel fine (wrt the other versions)
654-
let time = <<L::TimeContainer as BatchContainer>::Owned as Columnar>::into_owned(time);
655-
let diff = <<L::DiffContainer as BatchContainer>::Owned as Columnar>::into_owned(diff);
651+
let time = <layout::Time<L> as Columnar>::into_owned(time);
652+
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
656653

657654
// Pre-load the first update.
658655
if self.result.keys.is_empty() {
659-
self.result.vals.vals.push(&val);
660-
self.result.keys.push(&key);
656+
self.result.vals.vals.push_into(&val);
657+
self.result.keys.push_into(&key);
661658
self.staging.push(time, diff);
662659
}
663660
// Perhaps this is a continuation of an already received key.
664-
else if self.result.keys.last().map(|k| <<L::KeyContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) {
661+
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
665662
// Perhaps this is a continuation of an already received value.
666-
if self.result.vals.vals.last().map(|v| <<L::ValContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&val).eq(&v)).unwrap_or(false) {
663+
if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) {
667664
self.staging.push(time, diff);
668665
} else {
669666
// New value; complete representation of prior value.
670667
self.staging.seal(&mut self.result.upds);
671668
self.staging.push(time, diff);
672-
self.result.vals.vals.push(&val);
669+
self.result.vals.vals.push_into(&val);
673670
}
674671
} else {
675672
// New key; complete representation of prior key.
676673
self.staging.seal(&mut self.result.upds);
677674
self.staging.push(time, diff);
678-
self.result.vals.offs.push(self.result.vals.len());
679-
self.result.vals.vals.push(&val);
680-
self.result.keys.push(&key);
675+
self.result.vals.offs.push_ref(self.result.vals.len());
676+
self.result.vals.vals.push_into(&val);
677+
self.result.keys.push_into(&key);
681678
}
682679
}
683680
}
684681

685682
#[inline(never)]
686683
fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
687684
self.staging.seal(&mut self.result.upds);
688-
self.result.vals.offs.push(self.result.vals.len());
685+
self.result.vals.offs.push_ref(self.result.vals.len());
689686
OrdValBatch {
690687
updates: self.staging.total(),
691688
storage: self.result,
@@ -718,18 +715,16 @@ pub mod dd_builder {
718715
impl<L> Builder for OrdKeyBuilder<L>
719716
where
720717
L: Layout,
721-
<L::KeyContainer as BatchContainer>::Owned: Columnar,
722-
<L::ValContainer as BatchContainer>::Owned: Columnar,
723-
<L::TimeContainer as BatchContainer>::Owned: Columnar,
724-
<L::DiffContainer as BatchContainer>::Owned: Columnar,
718+
layout::Key<L>: Columnar,
719+
layout::Val<L>: Columnar,
720+
layout::Time<L>: Columnar,
721+
layout::Diff<L>: Columnar,
725722
// It seems these two constraints could potentially be replaced by `Columnar::Ref<'a>`.
726-
for<'a> L::KeyContainer: PushInto<&'a <L::KeyContainer as BatchContainer>::Owned>,
727-
for<'a> L::ValContainer: PushInto<&'a <L::ValContainer as BatchContainer>::Owned>,
728-
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
729-
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
723+
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
724+
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
730725
{
731-
type Input = Column<((<L::KeyContainer as BatchContainer>::Owned,<L::ValContainer as BatchContainer>::Owned),<L::TimeContainer as BatchContainer>::Owned,<L::DiffContainer as BatchContainer>::Owned)>;
732-
type Time = <L::Target as Update>::Time;
726+
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
727+
type Time = layout::Time<L>;
733728
type Output = OrdKeyBatch<L>;
734729

735730
fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
@@ -754,24 +749,24 @@ pub mod dd_builder {
754749

755750
for ((key,_val),time,diff) in chunk.drain() {
756751
// It would be great to avoid.
757-
let key = <<L::KeyContainer as BatchContainer>::Owned as Columnar>::into_owned(key);
752+
let key = <layout::Key<L> as Columnar>::into_owned(key);
758753
// These feel fine (wrt the other versions)
759-
let time = <<L::TimeContainer as BatchContainer>::Owned as Columnar>::into_owned(time);
760-
let diff = <<L::DiffContainer as BatchContainer>::Owned as Columnar>::into_owned(diff);
754+
let time = <layout::Time<L> as Columnar>::into_owned(time);
755+
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
761756

762757
// Pre-load the first update.
763758
if self.result.keys.is_empty() {
764-
self.result.keys.push(&key);
759+
self.result.keys.push_into(&key);
765760
self.staging.push(time, diff);
766761
}
767762
// Perhaps this is a continuation of an already received key.
768-
else if self.result.keys.last().map(|k| <<L::KeyContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) {
763+
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
769764
self.staging.push(time, diff);
770765
} else {
771766
// New key; complete representation of prior key.
772767
self.staging.seal(&mut self.result.upds);
773768
self.staging.push(time, diff);
774-
self.result.keys.push(&key);
769+
self.result.keys.push_into(&key);
775770
}
776771
}
777772
}

differential-dataflow/examples/cursors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ fn main() {
9595

9696
/* Return trace content after the last round. */
9797
let (mut cursor, storage) = graph_trace.cursor();
98-
cursor.to_vec(&storage)
98+
cursor.to_vec(&storage, |k| k.clone(), |v| v.clone())
9999
})
100100
.unwrap().join();
101101

differential-dataflow/examples/spines.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,6 @@ fn main() {
4848
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
4949
.probe_with(&mut probe);
5050
},
51-
"slc" => {
52-
53-
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};
54-
55-
let data =
56-
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
57-
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
58-
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
59-
let keys =
60-
keys.map(|x| (x.clone().into_bytes(), 7))
61-
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
62-
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
63-
64-
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
65-
.probe_with(&mut probe);
66-
},
6751
_ => {
6852
println!("unrecognized mode: {:?}", mode)
6953
}

differential-dataflow/src/consolidation.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::cmp::Ordering;
1414
use std::collections::VecDeque;
1515
use timely::Container;
1616
use timely::container::{ContainerBuilder, PushInto};
17-
use crate::{IntoOwned, Data};
17+
use crate::Data;
1818
use crate::difference::{IsZero, Semigroup};
1919

2020
/// Sorts and consolidates `vec`.
@@ -231,11 +231,14 @@ pub trait ConsolidateLayout: Container {
231231
type Key<'a>: Eq where Self: 'a;
232232

233233
/// GAT diff type.
234-
type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
234+
type Diff<'a>;
235235

236236
/// Owned diff type.
237237
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
238238

239+
/// Converts a reference diff into an owned diff.
240+
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
241+
239242
/// Deconstruct an item into key and diff. Must be cheap.
240243
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
241244

@@ -266,7 +269,7 @@ pub trait ConsolidateLayout: Container {
266269

267270
let (k, d) = Self::into_parts(item);
268271
let mut prev_key = k;
269-
let mut prev_diff = d.into_owned();
272+
let mut prev_diff = Self::owned_diff(d);
270273

271274
for item in iter {
272275
let (next_key, next_diff) = Self::into_parts(item);
@@ -278,7 +281,7 @@ pub trait ConsolidateLayout: Container {
278281
target.push_with_diff(prev_key, prev_diff);
279282
}
280283
prev_key = next_key;
281-
prev_diff = next_diff.into_owned();
284+
prev_diff = Self::owned_diff(next_diff);
282285
}
283286
}
284287

@@ -293,12 +296,14 @@ impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
293296
where
294297
D: Ord + Clone + 'static,
295298
T: Ord + Clone + 'static,
296-
R: Semigroup + for<'a> IntoOwned<'a, Owned = R> + Clone + 'static,
299+
R: Semigroup + Clone + 'static,
297300
{
298301
type Key<'a> = (D, T) where Self: 'a;
299302
type Diff<'a> = R where Self: 'a;
300303
type DiffOwned = R;
301304

305+
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
306+
302307
fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
303308
((data, time), diff)
304309
}

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ where
215215
while let Some(val) = cursor.get_val(batch) {
216216
for datum in logic(key, val) {
217217
cursor.map_times(batch, |time, diff| {
218-
session.give((datum.clone(), time.into_owned(), diff.into_owned()));
218+
session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
219219
});
220220
}
221221
cursor.step_val(batch);

differential-dataflow/src/operators/arrange/upsert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ where
244244
// Determine the prior value associated with the key.
245245
while let Some(val) = trace_cursor.get_val(&trace_storage) {
246246
let mut count = 0;
247-
trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
247+
trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff));
248248
assert!(count == 0 || count == 1);
249249
if count == 1 {
250250
assert!(prev_value.is_none());

differential-dataflow/src/operators/consolidate.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
99
use timely::dataflow::Scope;
1010

11-
use crate::{IntoOwned, Collection, ExchangeData, Hashable};
11+
use crate::{Collection, ExchangeData, Hashable};
1212
use crate::consolidation::ConsolidatingContainerBuilder;
1313
use crate::difference::Semigroup;
1414

@@ -45,20 +45,22 @@ where
4545
/// ```
4646
pub fn consolidate(&self) -> Self {
4747
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
48-
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate")
48+
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
4949
}
5050

51-
/// As `consolidate` but with the ability to name the operator and specify the trace type.
52-
pub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
51+
/// As `consolidate` but with the ability to name the operator, specify the trace type,
52+
/// and provide the function `reify` to produce owned keys and values.
53+
pub fn consolidate_named<Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
5354
where
5455
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
55-
Tr: for<'a> crate::trace::Trace<Key<'a>: IntoOwned<'a, Owned = D>,Time=G::Timestamp,Diff=R>+'static,
56+
Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
5657
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
58+
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
5759
{
5860
use crate::operators::arrange::arrangement::Arrange;
5961
self.map(|k| (k, ()))
6062
.arrange_named::<Ba, Bu, Tr>(name)
61-
.as_collection(|d, _| d.into_owned())
63+
.as_collection(reify)
6264
}
6365

6466
/// Aggregates the weights of equal records.

differential-dataflow/src/operators/count.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use timely::dataflow::operators::Operator;
66
use timely::dataflow::channels::pact::Pipeline;
77

88
use crate::lattice::Lattice;
9-
use crate::{IntoOwned, ExchangeData, Collection};
9+
use crate::{ExchangeData, Collection};
1010
use crate::difference::{IsZero, Semigroup};
1111
use crate::hashable::Hashable;
1212
use crate::collection::AsCollection;
@@ -56,7 +56,7 @@ impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
5656
where
5757
G: Scope<Timestamp=T1::Time>,
5858
T1: for<'a> TraceReader<
59-
Key<'a>: IntoOwned<'a, Owned = K>,
59+
Key<'a> = &'a K,
6060
Val<'a>=&'a (),
6161
Time: TotalOrder,
6262
Diff: ExchangeData+Semigroup<T1::DiffGat<'a>>
@@ -109,22 +109,22 @@ where
109109
if trace_cursor.get_key(&trace_storage) == Some(key) {
110110
trace_cursor.map_times(&trace_storage, |_, diff| {
111111
count.as_mut().map(|c| c.plus_equals(&diff));
112-
if count.is_none() { count = Some(diff.into_owned()); }
112+
if count.is_none() { count = Some(T1::owned_diff(diff)); }
113113
});
114114
}
115115

116116
batch_cursor.map_times(&batch_storage, |time, diff| {
117117

118118
if let Some(count) = count.as_ref() {
119119
if !count.is_zero() {
120-
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
120+
session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(-1i8)));
121121
}
122122
}
123123
count.as_mut().map(|c| c.plus_equals(&diff));
124-
if count.is_none() { count = Some(diff.into_owned()); }
124+
if count.is_none() { count = Some(T1::owned_diff(diff)); }
125125
if let Some(count) = count.as_ref() {
126126
if !count.is_zero() {
127-
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
127+
session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(1i8)));
128128
}
129129
}
130130
});

differential-dataflow/src/operators/join.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ where
171171
V: Data + 'static,
172172
{
173173
fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
174-
where
174+
where
175175
Tr::Diff: Multiply<R2, Output: Semigroup+'static>,
176176
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
177177
{
@@ -660,14 +660,12 @@ where
660660
Ordering::Greater => batch.seek_key(batch_storage, trace_key),
661661
Ordering::Equal => {
662662

663-
use crate::IntoOwned;
664-
665663
thinker.history1.edits.load(trace, trace_storage, |time| {
666-
let mut time = time.into_owned();
664+
let mut time = C1::owned_time(time);
667665
time.join_assign(meet);
668666
time
669667
});
670-
thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned());
668+
thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));
671669

672670
// populate `temp` with the results in the best way we know how.
673671
thinker.think(|v1,v2,t,r1,r2| {

0 commit comments

Comments
 (0)