Skip to content

Commit 883ab59

Browse files
committed
compute: tokenize linear join closure
This commit adds a shutdown token check to the closure we pass to the differential join operator. When the dataflow is shutting down, this makes the join closure drain all input data, rather than processing it. As a result, differential join operators shut down faster an emit less data, which in turn speeds up shutdown of downstream operators. Unfortunately, the new shutdown logic interferes with the fueling of the differential join operator. Fuel is consumed based on the number of updates emitted. When the token is dropped, the join closure stops producing updates, which means the operator stops consuming fuel, so it does not yield anymore until it has drained all its inputs. If there are many inputs left, the replica may not accept commands for potentially quite a long time.
1 parent c3caa80 commit 883ab59

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

src/compute/src/render/join/linear_join.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use mz_repr::{DatumVec, Diff, Row, RowArena};
2626
use mz_storage_client::types::errors::DataflowError;
2727
use mz_timely_util::operator::CollectionExt;
2828

29+
use crate::render::context::ShutdownToken;
2930
use crate::render::context::{
3031
Arrangement, ArrangementFlavor, ArrangementImport, CollectionBundle, Context,
3132
};
@@ -123,6 +124,7 @@ where
123124
inputs[stage_plan.lookup_relation].enter_region(inner),
124125
stage_plan,
125126
&mut errors,
127+
self.shutdown_token.clone(),
126128
);
127129
// Update joined results and capture any errors.
128130
joined = JoinedFlavor::Collection(stream);
@@ -179,6 +181,7 @@ fn differential_join<G, T>(
179181
lookup_relation: _,
180182
}: LinearStagePlan,
181183
errors: &mut Vec<Collection<G, DataflowError, Diff>>,
184+
shutdown_token: ShutdownToken,
182185
) -> Collection<G, Row, Diff>
183186
where
184187
G: Scope,
@@ -223,27 +226,27 @@ where
223226
}
224227
JoinedFlavor::Local(local) => match arrangement {
225228
ArrangementFlavor::Local(oks, errs1) => {
226-
let (oks, errs2) = differential_join_inner(local, oks, closure);
229+
let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token);
227230
errors.push(errs1.as_collection(|k, _v| k.clone()));
228231
errors.extend(errs2);
229232
oks
230233
}
231234
ArrangementFlavor::Trace(_gid, oks, errs1) => {
232-
let (oks, errs2) = differential_join_inner(local, oks, closure);
235+
let (oks, errs2) = differential_join_inner(local, oks, closure, shutdown_token);
233236
errors.push(errs1.as_collection(|k, _v| k.clone()));
234237
errors.extend(errs2);
235238
oks
236239
}
237240
},
238241
JoinedFlavor::Trace(trace) => match arrangement {
239242
ArrangementFlavor::Local(oks, errs1) => {
240-
let (oks, errs2) = differential_join_inner(trace, oks, closure);
243+
let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token);
241244
errors.push(errs1.as_collection(|k, _v| k.clone()));
242245
errors.extend(errs2);
243246
oks
244247
}
245248
ArrangementFlavor::Trace(_gid, oks, errs1) => {
246-
let (oks, errs2) = differential_join_inner(trace, oks, closure);
249+
let (oks, errs2) = differential_join_inner(trace, oks, closure, shutdown_token);
247250
errors.push(errs1.as_collection(|k, _v| k.clone()));
248251
errors.extend(errs2);
249252
oks
@@ -262,6 +265,7 @@ fn differential_join_inner<G, T, J, Tr2>(
262265
prev_keyed: J,
263266
next_input: Arranged<G, Tr2>,
264267
closure: JoinClosure,
268+
shutdown_token: ShutdownToken,
265269
) -> (
266270
Collection<G, Row, Diff>,
267271
Option<Collection<G, DataflowError, Diff>>,
@@ -284,6 +288,10 @@ where
284288
if closure.could_error() {
285289
let (oks, err) = prev_keyed
286290
.join_core(&next_input, move |key, old, new| {
291+
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
292+
// shutting down.
293+
shutdown_token.probe()?;
294+
287295
let temp_storage = RowArena::new();
288296
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
289297
closure
@@ -303,6 +311,10 @@ where
303311
(oks.as_collection(), Some(err.as_collection()))
304312
} else {
305313
let oks = prev_keyed.join_core(&next_input, move |key, old, new| {
314+
// Check the shutdown token to avoid doing unnecessary work when the dataflow is
315+
// shutting down.
316+
shutdown_token.probe()?;
317+
306318
let temp_storage = RowArena::new();
307319
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
308320
closure

0 commit comments

Comments
 (0)