Skip to content

Commit 00f11ab

Browse files
committed
compute: tokenize delta join closure
This commit adds a shutdown token check to the closure we pass to the delta join operator. When the dataflow is shutting down, this makes the join closure drain all input data, rather than processing it. As a results, delta join operators shut down faster and emit less data, which in turn speeds up shutdown of downstream operators.
1 parent ff5f3a8 commit 00f11ab

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

src/compute/src/render/join/delta_join.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//! Consult [DeltaJoinPlan] documentation for details.
1313
1414
#![allow(clippy::op_ref)]
15+
1516
use std::collections::{BTreeMap, BTreeSet};
1617

1718
use timely::dataflow::Scope;
@@ -24,7 +25,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena};
2425
use mz_storage_client::types::errors::DataflowError;
2526
use mz_timely_util::operator::CollectionExt;
2627

27-
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
28+
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
2829

2930
impl<G> Context<G, Row>
3031
where
@@ -198,6 +199,7 @@ where
198199
stream_thinning,
199200
|t1, t2| t1.le(t2),
200201
closure,
202+
self.shutdown_token.clone(),
201203
)
202204
} else {
203205
build_halfjoin(
@@ -207,6 +209,7 @@ where
207209
stream_thinning,
208210
|t1, t2| t1.lt(t2),
209211
closure,
212+
self.shutdown_token.clone(),
210213
)
211214
}
212215
}
@@ -219,6 +222,7 @@ where
219222
stream_thinning,
220223
|t1, t2| t1.le(t2),
221224
closure,
225+
self.shutdown_token.clone(),
222226
)
223227
} else {
224228
build_halfjoin(
@@ -228,6 +232,7 @@ where
228232
stream_thinning,
229233
|t1, t2| t1.lt(t2),
230234
closure,
235+
self.shutdown_token.clone(),
231236
)
232237
}
233238
}
@@ -314,6 +319,7 @@ fn build_halfjoin<G, Tr, CF>(
314319
prev_thinning: Vec<usize>,
315320
comparison: CF,
316321
closure: JoinClosure,
322+
shutdown_token: ShutdownToken,
317323
) -> (
318324
Collection<G, (Row, G::Timestamp), Diff>,
319325
Collection<G, DataflowError, Diff>,
@@ -364,6 +370,10 @@ where
364370
|_timer, count| count > 1_000_000,
365371
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
366372
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
373+
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
374+
// shutting down.
375+
shutdown_token.probe()?;
376+
367377
let temp_storage = RowArena::new();
368378
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
369379
let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
@@ -396,6 +406,10 @@ where
396406
|_timer, count| count > 1_000_000,
397407
// TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
398408
move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
409+
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
410+
// shutting down.
411+
shutdown_token.probe()?;
412+
399413
let temp_storage = RowArena::new();
400414
let mut datums_local = datums.borrow_with_many(&[key, stream_row, lookup_row]);
401415
let row = closure

0 commit comments

Comments
 (0)