Skip to content

Commit 78e7319

Browse files
Generalize TraceFrontier with upper (#370)
1 parent 5a013e0 commit 78e7319

File tree

3 files changed

+81
-44
lines changed

3 files changed

+81
-44
lines changed

examples/capture-test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ pub mod kafka {
153153
use differential_dataflow::lattice::Lattice;
154154

155155
/// Creates a Kafka source from supplied configuration information.
156-
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
156+
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
157157
where
158158
G: Scope<Timestamp = T>,
159159
D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,

src/operators/arrange/agent.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -423,19 +423,26 @@ where
423423
Tr: TraceReader,
424424
{
425425
// This frontier describes our only guarantee on the compaction frontier.
426-
let frontier = self.get_logical_compaction().to_owned();
427-
self.import_frontier_core(scope, name, frontier)
426+
let since = self.get_logical_compaction().to_owned();
427+
self.import_frontier_core(scope, name, since, Antichain::new())
428428
}
429429

430-
/// Import a trace advanced to a specific frontier.
431-
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, frontier: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
430+
/// Import a trace restricted to a specific time interval `[since, until)`.
431+
///
432+
/// All updates present in the input trace will first be advanced to `since`, and then either emitted
433+
/// or, if greater than or equal to `until`, suppressed. Once all times are certain to be greater than
434+
/// or equal to `until`, the operator capability will be dropped.
435+
///
436+
/// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
437+
/// frontier indicates the end of times.
438+
pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
432439
where
433440
G: Scope<Timestamp=Tr::Time>,
434441
Tr::Time: Timestamp+ Lattice+Ord+Clone+'static,
435442
Tr: TraceReader,
436443
{
437444
let trace = self.clone();
438-
let trace = TraceFrontier::make_from(trace, frontier.borrow());
445+
let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
439446

440447
let mut shutdown_button = None;
441448

@@ -458,18 +465,27 @@ where
458465

459466
let mut capabilities = capabilities.borrow_mut();
460467
if let Some(ref mut capabilities) = *capabilities {
461-
462468
let mut borrow = queue.1.borrow_mut();
463469
for instruction in borrow.drain(..) {
464-
match instruction {
465-
TraceReplayInstruction::Frontier(frontier) => {
466-
capabilities.downgrade(&frontier.borrow()[..]);
467-
},
468-
TraceReplayInstruction::Batch(batch, hint) => {
469-
if let Some(time) = hint {
470-
if !batch.is_empty() {
471-
let delayed = capabilities.delayed(&time);
472-
output.session(&delayed).give(BatchFrontier::make_from(batch, frontier.borrow()));
470+
// If we have dropped the capabilities due to `until`, attempt no further work.
471+
// Without the capabilities, we should soon be shut down (once this loop ends).
472+
if !capabilities.is_empty() {
473+
match instruction {
474+
TraceReplayInstruction::Frontier(frontier) => {
475+
if timely::PartialOrder::less_equal(&until, &frontier) {
476+
// It might be nice to actively *drop* `capabilities`, but it seems
477+
// complicated logically (i.e. we'd have to break out of the loop).
478+
capabilities.downgrade(&[]);
479+
} else {
480+
capabilities.downgrade(&frontier.borrow()[..]);
481+
}
482+
},
483+
TraceReplayInstruction::Batch(batch, hint) => {
484+
if let Some(time) = hint {
485+
if !batch.is_empty() {
486+
let delayed = capabilities.delayed(&time);
487+
output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
488+
}
473489
}
474490
}
475491
}

src/trace/wrappers/frontier.rs

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
//! Wrapper for frontiered trace.
22
//!
3-
//! Wraps a trace with a frontier so that all exposed timestamps are first advanced by the frontier.
4-
//! This ensures that even for traces that have been advanced, all views provided through cursors
5-
//! present deterministic times, independent of the compaction strategy.
3+
//! Wraps a trace with `since` and `until` frontiers so that all exposed timestamps are first advanced
4+
//! by the `since` frontier and restricted by the `until` frontier. This presents a deterministic trace
5+
//! on the interval `[since, until)`, presenting only accumulations up to `since` (rather than partially
6+
//! accumulated updates) and no updates at times greater than or equal to `until` (even as parts of
7+
//! batches that span that time).
68
79
use timely::progress::Timestamp;
810
use timely::progress::{Antichain, frontier::AntichainRef};
@@ -17,7 +19,10 @@ where
1719
Tr: TraceReader,
1820
{
1921
trace: Tr,
20-
frontier: Antichain<Tr::Time>,
22+
/// Frontier to which all update times will be advanced.
23+
since: Antichain<Tr::Time>,
24+
/// Frontier at or beyond which all update times will be suppressed (i.e. times greater than or equal to an element of it).
25+
until: Antichain<Tr::Time>,
2126
}
2227

2328
impl<Tr> Clone for TraceFrontier<Tr>
@@ -28,7 +33,8 @@ where
2833
fn clone(&self) -> Self {
2934
TraceFrontier {
3035
trace: self.trace.clone(),
31-
frontier: self.frontier.clone(),
36+
since: self.since.clone(),
37+
until: self.until.clone(),
3238
}
3339
}
3440
}
@@ -51,8 +57,9 @@ where
5157
type Cursor = CursorFrontier<Tr::Cursor>;
5258

5359
fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
54-
let frontier = self.frontier.borrow();
55-
self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), frontier)))
60+
let since = self.since.borrow();
61+
let until = self.until.borrow();
62+
self.trace.map_batches(|batch| f(&Self::Batch::make_from(batch.clone(), since, until)))
5663
}
5764

5865
fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) { self.trace.set_logical_compaction(frontier) }
@@ -62,8 +69,9 @@ where
6269
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.trace.get_physical_compaction() }
6370

6471
fn cursor_through(&mut self, upper: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, <Self::Cursor as Cursor>::Storage)> {
65-
let frontier = self.frontier.borrow();
66-
self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, frontier), y))
72+
let since = self.since.borrow();
73+
let until = self.until.borrow();
74+
self.trace.cursor_through(upper).map(|(x,y)| (CursorFrontier::new(x, since, until), y))
6775
}
6876
}
6977

