Skip to content

Commit b232bb0

Browse files
committed
adapter: add SystemVar linear_join_yielding
This commit adds a new `SystemVar` called `linear_join_yielding` that can be used to control the yielding behavior of linear joins rendered by the compute layer.
1 parent 3b880aa commit b232bb0

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

src/adapter/src/flags.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,30 @@
1010
use std::time::Duration;
1111

1212
use mz_compute_client::protocol::command::ComputeParameters;
13+
use mz_compute_types::dataflows::YieldSpec;
1314
use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
1415
use mz_ore::cast::CastFrom;
1516
use mz_ore::error::ErrorExt;
1617
use mz_persist_client::cfg::{PersistParameters, RetryParameters};
1718
use mz_service::params::GrpcClientParameters;
18-
use mz_sql::session::vars::SystemVars;
19+
use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING};
1920
use mz_storage_types::parameters::{
2021
StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
2122
};
2223
use mz_tracing::params::TracingParameters;
2324

2425
/// Return the current compute configuration, derived from the system configuration.
2526
pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27+
let linear_join_yielding = config.linear_join_yielding();
28+
let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| {
29+
tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}");
30+
parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid")
31+
});
32+
2633
ComputeParameters {
2734
max_result_size: Some(config.max_result_size()),
2835
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
29-
linear_join_yielding: None,
36+
linear_join_yielding: Some(linear_join_yielding),
3037
enable_mz_join_core: Some(config.enable_mz_join_core()),
3138
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
3239
persist: persist_config(config),
@@ -35,6 +42,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
3542
}
3643
}
3744

45+
fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
46+
let parts: Vec<_> = s.split(':').collect();
47+
match &parts[..] {
48+
["work", amount] => {
49+
let amount = amount.parse().ok()?;
50+
Some(YieldSpec::ByWork(amount))
51+
}
52+
["time", millis] => {
53+
let millis = millis.parse().ok()?;
54+
let duration = Duration::from_millis(millis);
55+
Some(YieldSpec::ByTime(duration))
56+
}
57+
_ => None,
58+
}
59+
}
60+
3861
/// Return the current storage configuration, derived from the system configuration.
3962
pub fn storage_config(config: &SystemVars) -> StorageParameters {
4063
StorageParameters {

src/sql/src/session/vars.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,16 @@ const ENABLE_MZ_JOIN_CORE: ServerVar<bool> = ServerVar {
12891289
internal: true,
12901290
};
12911291

1292+
pub static DEFAULT_LINEAR_JOIN_YIELDING: Lazy<String> = Lazy::new(|| "work:1000000".into());
1293+
static LINEAR_JOIN_YIELDING: Lazy<ServerVar<String>> = Lazy::new(|| ServerVar {
1294+
name: UncasedStr::new("linear_join_yielding"),
1295+
value: &DEFAULT_LINEAR_JOIN_YIELDING,
1296+
description:
1297+
"The yielding behavior compute rendering should apply for linear join operators. Either \
1298+
'work:<amount>' or 'time:<milliseconds>'.",
1299+
internal: true,
1300+
});
1301+
12921302
pub const ENABLE_DEFAULT_CONNECTION_VALIDATION: ServerVar<bool> = ServerVar {
12931303
name: UncasedStr::new("enable_default_connection_validation"),
12941304
value: &true,
@@ -2428,6 +2438,7 @@ impl SystemVars {
24282438
.with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
24292439
.with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
24302440
.with_var(&ENABLE_MZ_JOIN_CORE)
2441+
.with_var(&LINEAR_JOIN_YIELDING)
24312442
.with_var(&ENABLE_STORAGE_SHARD_FINALIZATION)
24322443
.with_var(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
24332444
.with_var(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
@@ -3079,6 +3090,11 @@ impl SystemVars {
30793090
*self.expect_value(&ENABLE_MZ_JOIN_CORE)
30803091
}
30813092

3093+
/// Returns the `linear_join_yielding` configuration parameter.
3094+
pub fn linear_join_yielding(&self) -> &String {
3095+
self.expect_value(&LINEAR_JOIN_YIELDING)
3096+
}
3097+
30823098
/// Returns the `enable_storage_shard_finalization` configuration parameter.
30833099
pub fn enable_storage_shard_finalization(&self) -> bool {
30843100
*self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
@@ -4744,6 +4760,7 @@ pub fn is_tracing_var(name: &str) -> bool {
47444760
pub fn is_compute_config_var(name: &str) -> bool {
47454761
name == MAX_RESULT_SIZE.name()
47464762
|| name == DATAFLOW_MAX_INFLIGHT_BYTES.name()
4763+
|| name == LINEAR_JOIN_YIELDING.name()
47474764
|| name == ENABLE_MZ_JOIN_CORE.name()
47484765
|| name == ENABLE_JEMALLOC_PROFILING.name()
47494766
|| is_persist_config_var(name)

0 commit comments

Comments
 (0)