Skip to content

mz_join: efficient linear scan through times #33085

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def get_minimal_system_parameters(
"enable_logical_compaction_window": "true",
"enable_multi_worker_storage_persist_sink": "true",
"enable_multi_replica_sources": "true",
"enable_mz_join_core_v2": "true",
"enable_rbac_checks": "true",
"enable_reduce_mfp_fusion": "true",
"enable_refresh_every_mvs": "true",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ def __init__(
"enable_timely_init_at_process_startup",
"persist_enable_incremental_compaction",
"storage_statistics_retention_duration",
"enable_mz_join_core_v2",
]

def run(self, exe: Executor) -> bool:
Expand Down
9 changes: 9 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
linear joins.",
);

/// Whether rendering should use `mz_join_core_v2` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE_V2: Config<bool> = Config::new(
"enable_mz_join_core_v2",
false,
"Whether compute should use `mz_join_core_v2` rather than DD's `JoinCore::join_core` to render \
linear joins.",
);

/// Whether rendering should use the new MV sink correction buffer implementation.
pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
"enable_compute_correction_v2",
Expand Down Expand Up @@ -373,6 +381,7 @@ pub const PEEK_STASH_BATCH_SIZE: Config<usize> = Config::new(
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
.add(&ENABLE_MZ_JOIN_CORE)
.add(&ENABLE_MZ_JOIN_CORE_V2)
.add(&ENABLE_CORRECTION_V2)
.add(&ENABLE_MV_APPEND_SMEARING)
.add(&ENABLE_TEMPORAL_BUCKETING)
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/render/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
mod delta_join;
mod linear_join;
mod mz_join_core;
mod mz_join_core_v2;

pub use linear_join::LinearJoinSpec;
39 changes: 33 additions & 6 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::{AsCollection, Collection, Data};
use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
use mz_compute_types::dyncfgs::{
ENABLE_MZ_JOIN_CORE, ENABLE_MZ_JOIN_CORE_V2, LINEAR_JOIN_YIELDING,
};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
use mz_dyncfg::ConfigSet;
Expand All @@ -37,6 +39,7 @@ use crate::extensions::arrange::MzArrangeCore;
use crate::render::RenderTimestamp;
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
use crate::render::join::mz_join_core::mz_join_core;
use crate::render::join::mz_join_core_v2::mz_join_core as mz_join_core_v2;
use crate::row_spine::{RowRowBuilder, RowRowSpine};
use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};

Expand All @@ -46,6 +49,7 @@ use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
#[derive(Clone, Copy)]
enum LinearJoinImpl {
Materialize,
MaterializeV2,
DifferentialDataflow,
}

Expand Down Expand Up @@ -73,9 +77,12 @@ impl Default for LinearJoinSpec {
impl LinearJoinSpec {
/// Create a `LinearJoinSpec` based on the given config.
pub fn from_config(config: &ConfigSet) -> Self {
let implementation = match ENABLE_MZ_JOIN_CORE.get(config) {
true => LinearJoinImpl::Materialize,
false => LinearJoinImpl::DifferentialDataflow,
let implementation = if ENABLE_MZ_JOIN_CORE_V2.get(config) {
LinearJoinImpl::MaterializeV2
} else if ENABLE_MZ_JOIN_CORE.get(config) {
LinearJoinImpl::Materialize
} else {
LinearJoinImpl::DifferentialDataflow
};

let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
Expand Down Expand Up @@ -106,8 +113,7 @@ impl LinearJoinSpec {
+ Clone
+ 'static,
L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
I: IntoIterator,
I::Item: Data,
I: IntoIterator<Item: Data> + 'static,
{
use LinearJoinImpl::*;

Expand All @@ -134,6 +140,27 @@ impl LinearJoinSpec {
let yield_fn = |_start, _work| false;
mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
}
(MaterializeV2, Some(work_limit), Some(time_limit)) => {
let yield_fn =
move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
.as_collection()
}
(MaterializeV2, Some(work_limit), None) => {
let yield_fn = move |_start, work| work >= work_limit;
mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
.as_collection()
}
(MaterializeV2, None, Some(time_limit)) => {
let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
.as_collection()
}
(MaterializeV2, None, None) => {
let yield_fn = |_start, _work| false;
mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
.as_collection()
}
}
}
}
Expand Down
22 changes: 9 additions & 13 deletions src/compute/src/render/join/mz_join_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,24 @@
//!
//! * Differential's `JoinCore::join_core`
//! * A Materialize fork thereof, called `mz_join_core`
//! * Another Materialize fork thereof, called `mz_join_core_v2`
//!
//! `mz_join_core` exists to solve a responsiveness problem with the DD implementation.
//! DD's join is only able to yield between keys. When computing a large cross-join or a highly
//! skewed join, this can result in loss of interactivity when the join operator refuses to yield
//! control for multiple seconds or longer, which in turn causes degraded user experience.
//!
//! `mz_join_core` currently fixes the yielding issue by omitting the merge-join matching strategy
//! implemented in DD's join implementation. This leaves only the nested loop strategy for which it
//! `mz_join_core` currently fixes the yielding issue by omitting the linear scan through times
//! implemented in DD's join implementation. This leaves only the quadratic strategy for which it
//! is easy to implement yielding within keys.
//!
//! While `mz_join_core` retains responsiveness in the face of cross-joins it is also, due to its
//! sole reliance on nested-loop matching, significantly slower than DD's join for workloads that
//! have a large amount of edits at different times. We consider these niche workloads for
//! Materialize today, due to the way source ingestion works, but that might change in the future.
//! While `mz_join_core` retains responsiveness in the face of cross-joins it is also significantly
//! slower than DD's join for workloads that have a large amount of edits at different times.
//! `mz_join_core_v2` resolves this by adding support for the DD join's linear scan through times.
//!
//! For the moment, we keep both implementations around, selectable through a feature flag.
//! We expect `mz_join_core` to be more useful in Materialize today, but being able to fall back to
//! DD's implementation provides a safety net in case that assumption is wrong.
//!
//! In the mid-term, we want to arrive at a single join implementation that is as efficient as DD's
//! join and as responsive as `mz_join_core`. Whether that means adding merge-join matching to
//! `mz_join_core` or adding better fueling to DD's join implementation is still TBD.
//! For the moment, we keep all three implementations around, selectable through feature flags.
//! Eventually, we hope that `mz_join_core_v2` proves itself sufficiently to become the only join
//! implementation.

use std::cmp::Ordering;
use std::collections::VecDeque;
Expand Down
Loading