Skip to content

Commit 91f0270

Browse files
committed
wip implementation of cross join via broadcasting batches
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent df1d937 commit 91f0270

File tree

3 files changed

+282
-1
lines changed

3 files changed

+282
-1
lines changed

examples/broadcast_cross_join.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
extern crate timely;
2+
extern crate differential_dataflow;
3+
4+
use std::ops::Mul;
5+
6+
use timely::Data;
7+
use timely::dataflow::channels::pact::Pipeline;
8+
use timely::dataflow::operators::broadcast::Broadcast;
9+
use timely::dataflow::operators::Operator;
10+
use timely::dataflow::{Scope, Stream};
11+
use timely::order::TotalOrder;
12+
use differential_dataflow::{Collection, AsCollection};
13+
use differential_dataflow::difference::Semigroup;
14+
use differential_dataflow::input::Input;
15+
use differential_dataflow::lattice::Lattice;
16+
use differential_dataflow::operators::arrange::{Arrange, Arranged};
17+
use differential_dataflow::trace::{Cursor, BatchReader, TraceReader};
18+
use differential_dataflow::trace::implementations::ord::OrdKeySpineAbomArc;
19+
20+
// This function is supposed to do one half of a cross join but its implementation is currently
21+
// incorrect
22+
// TODO: actually implement a half cross join
23+
fn half_cross_join<G, Tr1, Key2, R2, Batch2>(
24+
left: Arranged<G, Tr1>,
25+
right: &Stream<G, Batch2>,
26+
) -> Collection<G, (Tr1::Key, Key2), <R2 as Mul<Tr1::R>>::Output>
27+
where
28+
G: Scope,
29+
G::Timestamp: Lattice + TotalOrder + Ord,
30+
Tr1: TraceReader<Time = G::Timestamp> + Clone + 'static,
31+
Tr1::Key: Clone,
32+
Tr1::R: Clone,
33+
Batch2: BatchReader<Key2, (), G::Timestamp, R2> + Data,
34+
Key2: Clone + 'static,
35+
R2: Semigroup + Clone + Mul<Tr1::R>,
36+
<R2 as Mul<Tr1::R>>::Output: Semigroup,
37+
{
38+
let mut trace = left.trace;
39+
right.unary(Pipeline, "CrossJoin", move |_cap, _info| {
40+
let mut vector = Vec::new();
41+
move |input, output| {
42+
while let Some((time, data)) = input.next() {
43+
data.swap(&mut vector);
44+
for batch in vector.drain(..) {
45+
let mut cursor = batch.cursor();
46+
while let Some(key1) = cursor.get_key(&batch) {
47+
let (mut trace_cursor, trace_storage) = trace.cursor();
48+
cursor.map_times(&batch, |time1, diff1| {
49+
while let Some(key2) = trace_cursor.get_key(&trace_storage) {
50+
trace_cursor.map_times(&trace_storage, |time2, diff2| {
51+
let effect_time = std::cmp::max(time1.clone(), time2.clone());
52+
let cap_time = time.delayed(&effect_time);
53+
let diff = diff1.clone().mul(diff2.clone());
54+
let mut session = output.session(&cap_time);
55+
session.give((((key2.clone(), key1.clone())), effect_time, diff));
56+
});
57+
trace_cursor.step_key(&trace_storage);
58+
}
59+
});
60+
cursor.step_key(&batch);
61+
}
62+
}
63+
}
64+
}
65+
})
66+
.as_collection()
67+
}
68+
69+
fn main() {
    timely::execute_from_args(::std::env::args(), move |worker| {
        let index = worker.index();

        // Assemble the dataflow: arrange both inputs, broadcast each worker's
        // batch stream to every peer, and cross join in both directions.
        let (mut nums_in, mut words_in, probe) = worker.dataflow::<u64, _, _>(|scope| {
            let (nums_in, nums) = scope.new_collection();
            let (words_in, words) = scope.new_collection();

            let nums_arranged = nums.arrange::<OrdKeySpineAbomArc<_, _, _>>();
            let words_arranged = words.arrange::<OrdKeySpineAbomArc<_, _, _>>();

            let nums_batches = nums_arranged.stream.broadcast();
            let words_batches = words_arranged.stream.broadcast();

            // Changes from the first input join the per-worker arrangement of the
            // second input, and vice versa.
            let half_a = half_cross_join(words_arranged, &nums_batches);
            let half_b = half_cross_join(nums_arranged, &words_batches);

            // Orient both halves the same way before combining them.
            let cross_join = half_a.map(|(word, num)| (num, word)).concat(&half_b);

            let probe = cross_join
                .inspect(move |d| {
                    println!("worker {} produced: {:?}", index, d);
                })
                .probe();

            (nums_in, words_in, probe)
        });

        // Feed a couple of rounds of updates into each input.
        nums_in.insert(1i64);
        nums_in.advance_to(1);
        nums_in.insert(2);
        nums_in.advance_to(2);
        nums_in.flush();

        words_in.insert("apple".to_string());
        words_in.advance_to(1);
        words_in.insert("orange".to_string());
        words_in.advance_to(2);
        words_in.flush();

        // Run the worker until the probe catches up with the inputs.
        while probe.less_than(nums_in.time()) {
            worker.step();
        }
    }).unwrap();
}

