Skip to content

Commit af596dc

Browse files
committed
Non-working moment
1 parent 36176eb commit af596dc

File tree

12 files changed

+212
-157
lines changed

12 files changed

+212
-157
lines changed

differential-dataflow/src/operators/arrange/agent.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,12 @@ pub struct TraceAgent<Tr: TraceReader> {
3636
logging: Option<crate::logging::Logger>,
3737
}
3838

39+
use crate::trace::implementations::LaidOut;
40+
impl<Tr: TraceReader> LaidOut for TraceAgent<Tr> {
41+
type Layout = Tr::Layout;
42+
}
43+
3944
impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
40-
type Key<'a> = Tr::Key<'a>;
41-
type Val<'a> = Tr::Val<'a>;
42-
type Time = Tr::Time;
43-
type TimeGat<'a> = Tr::TimeGat<'a>;
44-
type Diff = Tr::Diff;
45-
type DiffGat<'a> = Tr::DiffGat<'a>;
4645

4746
type Batch = Tr::Batch;
4847
type Storage = Tr::Storage;

differential-dataflow/src/trace/implementations/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub trait Layout {
100100
/// Container for times.
101101
type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
102102
/// Container for diffs.
103-
type DiffContainer: BatchContainer<Owned: Semigroup>;
103+
type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
104104
/// Container for offsets.
105105
type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
106106
}
@@ -126,7 +126,7 @@ pub trait LayoutExt : LaidOut<Layout: Layout<KeyContainer = Self::KeyContainer,
126126
/// Alias for an borrowed time of a layout.
127127
type TimeGat<'a>: Copy + Ord;
128128
/// Alias for an owned diff of a layout.
129-
type Diff: Semigroup;
129+
type Diff: Semigroup + 'static;
130130
/// Alias for an borrowed diff of a layout.
131131
type DiffGat<'a>: Copy + Ord;
132132

differential-dataflow/src/trace/implementations/ord_neu.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,11 @@ pub mod val_batch {
322322
pub updates: usize,
323323
}
324324

