Skip to content

Commit 8b61715

Browse files
Update batch processing to support compacted inputs (#530)
* Update batch processing to support compacted inputs * correct lower_limit oversight
1 parent 5835419 commit 8b61715

File tree

2 files changed

+113
-76
lines changed

2 files changed

+113
-76
lines changed

src/operators/count.rs

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -69,51 +69,70 @@ where
6969

7070
self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
7171

72-
// tracks the upper limit of known-complete timestamps.
72+
// tracks the lower and upper limit of received batches.
73+
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
7374
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
7475

7576
move |input, output| {
7677

77-
use crate::trace::cursor::IntoOwned;
78+
let mut batch_cursors = Vec::new();
79+
let mut batch_storage = Vec::new();
80+
81+
// Downgrade previous upper limit to be current lower limit.
82+
lower_limit.clear();
83+
lower_limit.extend(upper_limit.borrow().iter().cloned());
84+
85+
let mut cap = None;
7886
input.for_each(|capability, batches| {
87+
if cap.is_none() { // NB: Assumes batches are in-order
88+
cap = Some(capability.retain());
89+
}
7990
batches.swap(&mut buffer);
80-
let mut session = output.session(&capability);
8191
for batch in buffer.drain(..) {
82-
let mut batch_cursor = batch.cursor();
83-
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
84-
upper_limit.clone_from(batch.upper());
85-
86-
while let Some(key) = batch_cursor.get_key(&batch) {
87-
let mut count: Option<T1::Diff> = None;
88-
89-
trace_cursor.seek_key(&trace_storage, key);
90-
if trace_cursor.get_key(&trace_storage) == Some(key) {
91-
trace_cursor.map_times(&trace_storage, |_, diff| {
92-
count.as_mut().map(|c| c.plus_equals(&diff));
93-
if count.is_none() { count = Some(diff.into_owned()); }
94-
});
95-
}
92+
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
93+
batch_cursors.push(batch.cursor());
94+
batch_storage.push(batch);
95+
}
96+
});
9697

97-
batch_cursor.map_times(&batch, |time, diff| {
98+
if let Some(capability) = cap {
9899

99-
if let Some(count) = count.as_ref() {
100-
if !count.is_zero() {
101-
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
102-
}
103-
}
100+
let mut session = output.session(&capability);
101+
102+
use crate::trace::cursor::CursorList;
103+
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
104+
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
105+
106+
while let Some(key) = batch_cursor.get_key(&batch_storage) {
107+
let mut count: Option<T1::Diff> = None;
108+
109+
trace_cursor.seek_key(&trace_storage, key);
110+
if trace_cursor.get_key(&trace_storage) == Some(key) {
111+
trace_cursor.map_times(&trace_storage, |_, diff| {
104112
count.as_mut().map(|c| c.plus_equals(&diff));
105113
if count.is_none() { count = Some(diff.into_owned()); }
106-
if let Some(count) = count.as_ref() {
107-
if !count.is_zero() {
108-
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
109-
}
110-
}
111114
});
112-
113-
batch_cursor.step_key(&batch);
114115
}
116+
117+
batch_cursor.map_times(&batch_storage, |time, diff| {
118+
119+
if let Some(count) = count.as_ref() {
120+
if !count.is_zero() {
121+
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
122+
}
123+
}
124+
count.as_mut().map(|c| c.plus_equals(&diff));
125+
if count.is_none() { count = Some(diff.into_owned()); }
126+
if let Some(count) = count.as_ref() {
127+
if !count.is_zero() {
128+
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
129+
}
130+
}
131+
});
132+
133+
batch_cursor.step_key(&batch_storage);
115134
}
116-
});
135+
}
117136

118137
// tidy up the shared input trace.
119138
trace.advance_upper(&mut upper_limit);

src/operators/threshold.rs

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ use crate::hashable::Hashable;
1515
use crate::collection::AsCollection;
1616
use crate::operators::arrange::{Arranged, ArrangeBySelf};
1717
use crate::trace::{BatchReader, Cursor, TraceReader};
18-
use crate::trace::cursor::IntoOwned;
1918

2019
/// Extension trait for the `distinct` differential dataflow method.
2120
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
@@ -117,66 +116,85 @@ where
117116

118117
self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
119118

120-
// tracks the upper limit of known-complete timestamps.
119+
// tracks the lower and upper limit of received batches.
120+
let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
121121
let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
122122

123123
move |input, output| {
124124

125+
let mut batch_cursors = Vec::new();
126+
let mut batch_storage = Vec::new();
127+
128+
// Downgrde previous upper limit to be current lower limit.
129+
lower_limit.clear();
130+
lower_limit.extend(upper_limit.borrow().iter().cloned());
131+
132+
let mut cap = None;
125133
input.for_each(|capability, batches| {
134+
if cap.is_none() { // NB: Assumes batches are in-order
135+
cap = Some(capability.retain());
136+
}
126137
batches.swap(&mut buffer);
127-
let mut session = output.session(&capability);
128138
for batch in buffer.drain(..) {
139+
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
140+
batch_cursors.push(batch.cursor());
141+
batch_storage.push(batch);
142+
}
143+
});
129144

130-
let mut batch_cursor = batch.cursor();
131-
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
145+
use crate::trace::cursor::IntoOwned;
146+
if let Some(capability) = cap {
132147

133-
upper_limit.clone_from(batch.upper());
148+
let mut session = output.session(&capability);
134149

135-
while let Some(key) = batch_cursor.get_key(&batch) {
136-
let mut count: Option<T1::Diff> = None;
150+
use crate::trace::cursor::CursorList;
151+
let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
152+
let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
137153

138-
// Compute the multiplicity of this key before the current batch.
139-
trace_cursor.seek_key(&trace_storage, key);
140-
if trace_cursor.get_key(&trace_storage) == Some(key) {
141-
trace_cursor.map_times(&trace_storage, |_, diff| {
142-
count.as_mut().map(|c| c.plus_equals(&diff));
143-
if count.is_none() { count = Some(diff.into_owned()); }
144-
});
145-
}
154+
while let Some(key) = batch_cursor.get_key(&batch_storage) {
155+
let mut count: Option<T1::Diff> = None;
146156

147-
// Apply `thresh` both before and after `diff` is applied to `count`.
148-
// If the result is non-zero, send it along.
149-
batch_cursor.map_times(&batch, |time, diff| {
150-
151-
let difference =
152-
match &count {
153-
Some(old) => {
154-
let mut temp = old.clone();
155-
temp.plus_equals(&diff);
156-
thresh(key, &temp, Some(old))
157-
},
158-
None => { thresh(key, &diff.into_owned(), None) },
159-
};
160-
161-
// Either add or assign `diff` to `count`.
162-
if let Some(count) = &mut count {
163-
count.plus_equals(&diff);
164-
}
165-
else {
166-
count = Some(diff.into_owned());
167-
}
157+
// Compute the multiplicity of this key before the current batch.
158+
trace_cursor.seek_key(&trace_storage, key);
159+
if trace_cursor.get_key(&trace_storage) == Some(key) {
160+
trace_cursor.map_times(&trace_storage, |_, diff| {
161+
count.as_mut().map(|c| c.plus_equals(&diff));
162+
if count.is_none() { count = Some(diff.into_owned()); }
163+
});
164+
}
165+
166+
// Apply `thresh` both before and after `diff` is applied to `count`.
167+
// If the result is non-zero, send it along.
168+
batch_cursor.map_times(&batch_storage, |time, diff| {
169+
170+
let difference =
171+
match &count {
172+
Some(old) => {
173+
let mut temp = old.clone();
174+
temp.plus_equals(&diff);
175+
thresh(key, &temp, Some(old))
176+
},
177+
None => { thresh(key, &diff.into_owned(), None) },
178+
};
179+
180+
// Either add or assign `diff` to `count`.
181+
if let Some(count) = &mut count {
182+
count.plus_equals(&diff);
183+
}
184+
else {
185+
count = Some(diff.into_owned());
186+
}
168187

169-
if let Some(difference) = difference {
170-
if !difference.is_zero() {
171-
session.give((key.clone(), time.into_owned(), difference));
172-
}
188+
if let Some(difference) = difference {
189+
if !difference.is_zero() {
190+
session.give((key.clone(), time.into_owned(), difference));
173191
}
174-
});
192+
}
193+
});
175194

176-
batch_cursor.step_key(&batch);
177-
}
195+
batch_cursor.step_key(&batch_storage);
178196
}
179-
});
197+
}
180198

181199
// tidy up the shared input trace.
182200
trace.advance_upper(&mut upper_limit);

0 commit comments

Comments
 (0)