Skip to content

Commit 2fdb728

Browse files
authored
Merge pull request #22391 from teskje/time-based-join-fueling
compute: time based linear join yielding
2 parents f65d777 + a8c2cba commit 2fdb728

File tree

12 files changed

+208
-69
lines changed

12 files changed

+208
-69
lines changed

src/adapter/src/flags.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,30 @@
1010
use std::time::Duration;
1111

1212
use mz_compute_client::protocol::command::ComputeParameters;
13+
use mz_compute_types::dataflows::YieldSpec;
1314
use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
1415
use mz_ore::cast::CastFrom;
1516
use mz_ore::error::ErrorExt;
1617
use mz_persist_client::cfg::{PersistParameters, RetryParameters};
1718
use mz_service::params::GrpcClientParameters;
18-
use mz_sql::session::vars::SystemVars;
19+
use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING};
1920
use mz_storage_types::parameters::{
2021
StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
2122
};
2223
use mz_tracing::params::TracingParameters;
2324

2425
/// Return the current compute configuration, derived from the system configuration.
2526
pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27+
let linear_join_yielding = config.linear_join_yielding();
28+
let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| {
29+
tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}");
30+
parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid")
31+
});
32+
2633
ComputeParameters {
2734
max_result_size: Some(config.max_result_size()),
2835
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
36+
linear_join_yielding: Some(linear_join_yielding),
2937
enable_mz_join_core: Some(config.enable_mz_join_core()),
3038
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
3139
enable_specialized_arrangements: Some(config.enable_specialized_arrangements()),
@@ -35,6 +43,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
3543
}
3644
}
3745