325+
impl<L: Layout> LaidOut for OrdValBatch<L> {
326+
type Layout = L;
327+
}
328+
325329
impl<L: Layout> BatchReader for OrdValBatch<L> {
326-
type Key<'a> = layout::KeyRef<'a, L>;
327-
type Val<'a> = layout::ValRef<'a, L>;
328-
type Time = layout::Time<L>;
329-
type TimeGat<'a> = layout::TimeRef<'a, L>;
330-
type Diff = layout::Diff<L>;
331-
type DiffGat<'a> = layout::DiffRef<'a, L>;
332330

333331
type Cursor = OrdValCursor<L>;
334332
fn cursor(&self) -> Self::Cursor {
@@ -796,14 +794,11 @@ pub mod key_batch {
796794
pub updates: usize,
797795
}
798796

799-
impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> BatchReader for OrdKeyBatch<L> {
797+
impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> LaidOut for OrdKeyBatch<L> {
798+
type Layout = L;
799+
}
800800

801-
type Key<'a> = layout::KeyRef<'a, L>;
802-
type Val<'a> = &'a ();
803-
type Time = layout::Time<L>;
804-
type TimeGat<'a> = layout::TimeRef<'a, L>;
805-
type Diff = layout::Diff<L>;
806-
type DiffGat<'a> = layout::DiffRef<'a, L>;
801+
impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> BatchReader for OrdKeyBatch<L> {
807802

808803
type Cursor = OrdKeyCursor<L>;
809804
fn cursor(&self) -> Self::Cursor {

differential-dataflow/src/trace/implementations/rhh.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,18 +273,19 @@ mod val_batch {
273273
pub updates: usize,
274274
}
275275

276-
impl<L: Layout> BatchReader for RhhValBatch<L>
276+
impl<L: Layout> LaidOut for RhhValBatch<L>
277277
where
278278
layout::Key<L>: Default + HashOrdered,
279279
for<'a> layout::KeyRef<'a, L>: HashOrdered,
280280
{
281-
type Key<'a> = layout::KeyRef<'a, L>;
282-
type Val<'a> = layout::ValRef<'a, L>;
283-
type Time = layout::Time<L>;
284-
type TimeGat<'a> = layout::TimeRef<'a, L>;
285-
type Diff = layout::Diff<L>;
286-
type DiffGat<'a> = layout::DiffRef<'a, L>;
281+
type Layout = L;
282+
}
287283

284+
impl<L: Layout> BatchReader for RhhValBatch<L>
285+
where
286+
layout::Key<L>: Default + HashOrdered,
287+
for<'a> layout::KeyRef<'a, L>: HashOrdered,
288+
{
288289
type Cursor = RhhValCursor<L>;
289290
fn cursor(&self) -> Self::Cursor {
290291
let mut cursor = RhhValCursor {

differential-dataflow/src/trace/implementations/spine_fueled.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,18 @@ pub struct Spine<B: Batch> {
9999
exert_logic: Option<ExertionLogic>,
100100
}
101101

102+
use crate::trace::LaidOut;
103+
impl<B: Batch> LaidOut for Spine<B> {
104+
type Layout = B::Layout;
105+
}
106+
102107
impl<B: Batch+Clone+'static> TraceReader for Spine<B> {
103-
type Key<'a> = B::Key<'a>;
104-
type Val<'a> = B::Val<'a>;
105-
type Time = B::Time;
106-
type TimeGat<'a> = B::TimeGat<'a>;
107-
type Diff = B::Diff;
108-
type DiffGat<'a> = B::DiffGat<'a>;
108+
// type Key<'a> = B::Key<'a>;
109+
// type Val<'a> = B::Val<'a>;
110+
// type Time = B::Time;
111+
// type TimeGat<'a> = B::TimeGat<'a>;
112+
// type Diff = B::Diff;
113+
// type DiffGat<'a> = B::DiffGat<'a>;
109114

110115
type Batch = B;
111116
type Storage = Vec<B>;

differential-dataflow/src/trace/mod.rs

Lines changed: 59 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use timely::progress::{Antichain, frontier::AntichainRef};
1616
use timely::progress::Timestamp;
1717

1818
use crate::logging::Logger;
19-
use crate::difference::Semigroup;
20-
use crate::lattice::Lattice;
2119
pub use self::cursor::Cursor;
2220
pub use self::description::Description;
2321

@@ -46,34 +44,48 @@ pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(&'a [(usize, usize, usize
4644
/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
4745
/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
4846
/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
49-
pub trait TraceReader {
50-
51-
/// Key by which updates are indexed.
52-
type Key<'a>: Copy + Clone + Ord;
53-
/// Values associated with keys.
54-
type Val<'a>: Copy + Clone;
55-
/// Timestamps associated with updates
56-
type Time: Timestamp + Lattice + Ord + Clone;
57-
/// Borrowed form of timestamp.
58-
type TimeGat<'a>: Copy;
59-
/// Owned form of update difference.
60-
type Diff: Semigroup + 'static;
61-
/// Borrowed form of update difference.
62-
type DiffGat<'a> : Copy;
63-
64-
/// An owned copy of a reference to a time.
65-
#[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) }
66-
/// An owned copy of a reference to a diff.
67-
#[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) }
47+
pub trait TraceReader : LayoutExt {
6848

6949
/// The type of an immutable collection of updates.
70-
type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static;
50+
type Batch:
51+
'static +
52+
Clone +
53+
BatchReader +
54+
LaidOut<Layout = Self::Layout> +
55+
for<'a> LayoutExt<
56+
Key<'a> = Self::Key<'a>,
57+
Val<'a> = Self::Val<'a>,
58+
Time = Self::Time,
59+
TimeGat<'a> = Self::TimeGat<'a>,
60+
Diff = Self::Diff,
61+
DiffGat<'a> = Self::DiffGat<'a>,
62+
KeyContainer = Self::KeyContainer,
63+
ValContainer = Self::ValContainer,
64+
TimeContainer = Self::TimeContainer,
65+
DiffContainer = Self::DiffContainer,
66+
>;
67+
7168

7269
/// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
7370
type Storage;
7471

7572
/// The type used to enumerate the collections contents.
76-
type Cursor: for<'a> Cursor<Storage=Self::Storage, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
73+
type Cursor:
74+
Cursor<Storage=Self::Storage> +
75+
LaidOut<Layout = Self::Layout> +
76+
for<'a> LayoutExt<
77+
Key<'a> = Self::Key<'a>,
78+
Val<'a> = Self::Val<'a>,
79+
Time = Self::Time,
80+
TimeGat<'a> = Self::TimeGat<'a>,
81+
Diff = Self::Diff,
82+
DiffGat<'a> = Self::DiffGat<'a>,
83+
KeyContainer = Self::KeyContainer,
84+
ValContainer = Self::ValContainer,
85+
TimeContainer = Self::TimeContainer,
86+
DiffContainer = Self::DiffContainer,
87+
>;
88+
7789

7890
/// Provides a cursor over updates contained in the trace.
7991
fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
@@ -221,34 +233,34 @@ pub trait Trace : TraceReader<Batch: Batch> {
221233
fn close(&mut self);
222234
}
223235

236+
use crate::trace::implementations::LaidOut;
237+
224238
/// A batch of updates whose contents may be read.
225239
///
226240
/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
227241
/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
228242
/// especially useful for views derived from other sources in ways that prevent the construction of batches
229243
/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
230-
pub trait BatchReader : Sized {
231-
/// Key by which updates are indexed.
232-
type Key<'a>: Copy + Clone + Ord;
233-
/// Values associated with keys.
234-
type Val<'a>: Copy + Clone;
235-
/// Timestamps associated with updates
236-
type Time: Timestamp + Lattice + Ord + Clone;
237-
/// Borrowed form of timestamp.
238-
type TimeGat<'a>: Copy;
239-
/// Owned form of update difference.
240-
type Diff: Semigroup + 'static;
241-
/// Borrowed form of update difference.
242-
type DiffGat<'a> : Copy;
243-
244-
/// An owned copy of a reference to a time.
245-
#[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) }
246-
/// An owned copy of a reference to a diff.
247-
#[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) }
244+
pub trait BatchReader : LayoutExt + Sized {
248245

249246
/// The type used to enumerate the batch's contents.
250-
type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
251-
/// Acquires a cursor to the batch's contents.
247+
type Cursor:
248+
Cursor<Storage=Self> +
249+
LaidOut<Layout = Self::Layout> +
250+
for<'a> LayoutExt<
251+
Key<'a> = Self::Key<'a>,
252+
Val<'a> = Self::Val<'a>,
253+
Time = Self::Time,
254+
TimeGat<'a> = Self::TimeGat<'a>,
255+
Diff = Self::Diff,
256+
DiffGat<'a> = Self::DiffGat<'a>,
257+
KeyContainer = Self::KeyContainer,
258+
ValContainer = Self::ValContainer,
259+
TimeContainer = Self::TimeContainer,
260+
DiffContainer = Self::DiffContainer,
261+
>;
262+
263+
/// Acquires a cursor to the batch's contents.
252264
fn cursor(&self) -> Self::Cursor;
253265
/// The number of updates in the batch.
254266
fn len(&self) -> usize;
@@ -359,13 +371,11 @@ pub mod rc_blanket_impls {
359371
use timely::progress::{Antichain, frontier::AntichainRef};
360372
use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
361373

374+
impl<B: BatchReader> LaidOut for Rc<B> {
375+
type Layout = B::Layout;
376+
}
377+
362378
impl<B: BatchReader> BatchReader for Rc<B> {
363-
type Key<'a> = B::Key<'a>;
364-
type Val<'a> = B::Val<'a>;
365-
type Time = B::Time;
366-
type TimeGat<'a> = B::TimeGat<'a>;
367-
type Diff = B::Diff;
368-
type DiffGat<'a> = B::DiffGat<'a>;
369379

370380
/// The type used to enumerate the batch's contents.
371381
type Cursor = RcBatchCursor<B::Cursor>;

differential-dataflow/src/trace/wrappers/enter.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,25 @@ impl<Tr: TraceReader + Clone, TInner> Clone for TraceEnter<Tr, TInner> {
2525
}
2626
}
2727

28-
impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
28+
impl<Tr, TInner> LaidOut for TraceEnter<Tr, TInner>
2929
where
3030
Tr: TraceReader<Batch: Clone>,
3131
TInner: Refines<Tr::Time>+Lattice,
3232
{
33-
type Key<'a> = Tr::Key<'a>;
34-
type Val<'a> = Tr::Val<'a>;
35-
type Time = TInner;
36-
type TimeGat<'a> = &'a TInner;
37-
type Diff = Tr::Diff;
38-
type DiffGat<'a> = Tr::DiffGat<'a>;
33+
type Layout = (
34+
<Tr::Layout as Layout>::KeyContainer,
35+
<Tr::Layout as Layout>::ValContainer,
36+
Vec<TInner>,
37+
<Tr::Layout as Layout>::DiffContainer,
38+
<Tr::Layout as Layout>::OffsetContainer,
39+
);
40+
}
3941

42+
impl<Tr, TInner> TraceReader for TraceEnter<Tr, TInner>
43+
where
44+
Tr: TraceReader<Batch: Clone>,
45+
TInner: Refines<Tr::Time>+Lattice,
46+
{
4047
type Batch = BatchEnter<Tr::Batch, TInner>;
4148
type Storage = Tr::Storage;
4249
type Cursor = CursorEnter<Tr::Cursor, TInner>;
@@ -109,18 +116,25 @@ pub struct BatchEnter<B, TInner> {
109116
description: Description<TInner>,
110117
}
111118

112-
impl<B, TInner> BatchReader for BatchEnter<B, TInner>
119+
impl<B, TInner> LaidOut for BatchEnter<B, TInner>
113120
where
114121
B: BatchReader,
115122
TInner: Refines<B::Time>+Lattice,
116123
{
117-
type Key<'a> = B::Key<'a>;
118-
type Val<'a> = B::Val<'a>;
119-
type Time = TInner;
120-
type TimeGat<'a> = &'a TInner;
121-
type Diff = B::Diff;
122-
type DiffGat<'a> = B::DiffGat<'a>;
124+
type Layout = (
125+
<B::Layout as Layout>::KeyContainer,
126+
<B::Layout as Layout>::ValContainer,
127+
Vec<TInner>,
128+
<B::Layout as Layout>::DiffContainer,
129+
<B::Layout as Layout>::OffsetContainer,
130+
);
131+
}
123132

133+
impl<B, TInner> BatchReader for BatchEnter<B, TInner>
134+
where
135+
B: BatchReader,
136+
TInner: Refines<B::Time>+Lattice,
137+
{
124138
type Cursor = BatchCursorEnter<B::Cursor, TInner>;
125139

126140
fn cursor(&self) -> Self::Cursor {

0 commit comments

Comments
 (0)