src/trace/implementations/ord.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use trace::layers::Builder as TrieBuilder;
2323
use trace::layers::Cursor as TrieCursor;
2424
use trace::layers::ordered::{OrdOffset, OrderedLayer, OrderedBuilder, OrderedCursor};
2525
use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder};
26+
use trace::abomonated_arc_blanket_impls::AbomArc;
2627
use trace::{Batch, BatchReader, Builder, Merger, Cursor};
2728
use trace::description::Description;
2829

@@ -46,6 +47,8 @@ pub type OrdKeySpine<K, T, R, O=usize> = Spine<K, (), T, R, Rc<OrdKeyBatch<K, T,
4647
/// A trace implementation for empty values using a spine of abomonated ordered lists.
4748
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<K, (), T, R, Rc<Abomonated<OrdKeyBatch<K, T, R, O>, Vec<u8>>>>;
4849

50+
/// A trace implementation for empty values using a spine of atomic reference counted abomonated ordered lists.
///
/// Batches are `AbomArc<OrdKeyBatch<..>>`, so they can be shared within a worker
/// and also serialized with Abomonation (see `trace::abomonated_arc_blanket_impls`).
pub type OrdKeySpineAbomArc<K, T, R, O = usize> = Spine<K, (), T, R, AbomArc<OrdKeyBatch<K, T, R, O>>>;
4952

5053
/// An immutable collection of update tuples, from a contiguous interval of logical times.
5154
#[derive(Debug, Abomonation)]
@@ -396,7 +399,7 @@ where
396399

397400

398401
/// An immutable collection of update tuples, from a contiguous interval of logical times.
399-
#[derive(Debug, Abomonation)]
402+
#[derive(Debug, Clone, Abomonation)]
400403
pub struct OrdKeyBatch<K, T, R, O=usize>
401404
where
402405
K: Ord,

src/trace/mod.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,168 @@ pub mod rc_blanket_impls {
426426
}
427427
}
428428

429+
/// Blanket implementations for atomic reference counted batches.
///
/// `AbomArc<B>` wraps a batch `B` so that it can simultaneously be
/// (a) shared by reference count within a worker and
/// (b) serialized with Abomonation for exchange between workers,
/// distinguishing locally built (`Owned`) from decoded (`Abomonated`) values.
pub mod abomonated_arc_blanket_impls {
    use std::io::Write;
    use std::mem;
    use std::ops::Deref;
    use std::sync::Arc;

    use abomonation::Abomonation;

