Skip to content

Commit faceb47

Browse files
committed
Remove generics from upsert
1 parent f19aca6 commit faceb47

File tree

1 file changed

+9
-11
lines changed
  • differential-dataflow/src/operators/arrange

1 file changed

+9
-11
lines changed

differential-dataflow/src/operators/arrange/upsert.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
//! use differential_dataflow::operators::arrange::upsert;
6060
//!
6161
//! let stream = scope.input_from(&mut input);
62-
//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
62+
//! let arranged = upsert::arrange_from_upsert::<_, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
6363
//!
6464
//! arranged
6565
//! .as_collection(|k,v| (k.clone(), v.clone()))
@@ -127,21 +127,19 @@ use super::TraceAgent;
127127
/// This method is only implemented for totally ordered times, as we do not yet
128128
/// understand what a "sequence" of upserts would mean for partially ordered
129129
/// timestamps.
130-
pub fn arrange_from_upsert<G, K, V, Bu, Tr>(
131-
stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
130+
pub fn arrange_from_upsert<G, Bu, Tr>(
131+
stream: &Stream<G, (Tr::KeyOwn, Option<Tr::ValOwn>, G::Timestamp)>,
132132
name: &str,
133133
) -> Arranged<G, TraceAgent<Tr>>
134134
where
135135
G: Scope<Timestamp=Tr::Time>,
136136
Tr: for<'a> Trace<
137-
KeyOwn = K,
138-
ValOwn = V,
137+
KeyOwn: ExchangeData+Hashable+std::hash::Hash,
138+
ValOwn: ExchangeData,
139139
Time: TotalOrder+ExchangeData,
140140
Diff=isize,
141141
>+'static,
142-
K: ExchangeData+Hashable+std::hash::Hash,
143-
V: ExchangeData,
144-
Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
142+
Bu: Builder<Time=G::Timestamp, Input = Vec<((Tr::KeyOwn, Tr::ValOwn), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
145143
{
146144
let mut reader: Option<TraceAgent<Tr>> = None;
147145

@@ -150,7 +148,7 @@ where
150148

151149
let reader = &mut reader;
152150

153-
let exchange = Exchange::new(move |update: &(K,Option<V>,G::Timestamp)| (update.0).hashed().into());
151+
let exchange = Exchange::new(move |update: &(Tr::KeyOwn,Option<Tr::ValOwn>,G::Timestamp)| (update.0).hashed().into());
154152

155153
stream.unary_frontier(exchange, name, move |_capability, info| {
156154

@@ -175,7 +173,7 @@ where
175173
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
176174

177175
// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
178-
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, K, Option<V>)>>::new();
176+
let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::KeyOwn, Option<Tr::ValOwn>)>>::new();
179177
let mut updates = Vec::new();
180178

181179
move |input, output| {
@@ -238,7 +236,7 @@ where
238236
for (key, mut list) in to_process {
239237

240238
// The prior value associated with the key.
241-
let mut prev_value: Option<V> = None;
239+
let mut prev_value: Option<Tr::ValOwn> = None;
242240

243241
// Attempt to find the key in the trace.
244242
trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));

0 commit comments

Comments
 (0)