Skip to content

Commit 8123b20

Browse files
Further tidying up of submitted PRs (#367)
* remove transmute and fix rust warning while `()` is a ZST the potentially dangling reference is still undefined behaviour. Making it a static is a trivial fix. Signed-off-by: Petros Angelatos <petrosagg@gmail.com> * trace: define `BatchReader` with associated types The `TraceReader` trait uses associated types to define its `Key`, `Val`, `Time`, `Diff` but the `BatchReader` trait did not, even though they are very similar in nature. Usually the choice between asssociated types or generic parameters on a trait is determined by whether or not a particular type is expected to implement the same trait multiple times. My starting point was that these two trait should at the very least be consistent with respect to their structure and either both use generic parameters or both use associated types. All the uses in this repo (and also that I can imagine being useful) don't really need `BatchReader` to be polymorphic for a particular type and so I chose to change that one to make it consistent with `TraceReader`. The result is quite pleasing as in many cases a lot of generic parameters are erased. In order to keep this PR short I left the `Cursor` trait untouched, but I believe a similar transformation would be beneficial there too, simplifying further many type signatures. Signed-off-by: Petros Angelatos <petrosagg@gmail.com> * trace: redefine Cursor with associated types Signed-off-by: Petros Angelatos <petrosagg@gmail.com> * remove transmute and fix rust warning while `()` is a ZST the potentially dangling reference is still undefined behaviour. Making it a static is a trivial fix. Signed-off-by: Petros Angelatos <petrosagg@gmail.com> * simplify generics of batch related traits * simplify generics of join's deferred struct * merge CursorDebug trait methods into Cursor * various code style changes Co-authored-by: Frank McSherry <fmcsherry@me.com>
1 parent 337f847 commit 8123b20

32 files changed

+605
-600
lines changed

dogsdogsdogs/src/operators/count.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
44
use differential_dataflow::difference::{Monoid, Multiply};
55
use differential_dataflow::lattice::Lattice;
66
use differential_dataflow::operators::arrange::Arranged;
7-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
7+
use differential_dataflow::trace::TraceReader;
88

99
/// Reports a number of extensions to a stream of prefixes.
1010
///
@@ -23,8 +23,6 @@ where
2323
G::Timestamp: Lattice,
2424
Tr: TraceReader<Val=(), Time=G::Timestamp, R=isize>+Clone+'static,
2525
Tr::Key: Ord+Hashable+Default,
26-
Tr::Batch: BatchReader<Tr::Key, (), Tr::Time, Tr::R>,
27-
Tr::Cursor: Cursor<Tr::Key, (), Tr::Time, Tr::R>,
2826
R: Monoid+Multiply<Output = R>+ExchangeData,
2927
F: Fn(&P)->Tr::Key+Clone+'static,
3028
P: ExchangeData,

dogsdogsdogs/src/operators/half_join.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
4242
use differential_dataflow::difference::{Monoid, Semigroup};
4343
use differential_dataflow::lattice::Lattice;
4444
use differential_dataflow::operators::arrange::Arranged;
45-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
45+
use differential_dataflow::trace::{Cursor, TraceReader};
4646
use differential_dataflow::consolidation::{consolidate, consolidate_updates};
4747

4848
/// A binary equijoin that responds to updates on only its first input.
@@ -81,8 +81,6 @@ where
8181
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
8282
Tr::Key: Ord+Hashable+ExchangeData,
8383
Tr::Val: Clone,
84-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
85-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
8684
Tr::R: Monoid+ExchangeData,
8785
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
8886
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
@@ -137,8 +135,6 @@ where
137135
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
138136
Tr::Key: Ord+Hashable+ExchangeData,
139137
Tr::Val: Clone,
140-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
141-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
142138
Tr::R: Monoid+ExchangeData,
143139
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
144140
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,

dogsdogsdogs/src/operators/lookup_map.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
1010
use differential_dataflow::difference::{Semigroup, Monoid};
1111
use differential_dataflow::lattice::Lattice;
1212
use differential_dataflow::operators::arrange::Arranged;
13-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
13+
use differential_dataflow::trace::{Cursor, TraceReader};
1414

1515
/// Proposes extensions to a stream of prefixes.
1616
///
@@ -32,8 +32,6 @@ where
3232
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
3333
Tr::Key: Ord+Hashable,
3434
Tr::Val: Clone,
35-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
36-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
3735
Tr::R: Monoid+ExchangeData,
3836
F: FnMut(&D, &mut Tr::Key)+Clone+'static,
3937
D: ExchangeData,

dogsdogsdogs/src/operators/propose.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
44
use differential_dataflow::difference::{Monoid, Multiply};
55
use differential_dataflow::lattice::Lattice;
66
use differential_dataflow::operators::arrange::Arranged;
7-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
7+
use differential_dataflow::trace::TraceReader;
88

99
/// Proposes extensions to a prefix stream.
1010
///
@@ -25,8 +25,6 @@ where
2525
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
2626
Tr::Key: Ord+Hashable+Default,
2727
Tr::Val: Clone,
28-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
29-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
3028
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
3129
F: Fn(&P)->Tr::Key+Clone+'static,
3230
P: ExchangeData,
@@ -58,8 +56,6 @@ where
5856
Tr: TraceReader<Time=G::Timestamp>+Clone+'static,
5957
Tr::Key: Ord+Hashable+Default,
6058
Tr::Val: Clone,
61-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
62-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
6359
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
6460
F: Fn(&P)->Tr::Key+Clone+'static,
6561
P: ExchangeData,

dogsdogsdogs/src/operators/validate.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection};
66
use differential_dataflow::difference::{Monoid, Multiply};
77
use differential_dataflow::lattice::Lattice;
88
use differential_dataflow::operators::arrange::Arranged;
9-
use differential_dataflow::trace::{Cursor, TraceReader, BatchReader};
9+
use differential_dataflow::trace::TraceReader;
1010

