Skip to content

Commit a8c2cba

Browse files
committed
adapter: add SystemVar linear_join_yielding
This commit adds a new `SystemVar` called `linear_join_yielding` that can be used to control the yielding behavior of linear joins rendered by the compute layer.
1 parent 52827e0 commit a8c2cba

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

src/adapter/src/flags.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,30 @@
1010
use std::time::Duration;
1111

1212
use mz_compute_client::protocol::command::ComputeParameters;
13+
use mz_compute_types::dataflows::YieldSpec;
1314
use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
1415
use mz_ore::cast::CastFrom;
1516
use mz_ore::error::ErrorExt;
1617
use mz_persist_client::cfg::{PersistParameters, RetryParameters};
1718
use mz_service::params::GrpcClientParameters;
18-
use mz_sql::session::vars::SystemVars;
19+
use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING};
1920
use mz_storage_types::parameters::{
2021
StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
2122
};
2223
use mz_tracing::params::TracingParameters;
2324

2425
/// Return the current compute configuration, derived from the system configuration.
2526
pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27+
let linear_join_yielding = config.linear_join_yielding();
28+
let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| {
29+
tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}");
30+
parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid")
31+
});
32+
2633
ComputeParameters {
2734
max_result_size: Some(config.max_result_size()),
2835
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
29-
linear_join_yielding: None,
36+
linear_join_yielding: Some(linear_join_yielding),
3037
enable_mz_join_core: Some(config.enable_mz_join_core()),
3138
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
3239
enable_specialized_arrangements: Some(config.enable_specialized_arrangements()),
@@ -36,6 +43,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
3643
}
3744
}
3845

46+
fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
47+
let parts: Vec<_> = s.split(':').collect();
48+
match &parts[..] {
49+
["work", amount] => {
50+
let amount = amount.parse().ok()?;
51+
Some(YieldSpec::ByWork(amount))
52+
}
53+
["time", millis] => {
54+
let millis = millis.parse().ok()?;
55+
let duration = Duration::from_millis(millis);
56+
Some(YieldSpec::ByTime(duration))
57+
}
58+
_ => None,
59+
}
60+
}
61+
3962
/// Return the current storage configuration, derived from the system configuration.
4063
pub fn storage_config(config: &SystemVars) -> StorageParameters {
4164
StorageParameters {

src/sql/src/session/vars.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,16 @@ const ENABLE_MZ_JOIN_CORE: ServerVar<bool> = ServerVar {
12961296
internal: true,
12971297
};
12981298

1299+
pub static DEFAULT_LINEAR_JOIN_YIELDING: Lazy<String> = Lazy::new(|| "work:1000000".into());
1300+
static LINEAR_JOIN_YIELDING: Lazy<ServerVar<String>> = Lazy::new(|| ServerVar {
1301+
name: UncasedStr::new("linear_join_yielding"),
1302+
value: &DEFAULT_LINEAR_JOIN_YIELDING,
1303+
description:
1304+
"The yielding behavior compute rendering should apply for linear join operators. Either \
1305+
'work:<amount>' or 'time:<milliseconds>'.",
1306+
internal: true,
1307+
});
1308+
12991309
pub const ENABLE_DEFAULT_CONNECTION_VALIDATION: ServerVar<bool> = ServerVar {
13001310
name: UncasedStr::new("enable_default_connection_validation"),
13011311
value: &true,
@@ -2471,6 +2481,7 @@ impl SystemVars {
24712481
.with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
24722482
.with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
24732483
.with_var(&ENABLE_MZ_JOIN_CORE)
2484+
.with_var(&LINEAR_JOIN_YIELDING)
24742485
.with_var(&ENABLE_STORAGE_SHARD_FINALIZATION)
24752486
.with_var(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
24762487
.with_var(&ENABLE_SPECIALIZED_ARRANGEMENTS)
@@ -3128,6 +3139,11 @@ impl SystemVars {
31283139
*self.expect_value(&ENABLE_MZ_JOIN_CORE)
31293140
}
31303141

3142+
/// Returns the `linear_join_yielding` configuration parameter.
3143+
pub fn linear_join_yielding(&self) -> &String {
3144+
self.expect_value(&LINEAR_JOIN_YIELDING)
3145+
}
3146+
31313147
/// Returns the `enable_storage_shard_finalization` configuration parameter.
31323148
pub fn enable_storage_shard_finalization(&self) -> bool {
31333149
*self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
@@ -4797,6 +4813,7 @@ pub fn is_tracing_var(name: &str) -> bool {
47974813
pub fn is_compute_config_var(name: &str) -> bool {
47984814
name == MAX_RESULT_SIZE.name()
47994815
|| name == DATAFLOW_MAX_INFLIGHT_BYTES.name()
4816+
|| name == LINEAR_JOIN_YIELDING.name()
48004817
|| name == ENABLE_MZ_JOIN_CORE.name()
48014818
|| name == ENABLE_JEMALLOC_PROFILING.name()
48024819
|| name == ENABLE_SPECIALIZED_ARRANGEMENTS.name()

0 commit comments

Comments
 (0)