Skip to content

Commit 3b880aa

Browse files
committed
compute: allow specification of join yielding
This commit introduces the plumbing required to allow users of Compute to specify the yielding behavior of linear join operators via `UpdateConfiguration` commands.
1 parent 0749d13 commit 3b880aa

File tree

10 files changed

+115
-34
lines changed

10 files changed

+115
-34
lines changed

src/adapter/src/flags.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
2626
ComputeParameters {
2727
max_result_size: Some(config.max_result_size()),
2828
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
29+
linear_join_yielding: None,
2930
enable_mz_join_core: Some(config.enable_mz_join_core()),
3031
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
3132
persist: persist_config(config),

src/compute-client/src/protocol/command.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,5 @@ message ProtoComputeParameters {
6969
mz_tracing.params.ProtoTracingParameters tracing = 5;
7070
mz_service.params.ProtoGrpcClientParameters grpc_client = 6;
7171
optional bool enable_jemalloc_profiling = 7;
72+
mz_compute_types.dataflows.ProtoYieldSpec linear_join_yielding = 8;
7273
}

src/compute-client/src/protocol/command.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//! Compute protocol commands.
1111
1212
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
13-
use mz_compute_types::dataflows::DataflowDescription;
13+
use mz_compute_types::dataflows::{DataflowDescription, YieldSpec};
1414
use mz_expr::RowSetFinishing;
1515
use mz_ore::tracing::OpenTelemetryContext;
1616
use mz_persist_client::cfg::PersistParameters;
@@ -359,6 +359,8 @@ pub struct ComputeParameters {
359359
pub max_result_size: Option<u32>,
360360
/// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
361361
pub dataflow_max_inflight_bytes: Option<usize>,
362+
/// The yielding behavior with which linear joins should be rendered.
363+
pub linear_join_yielding: Option<YieldSpec>,
362364
/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
363365
pub enable_mz_join_core: Option<bool>,
364366
/// Whether to activate jemalloc heap profiling.
@@ -377,6 +379,7 @@ impl ComputeParameters {
377379
let ComputeParameters {
378380
max_result_size,
379381
dataflow_max_inflight_bytes,
382+
linear_join_yielding,
380383
enable_mz_join_core,
381384
enable_jemalloc_profiling,
382385
persist,
@@ -390,6 +393,9 @@ impl ComputeParameters {
390393
if dataflow_max_inflight_bytes.is_some() {
391394
self.dataflow_max_inflight_bytes = dataflow_max_inflight_bytes;
392395
}
396+
if linear_join_yielding.is_some() {
397+
self.linear_join_yielding = linear_join_yielding;
398+
}
393399
if enable_mz_join_core.is_some() {
394400
self.enable_mz_join_core = enable_mz_join_core;
395401
}
@@ -413,6 +419,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
413419
ProtoComputeParameters {
414420
max_result_size: self.max_result_size.into_proto(),
415421
dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
422+
linear_join_yielding: self.linear_join_yielding.into_proto(),
416423
enable_mz_join_core: self.enable_mz_join_core.into_proto(),
417424
enable_jemalloc_profiling: self.enable_jemalloc_profiling.into_proto(),
418425
persist: Some(self.persist.into_proto()),
@@ -425,6 +432,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
425432
Ok(Self {
426433
max_result_size: proto.max_result_size.into_rust()?,
427434
dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
435+
linear_join_yielding: proto.linear_join_yielding.into_rust()?,
428436
enable_mz_join_core: proto.enable_mz_join_core.into_rust()?,
429437
enable_jemalloc_profiling: proto.enable_jemalloc_profiling.into_rust()?,
430438
persist: proto

src/compute-types/src/dataflows.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import "compute-types/src/plan.proto";
1313
import "compute-types/src/sinks.proto";
1414
import "compute-types/src/sources.proto";
1515
import "expr/src/scalar.proto";
16+
import "proto/src/proto.proto";
1617
import "repr/src/antichain.proto";
1718
import "repr/src/global_id.proto";
1819
import "repr/src/relation_and_scalar.proto";
@@ -63,3 +64,10 @@ message ProtoBuildDesc {
6364
mz_repr.global_id.ProtoGlobalId id = 1;
6465
plan.ProtoPlan plan = 2;
6566
}
67+
68+
// Wire representation of a yield specification (`YieldSpec` on the Rust side).
//
// Decoding in `YieldSpec::from_proto` fails with a missing-field error when `kind` is unset,
// so senders are expected to populate exactly one of the variants below.
message ProtoYieldSpec {
69+
oneof kind {
70+
// Work-based limit; decoded into `YieldSpec::ByWork`.
uint64 by_work = 1;
71+
// Time-based limit; decoded into `YieldSpec::ByTime`.
mz_proto.ProtoDuration by_time = 2;
72+
}
73+
}

src/compute-types/src/dataflows.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//! Types for describing dataflows.
1111
1212
use std::collections::{BTreeMap, BTreeSet};
13+
use std::time::Duration;
1314

1415
use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
1516
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
@@ -640,6 +641,38 @@ impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
640641
}
641642
}
642643

644+
/// Specification of a dataflow operator's yielding behavior.
///
/// The linear join renderer turns the chosen variant into a predicate over `(start, work)`
/// that is handed to `mz_join_core`. The differential-dataflow join implementation is rendered
/// without consulting this specification.
645+
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
646+
pub enum YieldSpec {
647+
/// Yield once the amount of work reported to the predicate reaches the given limit
/// (`work >= limit`).
ByWork(usize),
648+
/// Yield once the time elapsed since the start instant reported to the predicate reaches
/// the given duration (`start.elapsed() >= limit`).
ByTime(Duration),
649+
}
650+
651+
impl RustType<ProtoYieldSpec> for YieldSpec {
652+
fn into_proto(&self) -> ProtoYieldSpec {
653+
use proto_yield_spec::Kind;
654+
655+
let kind = match *self {
656+
Self::ByWork(w) => Kind::ByWork(w.into_proto()),
657+
Self::ByTime(t) => Kind::ByTime(t.into_proto()),
658+
};
659+
ProtoYieldSpec { kind: Some(kind) }
660+
}
661+
662+
fn from_proto(proto: ProtoYieldSpec) -> Result<Self, TryFromProtoError> {
663+
use proto_yield_spec::Kind;
664+
665+
let Some(kind) = proto.kind else {
666+
return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind"));
667+
};
668+
let spec = match kind {
669+
Kind::ByWork(w) => Self::ByWork(w.into_rust()?),
670+
Kind::ByTime(t) => Self::ByTime(t.into_rust()?),
671+
};
672+
Ok(spec)
673+
}
674+
}
675+
643676
#[cfg(test)]
644677
mod tests {
645678
use mz_proto::protobuf_roundtrip;

src/compute/src/compute_state.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::arrangement::manager::{TraceBundle, TraceManager};
4141
use crate::logging;
4242
use crate::logging::compute::ComputeEvent;
4343
use crate::metrics::ComputeMetrics;
44-
use crate::render::LinearJoinImpl;
44+
use crate::render::{LinearJoinImpl, LinearJoinSpec};
4545
use crate::server::ResponseSender;
4646

4747
/// Worker-local state that is maintained across dataflows.
@@ -80,8 +80,8 @@ pub struct ComputeState {
8080
max_result_size: u32,
8181
/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
8282
pub dataflow_max_inflight_bytes: usize,
83-
/// Implementation to use for rendering linear joins.
84-
pub linear_join_impl: LinearJoinImpl,
83+
/// Specification for rendering linear joins.
84+
pub linear_join_spec: LinearJoinSpec,
8585
/// Metrics for this replica.
8686
pub metrics: ComputeMetrics,
8787
/// A process-global handle to tracing configuration.
@@ -110,7 +110,7 @@ impl ComputeState {
110110
command_history,
111111
max_result_size: u32::MAX,
112112
dataflow_max_inflight_bytes: usize::MAX,
113-
linear_join_impl: Default::default(),
113+
linear_join_spec: Default::default(),
114114
metrics,
115115
tracing_handle,
116116
}
@@ -191,6 +191,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
191191
let ComputeParameters {
192192
max_result_size,
193193
dataflow_max_inflight_bytes,
194+
linear_join_yielding,
194195
enable_mz_join_core,
195196
enable_jemalloc_profiling,
196197
persist,
@@ -204,8 +205,11 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
204205
if let Some(v) = dataflow_max_inflight_bytes {
205206
self.compute_state.dataflow_max_inflight_bytes = v;
206207
}
208+
if let Some(v) = linear_join_yielding {
209+
self.compute_state.linear_join_spec.yielding = v;
210+
}
207211
if let Some(v) = enable_mz_join_core {
208-
self.compute_state.linear_join_impl = match v {
212+
self.compute_state.linear_join_spec.implementation = match v {
209213
false => LinearJoinImpl::DifferentialDataflow,
210214
true => LinearJoinImpl::Materialize,
211215
};

src/compute/src/render/context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use timely::progress::{Antichain, Timestamp};
3838

3939
use crate::extensions::arrange::{KeyCollection, MzArrange};
4040
use crate::render::errors::ErrorLogger;
41-
use crate::render::join::LinearJoinImpl;
41+
use crate::render::join::LinearJoinSpec;
4242
use crate::typedefs::{ErrSpine, RowSpine, TraceErrHandle, TraceRowHandle};
4343

4444
// Local type definition to avoid the horror in signatures.
@@ -94,8 +94,8 @@ where
9494
pub bindings: BTreeMap<Id, CollectionBundle<S, V, T>>,
9595
/// A token that operators can probe to know whether the dataflow is shutting down.
9696
pub(super) shutdown_token: ShutdownToken,
97-
/// The implementation to use for rendering linear joins.
98-
pub(super) linear_join_impl: LinearJoinImpl,
97+
/// Specification for rendering linear joins.
98+
pub(super) linear_join_spec: LinearJoinSpec,
9999
}
100100

101101
impl<S: Scope, V: Data + columnation::Columnation> Context<S, V>
@@ -122,7 +122,7 @@ where
122122
until: dataflow.until.clone(),
123123
bindings: BTreeMap::new(),
124124
shutdown_token: Default::default(),
125-
linear_join_impl: Default::default(),
125+
linear_join_spec: Default::default(),
126126
}
127127
}
128128
}

src/compute/src/render/join/linear_join.rs

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
//!
1212
//! Consult [LinearJoinPlan] documentation for details.
1313
14+
use std::time::Instant;
15+
1416
use differential_dataflow::lattice::Lattice;
1517
use differential_dataflow::operators::arrange::arrangement::Arranged;
1618
use differential_dataflow::trace::TraceReader;
1719
use differential_dataflow::{AsCollection, Collection, Data};
20+
use mz_compute_types::dataflows::YieldSpec;
1821
use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
1922
use mz_compute_types::plan::join::JoinClosure;
2023
use mz_repr::{DatumVec, Diff, Row, RowArena};
@@ -34,16 +37,31 @@ use crate::typedefs::RowSpine;
3437
/// Available linear join implementations.
3538
///
3639
/// See the `mz_join_core` module docs for our rationale for providing two join implementations.
37-
#[derive(Clone, Copy, Default)]
40+
#[derive(Clone, Copy)]
3841
pub enum LinearJoinImpl {
39-
#[default]
4042
/// Render linear joins with the `mz_join_core` operator. Selected when the
/// `enable_mz_join_core` parameter is `true`.
Materialize,
4143
/// Render linear joins with DD's `JoinCore::join_core`. Selected when the
/// `enable_mz_join_core` parameter is `false`.
DifferentialDataflow,
4244
}
4345

44-
impl LinearJoinImpl {
45-
/// Run this join implementation on the provided arrangements.
46-
fn run<G, Tr1, Tr2, L, I>(
46+
/// Specification of how linear joins are to be executed.
///
/// Combines the choice of join operator with the yielding behavior handed to it.
47+
#[derive(Clone, Copy)]
48+
pub struct LinearJoinSpec {
49+
/// Which join operator to render.
pub implementation: LinearJoinImpl,
50+
/// When the rendered operator should yield. Only consulted for the `Materialize`
/// implementation; the `DifferentialDataflow` implementation is rendered without it.
pub yielding: YieldSpec,
51+
}
52+
53+
impl Default for LinearJoinSpec {
54+
fn default() -> Self {
55+
Self {
56+
implementation: LinearJoinImpl::Materialize,
57+
yielding: YieldSpec::ByWork(1_000_000),
58+
}
59+
}
60+
}
61+
62+
impl LinearJoinSpec {
63+
/// Render a join operator according to this specification.
64+
fn render<G, Tr1, Tr2, L, I>(
4765
&self,
4866
arranged1: &Arranged<G, Tr1>,
4967
arranged2: &Arranged<G, Tr2>,
@@ -58,13 +76,21 @@ impl LinearJoinImpl {
5876
I: IntoIterator,
5977
I::Item: Data,
6078
{
61-
match self {
62-
Self::DifferentialDataflow => {
79+
use LinearJoinImpl::*;
80+
use YieldSpec::*;
81+
82+
match (self.implementation, self.yielding) {
83+
(DifferentialDataflow, _) => {
6384
differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result)
6485
}
65-
Self::Materialize => mz_join_core(arranged1, arranged2, result, |_start, work| {
66-
work >= 1_000_000
67-
}),
86+
(Materialize, ByWork(limit)) => {
87+
let yield_fn = move |_start, work| work >= limit;
88+
mz_join_core(arranged1, arranged2, result, yield_fn)
89+
}
90+
(Materialize, ByTime(limit)) => {
91+
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
92+
mz_join_core(arranged1, arranged2, result, yield_fn)
93+
}
6894
}
6995
}
7096
}
@@ -157,7 +183,7 @@ where
157183
// Different variants of `joined` implement this differently,
158184
// and the logic is centralized there.
159185
let stream = differential_join(
160-
self.linear_join_impl,
186+
self.linear_join_spec,
161187
joined,
162188
inputs[stage_plan.lookup_relation].enter_region(inner),
163189
stage_plan,
@@ -208,7 +234,7 @@ where
208234
/// version of the join of previous inputs. This is split into its own method
209235
/// to enable reuse of code with different types of `prev_keyed`.
210236
fn differential_join<G, T>(
211-
join_impl: LinearJoinImpl,
237+
join_spec: LinearJoinSpec,
212238
mut joined: JoinedFlavor<G, T>,
213239
lookup_relation: CollectionBundle<G, Row, T>,
214240
LinearStagePlan {
@@ -263,27 +289,27 @@ where
263289
}
264290
JoinedFlavor::Local(local) => match arrangement {
265291
ArrangementFlavor::Local(oks, errs1) => {
266-
let (oks, errs2) = differential_join_inner(join_impl, local, oks, closure);
292+
let (oks, errs2) = differential_join_inner(join_spec, local, oks, closure);
267293
errors.push(errs1.as_collection(|k, _v| k.clone()));
268294
errors.extend(errs2);
269295
oks
270296
}
271297
ArrangementFlavor::Trace(_gid, oks, errs1) => {
272-
let (oks, errs2) = differential_join_inner(join_impl, local, oks, closure);
298+
let (oks, errs2) = differential_join_inner(join_spec, local, oks, closure);
273299
errors.push(errs1.as_collection(|k, _v| k.clone()));
274300
errors.extend(errs2);
275301
oks
276302
}
277303
},
278304
JoinedFlavor::Trace(trace) => match arrangement {
279305
ArrangementFlavor::Local(oks, errs1) => {
280-
let (oks, errs2) = differential_join_inner(join_impl, trace, oks, closure);
306+
let (oks, errs2) = differential_join_inner(join_spec, trace, oks, closure);
281307
errors.push(errs1.as_collection(|k, _v| k.clone()));
282308
errors.extend(errs2);
283309
oks
284310
}
285311
ArrangementFlavor::Trace(_gid, oks, errs1) => {
286-
let (oks, errs2) = differential_join_inner(join_impl, trace, oks, closure);
312+
let (oks, errs2) = differential_join_inner(join_spec, trace, oks, closure);
287313
errors.push(errs1.as_collection(|k, _v| k.clone()));
288314
errors.extend(errs2);
289315
oks
@@ -299,7 +325,7 @@ where
299325
/// The return type includes an optional error collection, which may be
300326
/// `None` if we can determine that `closure` cannot error.
301327
fn differential_join_inner<G, T, Tr1, Tr2>(
302-
join_impl: LinearJoinImpl,
328+
join_spec: LinearJoinSpec,
303329
prev_keyed: Arranged<G, Tr1>,
304330
next_input: Arranged<G, Tr2>,
305331
closure: JoinClosure,
@@ -319,8 +345,8 @@ where
319345
let mut row_builder = Row::default();
320346

321347
if closure.could_error() {
322-
let (oks, err) = join_impl
323-
.run(&prev_keyed, &next_input, move |key, old, new| {
348+
let (oks, err) = join_spec
349+
.render(&prev_keyed, &next_input, move |key, old, new| {
324350
let temp_storage = RowArena::new();
325351
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
326352
closure
@@ -339,7 +365,7 @@ where
339365

340366
(oks.as_collection(), Some(err.as_collection()))
341367
} else {
342-
let oks = join_impl.run(&prev_keyed, &next_input, move |key, old, new| {
368+
let oks = join_spec.render(&prev_keyed, &next_input, move |key, old, new| {
343369
let temp_storage = RowArena::new();
344370
let mut datums_local = datums.borrow_with_many(&[key, old, new]);
345371
closure

src/compute/src/render/join/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ mod delta_join;
1515
mod linear_join;
1616
mod mz_join_core;
1717

18-
pub use linear_join::LinearJoinImpl;
18+
pub use linear_join::{LinearJoinImpl, LinearJoinSpec};

0 commit comments

Comments
 (0)