@@ -127,21 +127,19 @@ use super::TraceAgent;
127
127
/// This method is only implemented for totally ordered times, as we do not yet
128
128
/// understand what a "sequence" of upserts would mean for partially ordered
129
129
/// timestamps.
130
- pub fn arrange_from_upsert < G , K , V , Bu , Tr > (
131
- stream : & Stream < G , ( K , Option < V > , G :: Timestamp ) > ,
130
+ pub fn arrange_from_upsert < G , Bu , Tr > (
131
+ stream : & Stream < G , ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) > ,
132
132
name : & str ,
133
133
) -> Arranged < G , TraceAgent < Tr > >
134
134
where
135
135
G : Scope < Timestamp =Tr :: Time > ,
136
136
Tr : for < ' a > Trace <
137
- KeyOwn = K ,
138
- ValOwn = V ,
137
+ KeyOwn : ExchangeData + Hashable +std :: hash :: Hash ,
138
+ ValOwn : ExchangeData ,
139
139
Time : TotalOrder +ExchangeData ,
140
140
Diff =isize ,
141
141
> +' static ,
142
- K : ExchangeData +Hashable +std:: hash:: Hash ,
143
- V : ExchangeData ,
144
- Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( K , V ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
142
+ Bu : Builder < Time =G :: Timestamp , Input = Vec < ( ( Tr :: KeyOwn , Tr :: ValOwn ) , Tr :: Time , Tr :: Diff ) > , Output = Tr :: Batch > ,
145
143
{
146
144
let mut reader: Option < TraceAgent < Tr > > = None ;
147
145
@@ -150,7 +148,7 @@ where
150
148
151
149
let reader = & mut reader;
152
150
153
- let exchange = Exchange :: new ( move |update : & ( K , Option < V > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
151
+ let exchange = Exchange :: new ( move |update : & ( Tr :: KeyOwn , Option < Tr :: ValOwn > , G :: Timestamp ) | ( update. 0 ) . hashed ( ) . into ( ) ) ;
154
152
155
153
stream. unary_frontier ( exchange, name, move |_capability, info| {
156
154
@@ -175,7 +173,7 @@ where
175
173
let mut prev_frontier = Antichain :: from_elem ( <G :: Timestamp as Timestamp >:: minimum ( ) ) ;
176
174
177
175
// For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
178
- let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , K , Option < V > ) > > :: new ( ) ;
176
+ let mut priority_queue = BinaryHeap :: < std:: cmp:: Reverse < ( G :: Timestamp , Tr :: KeyOwn , Option < Tr :: ValOwn > ) > > :: new ( ) ;
179
177
let mut updates = Vec :: new ( ) ;
180
178
181
179
move |input, output| {
@@ -238,7 +236,7 @@ where
238
236
for ( key, mut list) in to_process {
239
237
240
238
// The prior value associated with the key.
241
- let mut prev_value: Option < V > = None ;
239
+ let mut prev_value: Option < Tr :: ValOwn > = None ;
242
240
243
241
// Attempt to find the key in the trace.
244
242
trace_cursor. seek_key ( & trace_storage, Tr :: KeyContainer :: borrow_as ( & key) ) ;
0 commit comments