    /// Wrapper over Arc that can be safely Abomonated.
    ///
    /// NOTE(review): see the `Abomonation` impl below — the soundness of the
    /// `Abomonated` variant depends on decoded values never being dropped.
    pub enum AbomArc<T> {
        /// An Arc that has been constructed normally
        Owned(Arc<T>),
        /// The result of decoding an abomonated AbomArc
        Abomonated(Box<T>),
    }

    impl<T> AbomArc<T> {
        /// Wraps `inner` in a freshly allocated `Arc`.
        fn new(inner: T) -> Self {
            Self::Owned(Arc::new(inner))
        }
    }

    impl<T: Clone> Clone for AbomArc<T> {
        fn clone(&self) -> Self {
            match self {
                // Cloning an owned value is a cheap refcount bump.
                Self::Owned(arc) => Self::Owned(Arc::clone(arc)),
                // Cloning a decoded value deep-copies it into a fresh Arc,
                // detaching the clone from the decoded byte buffer.
                Self::Abomonated(val) => Self::Owned(Arc::new(T::clone(&*val))),
            }
        }
    }

    impl<T> Deref for AbomArc<T> {
        type Target = T;
        // Both variants expose the same `&T` view of the wrapped batch.
        fn deref(&self) -> &Self::Target {
            match self {
                Self::Owned(arc) => &**arc,
                Self::Abomonated(val) => &*val,
            }
        }
    }

