diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index 62836ef7c5a1e..c991b1a71220a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -112,7 +112,6 @@ pub struct TransformHashJoinProbe { partition_id_to_restore: usize, step: Step, - step_logs: Vec, } impl TransformHashJoinProbe { @@ -176,7 +175,6 @@ impl TransformHashJoinProbe { spiller, partition_id_to_restore: 0, step: Step::Async(AsyncStep::WaitBuild), - step_logs: vec![Step::Async(AsyncStep::WaitBuild)], })) } @@ -192,7 +190,6 @@ impl TransformHashJoinProbe { } }; self.step = step.clone(); - self.step_logs.push(step); Ok(event) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 9b32bbbc233fd..43783520ed773 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1297,6 +1297,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("nondeterministic_update_lazy_read_threshold", DefaultSettingValue { + value: UserSettingValue::UInt64(18446744073709551615), + desc: "Sets the maximum rows in a query to enable lazy read optimization when updating a multi-joined row. Setting it to 0 disables the optimization.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_auto_vacuum", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Whether to automatically trigger VACUUM operations on tables (using vacuum2)", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0e9a7617a1ac9..74b247e13e21c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1013,4 +1013,8 @@ impl Settings { pub fn get_enable_parallel_union_all(&self) -> Result { Ok(self.try_get_u64("enable_parallel_union_all")? == 1) } + + pub fn get_nondeterministic_update_lazy_read_threshold(&self) -> Result { + self.try_get_u64("nondeterministic_update_lazy_read_threshold") + } } diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index c50adc9f30db4..e79ec81630d0d 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -129,6 +129,10 @@ impl PhysicalPlanBuilder { self.build_mutation_source(mutation_source).await } RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await, + RelOperator::RowFetch(row_fetch) => { + self.build_row_fetch(s_expr, row_fetch, required, stat_info) + .await + } } } diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index 9feb40c125a1d..ed757c1ac1386 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -58,6 +58,7 @@ use crate::executor::physical_plans::MutationOrganize; use crate::executor::physical_plans::MutationSplit; use crate::executor::physical_plans::RowFetch; use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::SExpr; use crate::parse_computed_expr; use crate::plans::BoundColumnRef; @@ -283,11 +284,21 @@ impl PhysicalPlanBuilder { })); } + let already_enable_lazy_read = { + let settings = self.ctx.get_settings(); + let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?; + let rel_expr = RelExpr::with_s_expr(s_expr); + let cardinality = rel_expr.derive_cardinality_child(0)?; + + lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64 + }; + // Construct row fetch plan for lazy columns. - if let Some(lazy_columns) = self - .metadata - .read() - .get_table_lazy_columns(target_table_index) + if !already_enable_lazy_read + && let Some(lazy_columns) = self + .metadata + .read() + .get_table_lazy_columns(target_table_index) && !lazy_columns.is_empty() { plan = PhysicalPlan::RowFetch(build_mutation_row_fetch( diff --git a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs index 3a5aca4f7a309..434a240710a3d 100644 --- a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs +++ b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs @@ -14,13 +14,19 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::Projection; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::ROW_ID_COL_NAME; use crate::executor::explain::PlanStatsInfo; use crate::executor::PhysicalPlan; +use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ir::SExpr; +use crate::ColumnEntry; +use crate::ColumnSet; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct RowFetch { @@ -46,3 +52,92 @@ impl RowFetch { Ok(DataSchemaRefExt::create(fields)) } } + +impl PhysicalPlanBuilder { + pub(crate) async fn build_row_fetch( + &mut self, + s_expr: &SExpr, + row_fetch: &crate::plans::RowFetch, + mut required: ColumnSet, + stat_info: PlanStatsInfo, + ) -> Result { + // 1. Prune unused Columns. + // Apply lazy. + required = required + .difference(&row_fetch.lazy_columns) + .cloned() + .collect::(); + + required.insert(row_fetch.row_id_index); + + // 2. Build physical plan. + let input_plan = self.build(s_expr.child(0)?, required).await?; + let metadata = self.metadata.read().clone(); + + // If `lazy_columns` is not empty, build a `RowFetch` plan on top of the `Limit` plan. + let input_schema = input_plan.output_schema()?; + + // Lazy materialization is enabled. + let row_id_col_index = metadata + .columns() + .iter() + .position(|col| col.name() == ROW_ID_COL_NAME) + .ok_or_else(|| ErrorCode::Internal("Internal column _row_id is not found"))?; + + let Ok(row_id_col_offset) = input_schema.index_of(&row_id_col_index.to_string()) else { + return Err(ErrorCode::Internal("Internal column _row_id is not found")); + }; + + let lazy_columns = row_fetch + .lazy_columns + .iter() + .filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it. + .cloned() + .collect::>(); + + if lazy_columns.is_empty() { + // If there is no lazy column, we don't need to build a `RowFetch` plan. + return Ok(input_plan); + } + + let mut has_inner_column = false; + let fetched_fields = lazy_columns + .iter() + .map(|index| { + let col = metadata.column(*index); + if let ColumnEntry::BaseTableColumn(c) = col { + if c.path_indices.is_some() { + has_inner_column = true; + } + } + DataField::new(&index.to_string(), col.data_type()) + }) + .collect(); + + let metadata = self.metadata.read(); + let source = metadata.get_table_source(&row_fetch.fetch_table_index); + debug_assert!(source.is_some()); + let source_info = source.cloned().unwrap(); + let table_schema = source_info.source_info.schema(); + let cols_to_fetch = Self::build_projection( + &metadata, + &table_schema, + lazy_columns.iter(), + has_inner_column, + true, + true, + false, + ); + + Ok(PhysicalPlan::RowFetch(RowFetch { + plan_id: 0, + input: Box::new(input_plan), + source: Box::new(source_info), + row_id_col_offset, + cols_to_fetch, + fetched_fields, + need_wrap_nullable: row_fetch.need_wrap_nullable, + stat_info: Some(stat_info), + })) + } +} diff --git a/src/query/sql/src/planner/binder/bind_mutation/update.rs b/src/query/sql/src/planner/binder/bind_mutation/update.rs index 56de12c2c2921..1b3239728dfb1 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/update.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/update.rs @@ -29,12 +29,14 @@ use crate::binder::bind_mutation::mutation_expression::MutationExpression; use crate::binder::util::TableIdentifier; use crate::binder::Binder; use crate::optimizer::ir::Matcher; +use crate::optimizer::ir::RelExpr; use crate::plans::AggregateFunction; use crate::plans::BoundColumnRef; use crate::plans::EvalScalar; use crate::plans::Plan; use crate::plans::RelOp; use crate::plans::RelOperator; +use crate::plans::RowFetch; use crate::plans::ScalarItem; use crate::plans::VisitorMut; use crate::BindContext; @@ -282,14 +284,36 @@ impl Binder { .collect(); let eval_scalar = EvalScalar { items }; - mutation.bind_context.aggregate_info.group_items = fields_bindings - .into_iter() - .chain(std::iter::once(row_id)) - .map(|column| ScalarItem { - index: column.index, - scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }), - }) - .collect(); + mutation.bind_context.aggregate_info.group_items = vec![ScalarItem { + index: row_id.index, + scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: row_id.clone(), + }), + }]; + + let enable_lazy_read = { + let settings = self.ctx.get_settings(); + let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?; + let rel_expr = RelExpr::with_s_expr(s_expr); + let cardinality = rel_expr.derive_cardinality_child(0)?; + + lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64 + }; + + if mutation.strategy == MutationStrategy::Direct || !enable_lazy_read { + mutation + .bind_context + .aggregate_info + .group_items + .extend(fields_bindings.iter().map(|column| ScalarItem { + index: column.index, + scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: column.clone(), + }), + })); + } for eval in &mut mutation.matched_evaluators { if let Some(expr) = &mut eval.condition { @@ -319,14 +343,21 @@ impl Binder { .collect(), ); - let aggr_expr = + let mut input = self.bind_aggregate(&mut mutation.bind_context, s_expr.unary_child().clone())?; - let input = if eval_scalar.items.is_empty() { - aggr_expr - } else { - aggr_expr.build_unary(Arc::new(eval_scalar.into())) - }; + if !eval_scalar.items.is_empty() { + input = input.build_unary(Arc::new(eval_scalar.into())); + } + + if mutation.strategy != MutationStrategy::Direct && enable_lazy_read { + input = input.build_unary(RelOperator::RowFetch(RowFetch { + need_wrap_nullable: false, + row_id_index: row_id.index, + lazy_columns: fields_bindings.iter().map(|x| x.index).collect(), + fetch_table_index: mutation.target_table_index, + })); + } let s_expr = Box::new(input.build_unary(Arc::new(mutation.into()))); let Plan::DataMutation { diff --git a/src/query/sql/src/planner/optimizer/ir/format.rs b/src/query/sql/src/planner/optimizer/ir/format.rs index f9613af6b35ef..634660d93e043 100644 --- a/src/query/sql/src/planner/optimizer/ir/format.rs +++ b/src/query/sql/src/planner/optimizer/ir/format.rs @@ -53,6 +53,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::Sort(_) => "Sort".to_string(), RelOperator::Limit(_) => "Limit".to_string(), RelOperator::UnionAll(_) => "UnionAll".to_string(), + RelOperator::RowFetch(_) => "RowFetch".to_string(), RelOperator::Exchange(op) => { format!("Exchange: ({})", match op { Exchange::Hash(scalars) => format!( diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs index 868b55f05e935..6869f401130eb 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs @@ -336,6 +336,7 @@ impl DPhpyOptimizer { | RelOperator::Aggregate(_) | RelOperator::Sort(_) | RelOperator::Limit(_) + | RelOperator::RowFetch(_) | RelOperator::EvalScalar(_) | RelOperator::Window(_) | RelOperator::Udf(_) diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs index eb645151c4230..e3fc8a4fd4499 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs @@ -139,7 +139,8 @@ pub async fn dynamic_sample( | RelOperator::Exchange(_) | RelOperator::Window(_) | RelOperator::Udf(_) - | RelOperator::AsyncFunction(_) => { + | RelOperator::AsyncFunction(_) + | RelOperator::RowFetch(_) => { dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await } } diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index 079f80afcb9fe..795767759be74 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -318,12 +318,13 @@ impl SubqueryDecorrelatorOptimizer { Arc::new(self.optimize_sync(s_expr.right_child())?), )), - RelOperator::Limit(_) | RelOperator::Udf(_) | RelOperator::AsyncFunction(_) => { - Ok(SExpr::create_unary( - s_expr.plan.clone(), - Arc::new(self.optimize_sync(s_expr.unary_child())?), - )) - } + RelOperator::Limit(_) + | RelOperator::Udf(_) + | RelOperator::AsyncFunction(_) + | RelOperator::RowFetch(_) => Ok(SExpr::create_unary( + s_expr.plan.clone(), + Arc::new(self.optimize_sync(s_expr.unary_child())?), + )), RelOperator::DummyTableScan(_) | RelOperator::Scan(_) diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs index 46ebf4648860d..d626ee8fffd2c 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs @@ -152,7 +152,8 @@ fn find_group_by_keys( | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::CompactBlock(_) => {} + | RelOperator::CompactBlock(_) + | RelOperator::RowFetch(_) => {} } Ok(()) } diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index 2124d39c6aa79..4e9ef72f95a28 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -43,6 +43,7 @@ mod r_cte_scan; mod recluster; mod replace; mod revert_table; +mod row_fetch; mod scalar_expr; mod scan; mod set; @@ -90,6 +91,7 @@ pub use r_cte_scan::*; pub use recluster::*; pub use replace::Replace; pub use revert_table::RevertTablePlan; +pub use row_fetch::*; pub use scalar_expr::*; pub use scan::*; pub use set::*; diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 092a010304ec0..5563c812de2a4 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -45,6 +45,7 @@ use crate::plans::Limit; use crate::plans::Mutation; use crate::plans::OptimizeCompactBlock as CompactBlock; use crate::plans::ProjectSet; +use crate::plans::RowFetch; use crate::plans::Scan; use crate::plans::Sort; use crate::plans::Udf; @@ -127,6 +128,7 @@ pub enum RelOp { MergeInto, CompactBlock, MutationSource, + RowFetch, } /// Relational operators @@ -155,6 +157,7 @@ pub enum RelOperator { ExpressionScan(ExpressionScan), CacheScan(CacheScan), Udf(Udf), + RowFetch(RowFetch), RecursiveCteScan(RecursiveCteScan), AsyncFunction(AsyncFunction), Mutation(Mutation), diff --git a/src/query/sql/src/planner/plans/operator_macros.rs b/src/query/sql/src/planner/plans/operator_macros.rs index 4b90525c53f43..50c1b7d414d2b 100644 --- a/src/query/sql/src/planner/plans/operator_macros.rs +++ b/src/query/sql/src/planner/plans/operator_macros.rs @@ -110,6 +110,7 @@ macro_rules! impl_match_rel_op { RelOperator::Mutation($rel_op) => $rel_op.$method($($arg),*), RelOperator::CompactBlock($rel_op) => $rel_op.$method($($arg),*), RelOperator::MutationSource($rel_op) => $rel_op.$method($($arg),*), + RelOperator::RowFetch($rel_op) => $rel_op.$method($($arg),*), } } } diff --git a/src/query/sql/src/planner/plans/row_fetch.rs b/src/query/sql/src/planner/plans/row_fetch.rs new file mode 100644 index 0000000000000..b5f162a4e5256 --- /dev/null +++ b/src/query/sql/src/planner/plans/row_fetch.rs @@ -0,0 +1,48 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; + +use crate::optimizer::ir::RelExpr; +use crate::optimizer::ir::RelationalProperty; +use crate::optimizer::ir::StatInfo; +use crate::plans::Operator; +use crate::plans::RelOp; +use crate::ColumnSet; +use crate::IndexType; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct RowFetch { + pub need_wrap_nullable: bool, + + pub lazy_columns: ColumnSet, + pub row_id_index: IndexType, + pub fetch_table_index: IndexType, +} + +impl Operator for RowFetch { + fn rel_op(&self) -> RelOp { + RelOp::RowFetch + } + + fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result> { + rel_expr.derive_relational_prop_child(0) + } + + fn derive_stats(&self, rel_expr: &RelExpr) -> Result> { + rel_expr.derive_cardinality_child(0) + } +} diff --git a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test new file mode 100644 index 0000000000000..050f386d15762 --- /dev/null +++ b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test @@ -0,0 +1,13 @@ +statement ok +set error_on_nondeterministic_update = 0; + +statement ok +set nondeterministic_update_lazy_read_threshold = 0; + +include ./update.test + +statement ok +unset error_on_nondeterministic_update; + +statement ok +unset nondeterministic_update_lazy_read_threshold; diff --git a/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test new file mode 100644 index 0000000000000..0f1ec162ad4ff --- /dev/null +++ b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test @@ -0,0 +1,13 @@ +statement ok +set error_on_nondeterministic_update = 0; + +statement ok +set nondeterministic_update_lazy_read_threshold = 0; + +include ./update_cte.test + +statement ok +unset error_on_nondeterministic_update; + +statement ok +unset nondeterministic_update_lazy_read_threshold;