
Commit 942e5a3

Clean up token use in dataflow (MaterializeInc#10521)
* Clean up token use in dataflow
* update comments
1 parent 087dba3 commit 942e5a3

7 files changed, +56 -111 lines changed

src/dataflow/src/render/mod.rs

Lines changed: 26 additions & 60 deletions
@@ -197,21 +197,6 @@ pub struct StorageState {
     pub persist: Option<RuntimeClient>,
 }
 
-/// A container for "tokens" that are relevant to an in-construction dataflow.
-///
-/// Tokens are used by consumers of data to keep their sources of data running.
-/// Once all tokens referencing a source are dropped, the source can shut down,
-/// which will wind down (eventually) the dataflow containing it.
-#[derive(Default)]
-pub struct RelevantTokens {
-    /// The source tokens for all sources that have been built in this context.
-    pub source_tokens: HashMap<GlobalId, Rc<Option<SourceToken>>>,
-    /// Any other tokens that need to be dropped when an object is dropped.
-    pub additional_tokens: HashMap<GlobalId, Vec<Rc<dyn Any>>>,
-    /// Tokens for CDCv2 capture sources that have been built in this context.
-    pub cdc_tokens: HashMap<GlobalId, Rc<dyn Any>>,
-}
-
 /// Information about each source that must be communicated between storage and compute layers.
 pub struct SourceBoundary {
     /// Captured `row` updates representing a differential collection.
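
The removed doc comment describes the drop-driven keep-alive idiom that the rest of the commit consolidates: holders of a token keep a source alive, and the source may shut down once every clone is dropped. A minimal sketch of that idiom, not Materialize code (this `SourceToken` and its field are illustrative stand-ins):

use std::any::Any;
use std::rc::{Rc, Weak};

struct SourceToken {
    name: String, // hypothetical field, for illustration only
}

impl Drop for SourceToken {
    fn drop(&mut self) {
        // In the real dataflow this would release the source's capability.
        println!("source {} may now shut down", self.name);
    }
}

fn main() {
    let token: Rc<dyn Any> = Rc::new(SourceToken { name: "my-source".into() });
    let liveness: Weak<dyn Any> = Rc::downgrade(&token);

    let held_by_export = Rc::clone(&token); // an export keeps the source alive
    drop(token);
    assert!(liveness.upgrade().is_some()); // still alive: a clone remains

    drop(held_by_export); // last clone gone: Drop runs, the source winds down
    assert!(liveness.upgrade().is_none());
}
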
@@ -222,9 +207,7 @@ pub struct SourceBoundary {
         Rc<EventLink<repr::Timestamp, (DataflowError, repr::Timestamp, repr::Diff)>>,
     >,
     /// A token that should be dropped to terminate the source.
-    pub token: Rc<Option<crate::source::SourceToken>>,
-    /// Additional tokens that should be dropped to terminate the source.
-    pub additional_tokens: Vec<Rc<dyn std::any::Any>>,
+    pub token: Rc<dyn std::any::Any>,
 }
 
 /// Assemble the "storage" side of a dataflow, i.e. the sources.
@@ -267,19 +250,18 @@ pub fn build_storage_dataflow<A: Allocate>(
 
         // Import declared sources into the rendering context.
         for (src_id, source) in &dataflow.source_imports {
-            let ((ok, err), (source_token, additional_tokens)) =
-                crate::render::sources::import_source(
-                    &debug_name,
-                    dataflow_id,
-                    &as_of,
-                    source.clone(),
-                    storage_state,
-                    region,
-                    materialized_logging.clone(),
-                    src_id.clone(),
-                    now.clone(),
-                    source_metrics,
-                );
+            let ((ok, err), token) = crate::render::sources::import_source(
+                &debug_name,
+                dataflow_id,
+                &as_of,
+                source.clone(),
+                storage_state,
+                region,
+                materialized_logging.clone(),
+                src_id.clone(),
+                now.clone(),
+                source_metrics,
+            );
 
             let ok_activator = RcActivator::new(format!("{debug_name}-ok"), 1);
             let err_activator = RcActivator::new(format!("{debug_name}-err"), 1);
@@ -294,8 +276,7 @@ pub fn build_storage_dataflow<A: Allocate>(
                 SourceBoundary {
                     ok: ActivatedEventPusher::<_>::clone(&ok_handle),
                     err: ActivatedEventPusher::<_>::clone(&err_handle),
-                    token: source_token,
-                    additional_tokens,
+                    token,
                 },
             );
 
@@ -330,7 +311,7 @@ pub fn build_compute_dataflow<A: Allocate>(
     // alternate type signatures.
     scope.clone().region_named(&name, |region| {
         let mut context = Context::for_dataflow(&dataflow, scope.addr().into_element());
-        let mut tokens = RelevantTokens::default();
+        let mut tokens = BTreeMap::new();
 
         // Import declared sources into the rendering context.
         for (source_id, source) in sources.into_iter() {
@@ -357,13 +338,8 @@ pub fn build_compute_dataflow<A: Allocate>(
             );
 
             // Associate returned tokens with the source identifier.
-            let prior = tokens.source_tokens.insert(source_id, source.token);
+            let prior = tokens.insert(source_id, source.token);
             assert!(prior.is_none());
-            tokens
-                .additional_tokens
-                .entry(source_id)
-                .or_insert_with(Vec::new)
-                .extend(source.additional_tokens);
         }
 
         // Import declared indexes into the rendering context.
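
This is the heart of the cleanup: `RelevantTokens` held three per-purpose collections, but the only things ever done with a token are cloning it and dropping it, so a single map of type-erased tokens suffices. A sketch of the consolidated shape, under assumed stand-in types (this `GlobalId` is just a `u64`, not repr's type):

use std::any::Any;
use std::collections::BTreeMap;
use std::rc::Rc;

type GlobalId = u64; // stand-in for the repr::GlobalId used by the real code

fn register(tokens: &mut BTreeMap<GlobalId, Rc<dyn Any>>, id: GlobalId, token: Rc<dyn Any>) {
    // One entry per object; a duplicate registration would indicate a bug.
    let prior = tokens.insert(id, token);
    assert!(prior.is_none());
}

fn main() {
    let mut tokens: BTreeMap<GlobalId, Rc<dyn Any>> = BTreeMap::new();
    // Any 'static value erases to `dyn Any`, so source tokens, button tokens,
    // and tuples of tokens can all share the one map.
    register(&mut tokens, 1, Rc::new("source token"));
    register(&mut tokens, 2, Rc::new((3u8, "button token")));
    assert_eq!(tokens.len(), 2);
}
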
@@ -423,7 +399,7 @@ where
     fn import_index(
         &mut self,
         compute_state: &mut ComputeState,
-        tokens: &mut RelevantTokens,
+        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
         scope: &mut G,
         region: &mut Child<'g, G, G::Timestamp>,
         idx_id: GlobalId,
@@ -450,15 +426,10 @@ where
                     ArrangementFlavor::Trace(idx_id, ok_arranged, err_arranged),
                 ),
             );
-            tokens
-                .additional_tokens
-                .entry(idx_id)
-                .or_insert_with(Vec::new)
-                .push(Rc::new((
-                    ok_button.press_on_drop(),
-                    err_button.press_on_drop(),
-                    token,
-                )));
+            tokens.insert(
+                idx_id,
+                Rc::new((ok_button.press_on_drop(), err_button.press_on_drop(), token)),
+            );
         } else {
             panic!(
                 "import of index {} failed while building dataflow {}",
@@ -480,23 +451,18 @@ where
     fn export_index(
         &mut self,
         compute_state: &mut ComputeState,
-        tokens: &mut RelevantTokens,
+        tokens: &mut BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
         import_ids: HashSet<GlobalId>,
         idx_id: GlobalId,
         idx: &IndexDesc,
     ) {
         // put together tokens that belong to the export
-        let mut needed_source_tokens = Vec::new();
-        let mut needed_additional_tokens = Vec::new();
+        let mut needed_tokens = Vec::new();
         for import_id in import_ids {
-            if let Some(addls) = tokens.additional_tokens.get(&import_id) {
-                needed_additional_tokens.extend_from_slice(addls);
-            }
-            if let Some(source_token) = tokens.source_tokens.get(&import_id) {
-                needed_source_tokens.push(Rc::clone(&source_token));
+            if let Some(token) = tokens.get(&import_id) {
+                needed_tokens.push(Rc::clone(&token));
             }
         }
-        let tokens = Rc::new((needed_source_tokens, needed_additional_tokens));
         let bundle = self.lookup_id(Id::Global(idx_id)).unwrap_or_else(|| {
             panic!(
                 "Arrangement alarmingly absent! id: {:?}",
@@ -507,7 +473,7 @@ where
             Some(ArrangementFlavor::Local(oks, errs)) => {
                 compute_state.traces.set(
                     idx_id,
-                    TraceBundle::new(oks.trace, errs.trace).with_drop(tokens),
+                    TraceBundle::new(oks.trace, errs.trace).with_drop(needed_tokens),
                 );
             }
             Some(ArrangementFlavor::Trace(gid, _, _)) => {
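
`export_index` now gathers one token per imported id and hands the whole `Vec` to `with_drop`, so dropping the exported trace releases every upstream keep-alive at once; `export_sink` in sinks.rs below follows the same pattern. A sketch of the gather step, under assumed stand-in types:

use std::any::Any;
use std::collections::BTreeMap;
use std::rc::Rc;

// Collect the tokens of every imported id into one droppable bundle.
fn gather(tokens: &BTreeMap<u64, Rc<dyn Any>>, import_ids: &[u64]) -> Vec<Rc<dyn Any>> {
    let mut needed = Vec::new();
    for id in import_ids {
        if let Some(token) = tokens.get(id) {
            needed.push(Rc::clone(token)); // bump the refcount; the source stays alive
        }
    }
    needed
}

fn main() {
    let mut tokens: BTreeMap<u64, Rc<dyn Any>> = BTreeMap::new();
    tokens.insert(7, Rc::new(()));
    let needed = gather(&tokens, &[7, 8]); // id 8 has no token and is skipped
    assert_eq!(needed.len(), 1);
    // `needed` plays the role of the `with_drop` payload: dropping it drops the
    // last clones and lets the corresponding sources shut down.
    drop(needed);
}
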

src/dataflow/src/render/sinks.rs

Lines changed: 7 additions & 18 deletions
@@ -24,7 +24,6 @@ use interchange::envelopes::{combine_at_timestamp, dbz_format, upsert_format};
 use repr::{Datum, Diff, Row, Timestamp};
 
 use crate::render::context::Context;
-use crate::render::RelevantTokens;
 use crate::sink::SinkBaseMetrics;
 
 impl<G> Context<G, Row, Timestamp>
@@ -35,7 +34,7 @@ where
     pub(crate) fn export_sink(
         &mut self,
         compute_state: &mut crate::render::ComputeState,
-        tokens: &mut RelevantTokens,
+        tokens: &mut std::collections::BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
         import_ids: HashSet<GlobalId>,
         sink_id: GlobalId,
         sink: &SinkDesc,
@@ -44,15 +43,10 @@ where
         let sink_render = get_sink_render_for(&sink.connector);
 
         // put together tokens that belong to the export
-        let mut needed_source_tokens = Vec::new();
-        let mut needed_additional_tokens = Vec::new();
-        let mut needed_sink_tokens = Vec::new();
+        let mut needed_tokens = Vec::new();
         for import_id in import_ids {
-            if let Some(addls) = tokens.additional_tokens.get(&import_id) {
-                needed_additional_tokens.extend_from_slice(addls);
-            }
-            if let Some(source_token) = tokens.source_tokens.get(&import_id) {
-                needed_source_tokens.push(Rc::clone(&source_token));
+            if let Some(token) = tokens.get(&import_id) {
+                needed_tokens.push(Rc::clone(&token))
             }
         }
 
@@ -89,17 +83,12 @@ where
             sink_render.render_continuous_sink(compute_state, sink, sink_id, collection, metrics);
 
         if let Some(sink_token) = sink_token {
-            needed_sink_tokens.push(sink_token);
+            needed_tokens.push(sink_token);
         }
 
-        let tokens = Rc::new((
-            needed_sink_tokens,
-            needed_source_tokens,
-            needed_additional_tokens,
-        ));
         compute_state
             .dataflow_tokens
-            .insert(sink_id, Box::new(tokens));
+            .insert(sink_id, Box::new(needed_tokens));
     }
 }
 
@@ -231,7 +220,7 @@ where
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
         metrics: &SinkBaseMetrics,
-    ) -> Option<Box<dyn Any>>
+    ) -> Option<Rc<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>;
 }

src/dataflow/src/render/sources.rs

Lines changed: 15 additions & 19 deletions
@@ -129,8 +129,8 @@ where
 
 /// Constructs a `CollectionBundle` and tokens from source arguments.
 ///
-/// The first return value is the collection bundle, and the second a pair of source and additional
-/// tokens, that are used to control the demolition of the source.
+/// The first returned pair are the row and error collections, and the
+/// second is a token that will keep the source alive as long as it is held.
 pub(crate) fn import_source<G>(
     dataflow_debug_name: &String,
     dataflow_id: usize,
@@ -148,10 +148,7 @@ pub(crate) fn import_source<G>(
     base_metrics: &SourceBaseMetrics,
 ) -> (
     (Collection<G, Row>, Collection<G, DataflowError>),
-    (
-        Rc<Option<crate::source::SourceToken>>,
-        Vec<Rc<dyn std::any::Any>>,
-    ),
+    Rc<dyn std::any::Any>,
 )
 where
     G: Scope<Timestamp = Timestamp>,
@@ -164,7 +161,7 @@ where
     }
 
     // Tokens that we should return from the method.
-    let mut additional_tokens: Vec<Rc<dyn std::any::Any>> = Vec::new();
+    let mut needed_tokens: Vec<Rc<dyn std::any::Any>> = Vec::new();
 
     // Before proceeding, we may need to remediate sources with non-trivial relational
     // expressions that post-process the bare source. If the expression is trivial, a
@@ -182,7 +179,7 @@ where
             storage_state.local_inputs.insert(src_id, local_input);
 
             // TODO(mcsherry): Local tables are a special non-source we should relocate.
-            ((ok, err), (Rc::new(None), Vec::new()))
+            ((ok, err), Rc::new(()))
         }
 
         SourceConnector::External {
@@ -377,7 +374,7 @@ where
                     schema_registry_config,
                     confluent_wire_format,
                 );
-                additional_tokens.push(Rc::new(token));
+                needed_tokens.push(Rc::new(token));
                 (oks, None)
             } else {
                 let (results, extra_token) = match ok_source {
@@ -401,7 +398,7 @@ where
                     ),
                 };
                 if let Some(tok) = extra_token {
-                    additional_tokens.push(Rc::new(tok));
+                    needed_tokens.push(Rc::new(tok));
                 }
 
                 // render envelopes
@@ -467,7 +464,7 @@ where
                             &src_id,
                             &source_name,
                             storage_state,
-                            &mut additional_tokens,
+                            &mut needed_tokens,
                         );
 
                         (sealed_upsert, upsert_err)
@@ -518,7 +515,7 @@ where
                         &src_id,
                         &source_name,
                         storage_state,
-                        &mut additional_tokens,
+                        &mut needed_tokens,
                     );
 
                     // NOTE: Persistence errors don't go through the same
@@ -639,11 +636,10 @@ where
                 .or_insert_with(Vec::new)
                 .push(Rc::downgrade(&source_token));
 
-            // Return the source token for capability manipulation, and any additional tokens.
-            (
-                (collection, err_collection),
-                (source_token, additional_tokens),
-            )
+            needed_tokens.push(source_token);
+
+            // Return the collections and any needed tokens.
+            ((collection, err_collection), Rc::new(needed_tokens))
         }
     }
 }
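
`Rc::new(needed_tokens)` satisfies the new `Rc<dyn std::any::Any>` return type by unsized coercion: a `Vec<Rc<dyn Any>>` is itself a `'static` value, so the whole bundle erases to a single token whose drop recursively drops everything it holds (the local-table arm's `Rc::new(())` is the degenerate empty token). A standalone sketch of the coercion, with illustrative values:

use std::any::Any;
use std::rc::Rc;

fn bundle(needed_tokens: Vec<Rc<dyn Any>>) -> Rc<dyn Any> {
    // Unsized coercion: Rc<Vec<Rc<dyn Any>>> -> Rc<dyn Any>.
    Rc::new(needed_tokens)
}

fn main() {
    let tokens: Vec<Rc<dyn Any>> = vec![Rc::new(1u32), Rc::new("extra token")];
    let one_token = bundle(tokens);
    // Holding `one_token` keeps every inner token alive; dropping it releases all.
    drop(one_token);
}
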
@@ -799,7 +795,7 @@ fn seal_and_await<G, D1, D2, K1, V1, K2, V2>(
     source_id: &GlobalId,
     source_name: &str,
     storage_state: &mut StorageState,
-    additional_tokens: &mut Vec<Rc<dyn std::any::Any>>,
+    needed_tokens: &mut Vec<Rc<dyn std::any::Any>>,
 ) -> Stream<G, (D1, Timestamp, Diff)>
 where
     G: Scope<Timestamp = Timestamp>,
@@ -844,7 +840,7 @@ where
         compaction_handle,
     );
 
-    additional_tokens.push(source_token);
+    needed_tokens.push(source_token);
 
     // Don't send data forward to "dataflow" until the frontier
     // tells us that we both persisted and sealed it.

src/dataflow/src/sink/avro_ocf.rs

Lines changed: 2 additions & 1 deletion
@@ -9,6 +9,7 @@
 
 use std::any::Any;
 use std::fs::OpenOptions;
+use std::rc::Rc;
 
 use differential_dataflow::{Collection, Hashable};
 
@@ -50,7 +51,7 @@ where
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
         _metrics: &SinkBaseMetrics,
-    ) -> Option<Box<dyn Any>>
+    ) -> Option<Rc<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>,
     {

src/dataflow/src/sink/kafka.rs

Lines changed: 4 additions & 4 deletions
@@ -87,7 +87,7 @@ where
         sink_id: GlobalId,
         sinked_collection: Collection<G, (Option<Row>, Option<Row>), Diff>,
         metrics: &SinkBaseMetrics,
-    ) -> Option<Box<dyn Any>>
+    ) -> Option<Rc<dyn Any>>
     where
         G: Scope<Timestamp = Timestamp>,
     {
@@ -950,7 +950,7 @@ fn kafka<G>(
     as_of: SinkAsOf,
     write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
     metrics: &KafkaBaseMetrics,
-) -> Box<dyn Any>
+) -> Rc<dyn Any>
 where
     G: Scope<Timestamp = Timestamp>,
 {
@@ -1027,7 +1027,7 @@ pub fn produce_to_kafka<G>(
     shared_gate_ts: Rc<Cell<Option<Timestamp>>>,
     write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
     metrics: &KafkaBaseMetrics,
-) -> Box<dyn Any>
+) -> Rc<dyn Any>
 where
     G: Scope<Timestamp = Timestamp>,
 {
@@ -1287,7 +1287,7 @@ where
         }),
     );
 
-    Box::new(KafkaSinkToken { shutdown_flag })
+    Rc::new(KafkaSinkToken { shutdown_flag })
 }
 
 /// Encodes a stream of `(Option<Row>, Option<Row>)` updates using the specified encoder.
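
The sink tokens switch from `Box<dyn Any>` to `Rc<dyn Any>` for an ownership reason: a `Box` has exactly one owner, while the consolidated scheme pushes sink tokens into the same `needed_tokens: Vec<Rc<dyn Any>>` that holds cloned source tokens. A sketch of the distinction (this `KafkaSinkToken` body is a simplified assumption, not the real struct):

use std::any::Any;
use std::rc::Rc;

struct KafkaSinkToken {
    shutdown_flag: bool, // simplified stand-in for the real shutdown signal
}

fn make_token() -> Rc<dyn Any> {
    Rc::new(KafkaSinkToken { shutdown_flag: false })
}

fn main() {
    let token = make_token();
    let mut needed_tokens: Vec<Rc<dyn Any>> = Vec::new();
    needed_tokens.push(Rc::clone(&token)); // an Rc can be shared with other holders...
    needed_tokens.push(token); // ...and moved in; a Box<dyn Any> could only be moved once
    assert_eq!(needed_tokens.len(), 2);
}
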
