102
102
103
103
use std:: any:: Any ;
104
104
use std:: cell:: RefCell ;
105
- use std:: collections:: { HashMap , HashSet } ;
105
+ use std:: collections:: { BTreeMap , HashMap , HashSet } ;
106
106
use std:: rc:: Rc ;
107
107
use std:: rc:: Weak ;
108
+ use std:: time:: Duration ;
108
109
109
110
use differential_dataflow:: AsCollection ;
110
111
use timely:: communication:: Allocate ;
112
+ use timely:: dataflow:: operators:: capture:: EventLink ;
111
113
use timely:: dataflow:: operators:: to_stream:: ToStream ;
114
+ use timely:: dataflow:: operators:: Capture ;
112
115
use timely:: dataflow:: scopes:: Child ;
113
116
use timely:: dataflow:: Scope ;
117
+
114
118
use timely:: progress:: Antichain ;
115
119
use timely:: worker:: Worker as TimelyWorker ;
116
120
121
+ use crate :: activator:: RcActivator ;
117
122
use dataflow_types:: * ;
118
123
use expr:: { GlobalId , Id } ;
119
124
use itertools:: Itertools ;
@@ -123,10 +128,12 @@ use persist::client::RuntimeClient;
123
128
use repr:: { Row , Timestamp } ;
124
129
125
130
use crate :: arrangement:: manager:: { TraceBundle , TraceManager } ;
131
+ use crate :: event:: ActivatedEventPusher ;
126
132
use crate :: metrics:: Metrics ;
127
133
use crate :: render:: context:: CollectionBundle ;
128
134
use crate :: render:: context:: { ArrangementFlavor , Context } ;
129
135
use crate :: render:: sources:: PersistedSourceManager ;
136
+ use crate :: replay:: MzReplay ;
130
137
use crate :: server:: LocalInput ;
131
138
use crate :: sink:: SinkBaseMetrics ;
132
139
use crate :: source:: metrics:: SourceBaseMetrics ;
@@ -205,28 +212,47 @@ pub struct RelevantTokens {
205
212
pub cdc_tokens : HashMap < GlobalId , Rc < dyn Any > > ,
206
213
}
207
214
208
- /// Build a dataflow from a description.
209
- pub fn build_dataflow < A : Allocate > (
215
+ /// Information about each source that must be communicated between storage and compute layers.
216
+ pub struct SourceBoundary {
217
+ /// Captured `row` updates representing a differential collection.
218
+ pub ok :
219
+ ActivatedEventPusher < Rc < EventLink < repr:: Timestamp , ( Row , repr:: Timestamp , repr:: Diff ) > > > ,
220
+ /// Captured error updates representing a differential collection.
221
+ pub err : ActivatedEventPusher <
222
+ Rc < EventLink < repr:: Timestamp , ( DataflowError , repr:: Timestamp , repr:: Diff ) > > ,
223
+ > ,
224
+ /// A token that should be dropped to terminate the source.
225
+ pub token : Rc < Option < crate :: source:: SourceToken > > ,
226
+ /// Additional tokens that should be dropped to terminate the source.
227
+ pub additional_tokens : Vec < Rc < dyn std:: any:: Any > > ,
228
+ }
229
+
230
+ /// Assemble the "storage" side of a dataflow, i.e. the sources.
231
+ ///
232
+ /// This method creates a new dataflow to host the implementations of sources for the `dataflow`
233
+ /// argument, and returns assets for each source that can import the results into a new dataflow.
234
+ pub fn build_storage_dataflow < A : Allocate > (
210
235
timely_worker : & mut TimelyWorker < A > ,
211
- compute_state : & mut ComputeState ,
212
236
storage_state : & mut StorageState ,
213
- dataflow : DataflowDescription < plan:: Plan > ,
237
+ dataflow : & DataflowDescription < plan:: Plan > ,
214
238
now : NowFn ,
215
239
source_metrics : & SourceBaseMetrics ,
216
- sink_metrics : & SinkBaseMetrics ,
217
- ) {
240
+ ) -> BTreeMap < GlobalId , SourceBoundary > {
218
241
let worker_logging = timely_worker. log_register ( ) . get ( "timely" ) ;
219
242
let name = format ! ( "Dataflow: {}" , & dataflow. debug_name) ;
220
243
let materialized_logging = timely_worker. log_register ( ) . get ( "materialized" ) ;
221
244
245
+ let mut results = BTreeMap :: new ( ) ;
246
+
222
247
timely_worker. dataflow_core ( & name, worker_logging, Box :: new ( ( ) ) , |_, scope| {
223
248
// The scope.clone() occurs to allow import in the region.
224
249
// We build a region here to establish a pattern of a scope inside the dataflow,
225
250
// so that other similar uses (e.g. with iterative scopes) do not require weird
226
251
// alternate type signatures.
227
252
scope. clone ( ) . region_named ( & name, |region| {
228
- let mut context = Context :: for_dataflow ( & dataflow, scope. addr ( ) . into_element ( ) ) ;
229
- let mut tokens = RelevantTokens :: default ( ) ;
253
+ let as_of = dataflow. as_of . clone ( ) . unwrap ( ) ;
254
+ let dataflow_id = scope. addr ( ) . into_element ( ) ;
255
+ let debug_name = format ! ( "{}-sources" , dataflow. debug_name) ;
230
256
231
257
assert ! (
232
258
!dataflow
@@ -241,11 +267,11 @@ pub fn build_dataflow<A: Allocate>(
241
267
242
268
// Import declared sources into the rendering context.
243
269
for ( src_id, source) in & dataflow. source_imports {
244
- let ( collection_bundle , ( source_token, additional_tokens) ) =
270
+ let ( ( ok , err ) , ( source_token, additional_tokens) ) =
245
271
crate :: render:: sources:: import_source (
246
- & context . debug_name ,
247
- context . dataflow_id ,
248
- & context . as_of_frontier ,
272
+ & debug_name,
273
+ dataflow_id,
274
+ & as_of ,
249
275
source. clone ( ) ,
250
276
storage_state,
251
277
region,
@@ -255,17 +281,89 @@ pub fn build_dataflow<A: Allocate>(
255
281
source_metrics,
256
282
) ;
257
283
284
+ let ok_activator = RcActivator :: new ( format ! ( "{debug_name}-ok" ) , 1 ) ;
285
+ let err_activator = RcActivator :: new ( format ! ( "{debug_name}-err" ) , 1 ) ;
286
+
287
+ let ok_handle =
288
+ ActivatedEventPusher :: new ( Rc :: new ( EventLink :: new ( ) ) , ok_activator. clone ( ) ) ;
289
+ let err_handle =
290
+ ActivatedEventPusher :: new ( Rc :: new ( EventLink :: new ( ) ) , err_activator. clone ( ) ) ;
291
+
292
+ results. insert (
293
+ * src_id,
294
+ SourceBoundary {
295
+ ok : ActivatedEventPusher :: < _ > :: clone ( & ok_handle) ,
296
+ err : ActivatedEventPusher :: < _ > :: clone ( & err_handle) ,
297
+ token : source_token,
298
+ additional_tokens,
299
+ } ,
300
+ ) ;
301
+
302
+ ok. inner . capture_into ( ok_handle) ;
303
+ err. inner . capture_into ( err_handle) ;
304
+ }
305
+ } )
306
+ } ) ;
307
+
308
+ results
309
+ }
310
+
311
+ /// Assemble the "compute" side of a dataflow, i.e. all but the sources.
312
+ ///
313
+ /// This method imports sources from provided assets, and then builds the remaining
314
+ /// dataflow using "compute-local" assets like shared arrangements, and producing
315
+ /// both arrangements and sinks.
316
+ pub fn build_compute_dataflow < A : Allocate > (
317
+ timely_worker : & mut TimelyWorker < A > ,
318
+ compute_state : & mut ComputeState ,
319
+ sources : BTreeMap < GlobalId , SourceBoundary > ,
320
+ dataflow : DataflowDescription < plan:: Plan > ,
321
+ sink_metrics : & SinkBaseMetrics ,
322
+ ) {
323
+ let worker_logging = timely_worker. log_register ( ) . get ( "timely" ) ;
324
+ let name = format ! ( "Dataflow: {}" , & dataflow. debug_name) ;
325
+
326
+ timely_worker. dataflow_core ( & name, worker_logging, Box :: new ( ( ) ) , |_, scope| {
327
+ // The scope.clone() occurs to allow import in the region.
328
+ // We build a region here to establish a pattern of a scope inside the dataflow,
329
+ // so that other similar uses (e.g. with iterative scopes) do not require weird
330
+ // alternate type signatures.
331
+ scope. clone ( ) . region_named ( & name, |region| {
332
+ let mut context = Context :: for_dataflow ( & dataflow, scope. addr ( ) . into_element ( ) ) ;
333
+ let mut tokens = RelevantTokens :: default ( ) ;
334
+
335
+ // Import declared sources into the rendering context.
336
+ for ( source_id, source) in sources. into_iter ( ) {
258
337
// Associate collection bundle with the source identifier.
259
- context. insert_id ( Id :: Global ( * src_id) , collection_bundle) ;
338
+ let ok = Some ( source. ok . inner )
339
+ . mz_replay (
340
+ region,
341
+ & format ! ( "{name}-{source_id}" ) ,
342
+ Duration :: MAX ,
343
+ source. ok . activator ,
344
+ )
345
+ . as_collection ( ) ;
346
+ let err = Some ( source. err . inner )
347
+ . mz_replay (
348
+ region,
349
+ & format ! ( "{name}-{source_id}-err" ) ,
350
+ Duration :: MAX ,
351
+ source. err . activator ,
352
+ )
353
+ . as_collection ( ) ;
354
+ context. insert_id (
355
+ Id :: Global ( source_id) ,
356
+ CollectionBundle :: from_collections ( ok, err) ,
357
+ ) ;
260
358
261
359
// Associate returned tokens with the source identifier.
262
- let prior = tokens. source_tokens . insert ( * src_id , source_token ) ;
360
+ let prior = tokens. source_tokens . insert ( source_id , source . token ) ;
263
361
assert ! ( prior. is_none( ) ) ;
264
362
tokens
265
363
. additional_tokens
266
- . entry ( * src_id )
364
+ . entry ( source_id )
267
365
. or_insert_with ( Vec :: new)
268
- . extend ( additional_tokens) ;
366
+ . extend ( source . additional_tokens ) ;
269
367
}
270
368
271
369
// Import declared indexes into the rendering context.
0 commit comments