46+
fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
47+
let parts: Vec<_> = s.split(':').collect();
48+
match &parts[..] {
49+
["work", amount] => {
50+
let amount = amount.parse().ok()?;
51+
Some(YieldSpec::ByWork(amount))
52+
}
53+
["time", millis] => {
54+
let millis = millis.parse().ok()?;
55+
let duration = Duration::from_millis(millis);
56+
Some(YieldSpec::ByTime(duration))
57+
}
58+
_ => None,
59+
}
60+
}
61+
3862
/// Return the current storage configuration, derived from the system configuration.
3963
pub fn storage_config(config: &SystemVars) -> StorageParameters {
4064
StorageParameters {

src/compute-client/src/protocol/command.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@ message ProtoComputeParameters {
7070
mz_service.params.ProtoGrpcClientParameters grpc_client = 6;
7171
optional bool enable_jemalloc_profiling = 7;
7272
optional bool enable_specialized_arrangements = 8;
73+
mz_compute_types.dataflows.ProtoYieldSpec linear_join_yielding = 9;
7374
}

src/compute-client/src/protocol/command.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//! Compute protocol commands.
1111
1212
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
13-
use mz_compute_types::dataflows::DataflowDescription;
13+
use mz_compute_types::dataflows::{DataflowDescription, YieldSpec};
1414
use mz_expr::RowSetFinishing;
1515
use mz_ore::tracing::OpenTelemetryContext;
1616
use mz_persist_client::cfg::PersistParameters;
@@ -359,6 +359,8 @@ pub struct ComputeParameters {
359359
pub max_result_size: Option<u32>,
360360
/// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
361361
pub dataflow_max_inflight_bytes: Option<usize>,
362+
/// The yielding behavior with which linear joins should be rendered.
363+
pub linear_join_yielding: Option<YieldSpec>,
362364
/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
363365
pub enable_mz_join_core: Option<bool>,
364366
/// Whether to activate jemalloc heap profiling.
@@ -379,6 +381,7 @@ impl ComputeParameters {
379381
let ComputeParameters {
380382
max_result_size,
381383
dataflow_max_inflight_bytes,
384+
linear_join_yielding,
382385
enable_mz_join_core,
383386
enable_jemalloc_profiling,
384387
enable_specialized_arrangements,
@@ -393,6 +396,9 @@ impl ComputeParameters {
393396
if dataflow_max_inflight_bytes.is_some() {
394397
self.dataflow_max_inflight_bytes = dataflow_max_inflight_bytes;
395398
}
399+
if linear_join_yielding.is_some() {
400+
self.linear_join_yielding = linear_join_yielding;
401+
}
396402
if enable_mz_join_core.is_some() {
397403
self.enable_mz_join_core = enable_mz_join_core;
398404
}
@@ -420,6 +426,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
420426
ProtoComputeParameters {
421427
max_result_size: self.max_result_size.into_proto(),
422428
dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
429+
linear_join_yielding: self.linear_join_yielding.into_proto(),
423430
enable_mz_join_core: self.enable_mz_join_core.into_proto(),
424431
enable_jemalloc_profiling: self.enable_jemalloc_profiling.into_proto(),
425432
enable_specialized_arrangements: self.enable_specialized_arrangements.into_proto(),
@@ -433,6 +440,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
433440
Ok(Self {
434441
max_result_size: proto.max_result_size.into_rust()?,
435442
dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
443+
linear_join_yielding: proto.linear_join_yielding.into_rust()?,
436444
enable_mz_join_core: proto.enable_mz_join_core.into_rust()?,
437445
enable_jemalloc_profiling: proto.enable_jemalloc_profiling.into_rust()?,
438446
enable_specialized_arrangements: proto.enable_specialized_arrangements.into_rust()?,

src/compute-types/src/dataflows.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import "compute-types/src/plan.proto";
1313
import "compute-types/src/sinks.proto";
1414
import "compute-types/src/sources.proto";
1515
import "expr/src/scalar.proto";
16+
import "proto/src/proto.proto";
1617
import "repr/src/antichain.proto";
1718
import "repr/src/global_id.proto";
1819
import "repr/src/relation_and_scalar.proto";
@@ -63,3 +64,10 @@ message ProtoBuildDesc {
6364
mz_repr.global_id.ProtoGlobalId id = 1;
6465
plan.ProtoPlan plan = 2;
6566
}
67+
68+
message ProtoYieldSpec {
69+
oneof kind {
70+
uint64 by_work = 1;
71+
mz_proto.ProtoDuration by_time = 2;
72+
}
73+
}

src/compute-types/src/dataflows.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//! Types for describing dataflows.
1111
1212
use std::collections::{BTreeMap, BTreeSet};
13+
use std::time::Duration;
1314

1415
use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
1516
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
@@ -640,6 +641,38 @@ impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
640641
}
641642
}
642643

644+
/// Specification of a dataflow operator's yielding behavior.
645+
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
646+
pub enum YieldSpec {
647+
ByWork(usize),
648+
ByTime(Duration),
649+
}
650+
651+
impl RustType<ProtoYieldSpec> for YieldSpec {
652+
fn into_proto(&self) -> ProtoYieldSpec {
653+
use proto_yield_spec::Kind;
654+
655+
let kind = match *self {
656+
Self::ByWork(w) => Kind::ByWork(w.into_proto()),
657+
Self::ByTime(t) => Kind::ByTime(t.into_proto()),
658+
};
659+
ProtoYieldSpec { kind: Some(kind) }
660+
}
661+
662+
fn from_proto(proto: ProtoYieldSpec) -> Result<Self, TryFromProtoError> {
663+
use proto_yield_spec::Kind;
664+
665+
let Some(kind) = proto.kind else {
666+
return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind"));
667+
};
668+
let spec = match kind {
669+
Kind::ByWork(w) => Self::ByWork(w.into_rust()?),
670+
Kind::ByTime(t) => Self::ByTime(t.into_rust()?),
671+
};
672+
Ok(spec)
673+
}
674+
}
675+
643676
#[cfg(test)]
644677
mod tests {
645678
use mz_proto::protobuf_roundtrip;

src/compute/src/compute_state.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::arrangement::manager::{SpecializedTraceHandle, TraceBundle, TraceMana
4444
use crate::logging;
4545
use crate::logging::compute::ComputeEvent;
4646
use crate::metrics::ComputeMetrics;
47-
use crate::render::LinearJoinImpl;
47+
use crate::render::{LinearJoinImpl, LinearJoinSpec};
4848
use crate::server::ResponseSender;
4949
use crate::typedefs::TraceRowHandle;
5050

@@ -84,8 +84,8 @@ pub struct ComputeState {
8484
max_result_size: u32,
8585
/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
8686
pub dataflow_max_inflight_bytes: usize,
87-
/// Implementation to use for rendering linear joins.
88-
pub linear_join_impl: LinearJoinImpl,
87+
/// Specification for rendering linear joins.
88+
pub linear_join_spec: LinearJoinSpec,
8989
/// Metrics for this replica.
9090
pub metrics: ComputeMetrics,
9191
/// A process-global handle to tracing configuration.
@@ -116,7 +116,7 @@ impl ComputeState {
116116
command_history,
117117
max_result_size: u32::MAX,
118118
dataflow_max_inflight_bytes: usize::MAX,
119-
linear_join_impl: Default::default(),
119+
linear_join_spec: Default::default(),
120120
metrics,
121121
tracing_handle,
122122
enable_specialized_arrangements: Default::default(),
@@ -198,6 +198,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
198198
let ComputeParameters {
199199
max_result_size,
200200
dataflow_max_inflight_bytes,
201+
linear_join_yielding,
201202
enable_mz_join_core,
202203
enable_jemalloc_profiling,
203204
enable_specialized_arrangements,
@@ -212,11 +213,14 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
212213
if let Some(v) = dataflow_max_inflight_bytes {
213214
self.compute_state.dataflow_max_inflight_bytes = v;
214215
}
216+
if let Some(v) = linear_join_yielding {
217+
self.compute_state.linear_join_spec.yielding = v;
218+
}
215219
if let Some(v) = enable_specialized_arrangements {
216220
self.compute_state.enable_specialized_arrangements = v;
217221
}
218222
if let Some(v) = enable_mz_join_core {
219-
self.compute_state.linear_join_impl = match v {
223+
self.compute_state.linear_join_spec.implementation = match v {
220224
false => LinearJoinImpl::DifferentialDataflow,
221225
true => LinearJoinImpl::Materialize,
222226
};

src/compute/src/render/context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use timely::progress::{Antichain, Timestamp};
4242
use crate::arrangement::manager::SpecializedTraceHandle;
4343
use crate::extensions::arrange::{KeyCollection, MzArrange};
4444
use crate::render::errors::ErrorLogger;
45-
use crate::render::join::LinearJoinImpl;
45+
use crate::render::join::LinearJoinSpec;
4646
use crate::typedefs::{ErrSpine, RowSpine, TraceErrHandle, TraceRowHandle};
4747

4848
// Local type definition to avoid the horror in signatures.
@@ -98,8 +98,8 @@ where
9898
pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
9999
/// A token that operators can probe to know whether the dataflow is shutting down.
100100
pub(super) shutdown_token: ShutdownToken,
101-
/// The implementation to use for rendering linear joins.
102-
pub(super) linear_join_impl: LinearJoinImpl,
101+
/// Specification for rendering linear joins.
102+
pub(super) linear_join_spec: LinearJoinSpec,
103103
pub(super) enable_specialized_arrangements: bool,
104104
}
105105

@@ -127,7 +127,7 @@ where
127127
until: dataflow.until.clone(),
128128
bindings: BTreeMap::new(),
129129
shutdown_token: Default::default(),
130-
linear_join_impl: Default::default(),
130+
linear_join_spec: Default::default(),
131131
enable_specialized_arrangements: Default::default(),
132132
}
133133
}

0 commit comments

Comments
 (0)