    impl<T: Abomonation> Abomonation for AbomArc<T> {
        // Serializes the pointed-to `T` (its bytes, then its out-of-line data);
        // the Arc/Box indirection itself is deliberately not serialized.
        unsafe fn entomb<W: Write>(&self, bytes: &mut W) -> std::io::Result<()> {
            bytes.write_all(std::slice::from_raw_parts(mem::transmute(&**self), mem::size_of::<T>()))?;
            (**self).entomb(bytes)
        }
        unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
            let binary_len = mem::size_of::<T>();
            if binary_len > bytes.len() {
                None
            } else {
                let (mine, rest) = bytes.split_at_mut(binary_len);
                // NOTE(review): this `Box` points into `bytes`, memory the global
                // allocator never handed out. If the resulting `Abomonated` value
                // is ever dropped, the `Box` deallocation (and `T`'s destructor
                // running over exhumed interior pointers) looks like undefined
                // behavior — confirm decoded values are never dropped, or consider
                // `ManuallyDrop` for the variant payload.
                let mut value = Box::from_raw(mine.as_mut_ptr() as *mut T);
                let rest = (*value).exhume(rest)?;
                // Overwrite `self` without dropping its (bitwise-copied, invalid)
                // previous contents.
                std::ptr::write(self, Self::Abomonated(value));
                Some(rest)
            }
        }
        fn extent(&self) -> usize {
            mem::size_of::<T>() + (&**self).extent()
        }
    }

    use timely::progress::{Antichain, frontier::AntichainRef};
    use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};

    // Reading a wrapped batch delegates to the wrapped `B` (via `Deref`).
    impl<K, V, T, R, B: BatchReader<K,V,T,R>> BatchReader<K,V,T,R> for AbomArc<B> {

        /// The type used to enumerate the batch's contents.
        type Cursor = ArcBatchCursor<K, V, T, R, B>;
        /// Acquires a cursor to the batch's contents.
        fn cursor(&self) -> Self::Cursor {
            ArcBatchCursor::new((&**self).cursor())
        }

        /// The number of updates in the batch.
        fn len(&self) -> usize { (&**self).len() }
        /// Describes the times of the updates in the batch.
        fn description(&self) -> &Description<T> { (&**self).description() }
    }

    /// Wrapper to provide cursor to nested scope.
    pub struct ArcBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
        // Marks the K/V/T/R parameters as used; stores no data of those types.
        phantom: ::std::marker::PhantomData<(K, V, T, R)>,
        // The wrapped batch's own cursor, to which all navigation delegates.
        cursor: B::Cursor,
    }

    impl<K, V, T, R, B: BatchReader<K, V, T, R>> ArcBatchCursor<K, V, T, R, B> {
        // Wraps an inner cursor obtained from the underlying batch.
        fn new(cursor: B::Cursor) -> Self {
            ArcBatchCursor {
                cursor,
                phantom: ::std::marker::PhantomData,
            }
        }
    }

    // Every navigation method forwards to the inner cursor; only the storage
    // type changes from `B` to `AbomArc<B>` (which derefs to `B`).
    impl<K, V, T, R, B: BatchReader<K, V, T, R>> Cursor<K, V, T, R> for ArcBatchCursor<K, V, T, R, B> {

        type Storage = AbomArc<B>;

        #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
        #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

        #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) }
        #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) }

        #[inline]
        fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
            self.cursor.map_times(storage, logic)
        }

        #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
        #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) }

        #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
        #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) }

        #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
        #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
    }

    /// An immutable collection of updates.
    impl<K,V,T,R,B: Batch<K,V,T,R>> Batch<K, V, T, R> for AbomArc<B> {
        type Batcher = ArcBatcher<K, V, T, R, B>;
        type Builder = ArcBuilder<K, V, T, R, B>;
        type Merger = ArcMerger<K, V, T, R, B>;
    }

    /// Wrapper type for batching reference counted batches.
    pub struct ArcBatcher<K,V,T,R,B:Batch<K,V,T,R>> { batcher: B::Batcher }

    /// Functionality for collecting and batching updates.
    impl<K,V,T,R,B:Batch<K,V,T,R>> Batcher<K, V, T, R, AbomArc<B>> for ArcBatcher<K,V,T,R,B> {
        fn new() -> Self { ArcBatcher { batcher: <B::Batcher as Batcher<K,V,T,R,B>>::new() } }
        fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) { self.batcher.push_batch(batch) }
        // Sealing produces a `B`, wrapped as an `Owned` AbomArc.
        fn seal(&mut self, upper: Antichain<T>) -> AbomArc<B> { AbomArc::new(self.batcher.seal(upper)) }
        fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> { self.batcher.frontier() }
    }

    /// Wrapper type for building reference counted batches.
    pub struct ArcBuilder<K,V,T,R,B:Batch<K,V,T,R>> { builder: B::Builder }

    /// Functionality for building batches from ordered update sequences.
    impl<K,V,T,R,B:Batch<K,V,T,R>> Builder<K, V, T, R, AbomArc<B>> for ArcBuilder<K,V,T,R,B> {
        fn new() -> Self { ArcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::new() } }
        fn with_capacity(cap: usize) -> Self { ArcBuilder { builder: <B::Builder as Builder<K,V,T,R,B>>::with_capacity(cap) } }
        fn push(&mut self, element: (K, V, T, R)) { self.builder.push(element) }
        // The finished batch is wrapped as an `Owned` AbomArc.
        fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> AbomArc<B> { AbomArc::new(self.builder.done(lower, upper, since)) }
    }

    /// Wrapper type for merging reference counted batches.
    pub struct ArcMerger<K,V,T,R,B:Batch<K,V,T,R>> { merger: B::Merger }

    /// Represents a merge in progress.
    impl<K,V,T,R,B:Batch<K,V,T,R>> Merger<K, V, T, R, AbomArc<B>> for ArcMerger<K,V,T,R,B> {
        // Merging operates on the underlying `B` batches (via deref coercion)
        // and wraps the merged result as an `Owned` AbomArc.
        fn new(source1: &AbomArc<B>, source2: &AbomArc<B>, compaction_frontier: Option<AntichainRef<T>>) -> Self { ArcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
        fn work(&mut self, source1: &AbomArc<B>, source2: &AbomArc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
        fn done(self) -> AbomArc<B> { AbomArc::new(self.merger.done()) }
    }
}
590+
429591

430592
/// Blanket implementations for reference counted batches.
431593
pub mod abomonated_blanket_impls {

0 commit comments

Comments
 (0)