@@ -73,10 +81,11 @@ where
7381
Tr::Time: Timestamp,
7482
{
7583
/// Makes a new trace wrapper
76-
pub fn make_from(trace: Tr, frontier: AntichainRef<Tr::Time>) -> Self {
84+
pub fn make_from(trace: Tr, since: AntichainRef<Tr::Time>, until: AntichainRef<Tr::Time>) -> Self {
7785
TraceFrontier {
7886
trace,
79-
frontier: frontier.to_owned(),
87+
since: since.to_owned(),
88+
until: until.to_owned(),
8089
}
8190
}
8291
}
@@ -86,10 +95,11 @@ where
8695
#[derive(Clone)]
8796
pub struct BatchFrontier<B: BatchReader> {
8897
batch: B,
89-
frontier: Antichain<B::Time>,
98+
since: Antichain<B::Time>,
99+
until: Antichain<B::Time>,
90100
}
91101

92-
impl<B: BatchReader> BatchReader for BatchFrontier<B>
102+
impl<B> BatchReader for BatchFrontier<B>
93103
where
94104
B: BatchReader,
95105
B::Time: Timestamp+Lattice,
@@ -102,7 +112,7 @@ where
102112
type Cursor = BatchCursorFrontier<B>;
103113

104114
fn cursor(&self) -> Self::Cursor {
105-
BatchCursorFrontier::new(self.batch.cursor(), self.frontier.borrow())
115+
BatchCursorFrontier::new(self.batch.cursor(), self.since.borrow(), self.until.borrow())
106116
}
107117
fn len(&self) -> usize { self.batch.len() }
108118
fn description(&self) -> &Description<B::Time> { &self.batch.description() }
@@ -114,25 +124,28 @@ where
114124
B::Time: Timestamp+Lattice,
115125
{
116126
/// Makes a new batch wrapper
117-
pub fn make_from(batch: B, frontier: AntichainRef<B::Time>) -> Self {
127+
pub fn make_from(batch: B, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
118128
BatchFrontier {
119129
batch,
120-
frontier: frontier.to_owned(),
130+
since: since.to_owned(),
131+
until: until.to_owned(),
121132
}
122133
}
123134
}
124135

125136
/// Wrapper around a cursor that advances each presented time by `since` and suppresses times not strictly before `until`.
126137
pub struct CursorFrontier<C: Cursor> {
127138
cursor: C,
128-
frontier: Antichain<C::Time>,
139+
since: Antichain<C::Time>,
140+
until: Antichain<C::Time>
129141
}
130142

131143
impl<C: Cursor> CursorFrontier<C> where C::Time: Clone {
132-
fn new(cursor: C, frontier: AntichainRef<C::Time>) -> Self {
144+
fn new(cursor: C, since: AntichainRef<C::Time>, until: AntichainRef<C::Time>) -> Self {
133145
CursorFrontier {
134146
cursor,
135-
frontier: frontier.to_owned(),
147+
since: since.to_owned(),
148+
until: until.to_owned(),
136149
}
137150
}
138151
}
@@ -157,12 +170,15 @@ where
157170

158171
#[inline]
159172
fn map_times<L: FnMut(&Self::Time,&Self::R)>(&mut self, storage: &Self::Storage, mut logic: L) {
160-
let frontier = self.frontier.borrow();
173+
let since = self.since.borrow();
174+
let until = self.until.borrow();
161175
let mut temp: C::Time = <C::Time as timely::progress::Timestamp>::minimum();
162176
self.cursor.map_times(storage, |time, diff| {
163177
temp.clone_from(time);
164-
temp.advance_by(frontier);
165-
logic(&temp, diff);
178+
temp.advance_by(since);
179+
if !until.less_equal(&temp) {
180+
logic(&temp, diff);
181+
}
166182
})
167183
}
168184

@@ -181,14 +197,16 @@ where
181197
/// Wrapper around a batch cursor that advances each presented time by `since` and suppresses times not strictly before `until`.
182198
pub struct BatchCursorFrontier<B: BatchReader> {
183199
cursor: B::Cursor,
184-
frontier: Antichain<B::Time>,
200+
since: Antichain<B::Time>,
201+
until: Antichain<B::Time>,
185202
}
186203

187204
impl<B: BatchReader> BatchCursorFrontier<B> where B::Time: Clone {
188-
fn new(cursor: B::Cursor, frontier: AntichainRef<B::Time>) -> Self {
205+
fn new(cursor: B::Cursor, since: AntichainRef<B::Time>, until: AntichainRef<B::Time>) -> Self {
189206
BatchCursorFrontier {
190207
cursor,
191-
frontier: frontier.to_owned(),
208+
since: since.to_owned(),
209+
until: until.to_owned(),
192210
}
193211
}
194212
}
@@ -212,12 +230,15 @@ where
212230

213231
#[inline]
214232
fn map_times<L: FnMut(&Self::Time,&Self::R)>(&mut self, storage: &Self::Storage, mut logic: L) {
215-
let frontier = self.frontier.borrow();
233+
let since = self.since.borrow();
234+
let until = self.until.borrow();
216235
let mut temp: B::Time = <B::Time as timely::progress::Timestamp>::minimum();
217236
self.cursor.map_times(&storage.batch, |time, diff| {
218237
temp.clone_from(time);
219-
temp.advance_by(frontier);
220-
logic(&temp, diff);
238+
temp.advance_by(since);
239+
if !until.less_equal(&temp) {
240+
logic(&temp, diff);
241+
}
221242
})
222243
}
223244

0 commit comments

Comments
 (0)