1111
/// Proposes extensions to a stream of prefixes.
1212
///
@@ -24,8 +24,6 @@ where
2424
Tr: TraceReader<Key=(K,V), Val=(), Time=G::Timestamp>+Clone+'static,
2525
K: Ord+Hash+Clone+Default,
2626
V: ExchangeData+Hash+Default,
27-
Tr::Batch: BatchReader<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
28-
Tr::Cursor: Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>,
2927
Tr::R: Monoid+Multiply<Output = Tr::R>+ExchangeData,
3028
F: Fn(&P)->K+Clone+'static,
3129
P: ExchangeData,

examples/cursors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use timely::progress::frontier::AntichainRef;
4343
use differential_dataflow::input::Input;
4444
use differential_dataflow::operators::arrange::ArrangeByKey;
4545
use differential_dataflow::operators::*;
46-
use differential_dataflow::trace::cursor::CursorDebug;
46+
use differential_dataflow::trace::cursor::Cursor;
4747
use differential_dataflow::trace::TraceReader;
4848

4949
type Node = u32;

src/algorithms/graphs/bfs.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ where
3030
G::Timestamp: Lattice+Ord,
3131
N: ExchangeData+Hash,
3232
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
33-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
34-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
3533
{
3634
// initialize roots as reaching themselves at distance 0
3735
let nodes = roots.map(|x| (x, 0));

src/algorithms/graphs/bijkstra.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ where
4646
G::Timestamp: Lattice+Ord,
4747
N: ExchangeData+Hash,
4848
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=isize>+Clone+'static,
49-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
50-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
5149
{
5250
forward
5351
.stream

src/algorithms/graphs/propagate.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ where
6565
R: From<i8>,
6666
L: ExchangeData,
6767
Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
68-
Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
69-
Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
7068
F: Fn(&L)->u64+Clone+'static,
7169
{
7270
// Morally the code performs the following iterative computation. However, in the interest of a simplified

src/operators/arrange/agent.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ where
7777
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
7878
self.physical_compaction.borrow()
7979
}
80-
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor<Tr::Key, Tr::Val, Tr::Time, Tr::R>>::Storage)> {
80+
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
8181
self.trace.borrow_mut().trace.cursor_through(frontier)
8282
}
8383
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
@@ -92,7 +92,7 @@ where
9292
pub fn new(trace: Tr, operator: ::timely::dataflow::operators::generic::OperatorInfo, logging: Option<::logging::Logger>) -> (Self, TraceWriter<Tr>)
9393
where
9494
Tr: Trace,
95-
Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
95+
Tr::Batch: Batch,
9696
{
9797
let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
9898
let queues = Rc::new(RefCell::new(Vec::new()));

0 commit comments

Comments
 (0)