Skip to content

refactor(query): support lazy read for update from #18394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ pub struct TransformHashJoinProbe {
partition_id_to_restore: usize,

step: Step,
step_logs: Vec<Step>,
}

impl TransformHashJoinProbe {
Expand Down Expand Up @@ -176,7 +175,6 @@ impl TransformHashJoinProbe {
spiller,
partition_id_to_restore: 0,
step: Step::Async(AsyncStep::WaitBuild),
step_logs: vec![Step::Async(AsyncStep::WaitBuild)],
}))
}

Expand All @@ -192,7 +190,6 @@ impl TransformHashJoinProbe {
}
};
self.step = step.clone();
self.step_logs.push(step);
Ok(event)
}

Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("nondeterministic_update_lazy_read_threshold", DefaultSettingValue {
value: UserSettingValue::UInt64(18446744073709551615),
desc: "Sets the maximum rows in a query to enable lazy read optimization when updating a multi-joined row. Setting it to 0 disables the optimization.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("enable_auto_vacuum", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Whether to automatically trigger VACUUM operations on tables (using vacuum2)",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,4 +1013,8 @@ impl Settings {
pub fn get_enable_parallel_union_all(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_parallel_union_all")? == 1)
}

/// Returns the current value of the `nondeterministic_update_lazy_read_threshold`
/// setting as a `u64`.
///
/// The value is looked up by name through `try_get_u64`; any error from that
/// lookup is returned to the caller unchanged.
pub fn get_nondeterministic_update_lazy_read_threshold(&self) -> Result<u64> {
self.try_get_u64("nondeterministic_update_lazy_read_threshold")
}
}
4 changes: 4 additions & 0 deletions src/query/sql/src/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ impl PhysicalPlanBuilder {
self.build_mutation_source(mutation_source).await
}
RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
RelOperator::RowFetch(row_fetch) => {
self.build_row_fetch(s_expr, row_fetch, required, stat_info)
.await
}
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/query/sql/src/executor/physical_plans/physical_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::executor::physical_plans::MutationOrganize;
use crate::executor::physical_plans::MutationSplit;
use crate::executor::physical_plans::RowFetch;
use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::ir::RelExpr;
use crate::optimizer::ir::SExpr;
use crate::parse_computed_expr;
use crate::plans::BoundColumnRef;
Expand Down Expand Up @@ -283,11 +284,21 @@ impl PhysicalPlanBuilder {
}));
}

let already_enable_lazy_read = {
let settings = self.ctx.get_settings();
let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?;
let rel_expr = RelExpr::with_s_expr(s_expr);
let cardinality = rel_expr.derive_cardinality_child(0)?;

lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64
};

// Construct row fetch plan for lazy columns.
if let Some(lazy_columns) = self
.metadata
.read()
.get_table_lazy_columns(target_table_index)
if !already_enable_lazy_read
&& let Some(lazy_columns) = self
.metadata
.read()
.get_table_lazy_columns(target_table_index)
&& !lazy_columns.is_empty()
{
plan = PhysicalPlan::RowFetch(build_mutation_row_fetch(
Expand Down
95 changes: 95 additions & 0 deletions src/query/sql/src/executor/physical_plans/physical_row_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@

use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::Projection;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::ROW_ID_COL_NAME;

use crate::executor::explain::PlanStatsInfo;
use crate::executor::PhysicalPlan;
use crate::executor::PhysicalPlanBuilder;
use crate::optimizer::ir::SExpr;
use crate::ColumnEntry;
use crate::ColumnSet;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RowFetch {
Expand All @@ -46,3 +52,92 @@ impl RowFetch {
Ok(DataSchemaRefExt::create(fields))
}
}

impl PhysicalPlanBuilder {
    /// Builds the physical plan for a logical `RowFetch` operator.
    ///
    /// The child plan is built with the operator's lazy columns removed from
    /// `required` and the row id column added. Lazy columns that the child
    /// does not already produce are then fetched by a `PhysicalPlan::RowFetch`
    /// placed on top of it.
    ///
    /// # Returns
    ///
    /// * The child plan itself when every lazy column is already present in
    ///   the child's output schema (nothing is left to fetch).
    /// * Otherwise a `PhysicalPlan::RowFetch` wrapping the child plan.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::Internal` when the `_row_id` internal column cannot
    /// be found in the metadata or in the child's output schema, or when the
    /// table source of `row_fetch.fetch_table_index` is not registered in the
    /// metadata. Errors from building the child plan are propagated.
    pub(crate) async fn build_row_fetch(
        &mut self,
        s_expr: &SExpr,
        row_fetch: &crate::plans::RowFetch,
        mut required: ColumnSet,
        stat_info: PlanStatsInfo,
    ) -> Result<PhysicalPlan> {
        // 1. Prune unused columns: lazy columns are not requested from the
        //    child, but the row id column is always required.
        required = required
            .difference(&row_fetch.lazy_columns)
            .cloned()
            .collect::<ColumnSet>();

        required.insert(row_fetch.row_id_index);

        // 2. Build the physical plan of the child.
        let input_plan = self.build(s_expr.child(0)?, required).await?;
        let input_schema = input_plan.output_schema()?;

        // Take one read guard for the rest of the function instead of cloning
        // the whole metadata. It is acquired after the last `.await`, so it is
        // never held across a suspension point.
        let metadata = self.metadata.read();

        // Locate the row id column: first its index in the metadata, then its
        // offset in the child's output schema (fields are named by index).
        // NOTE(review): this resolves `_row_id` by name rather than through
        // `row_fetch.row_id_index` — confirm both always refer to the same column.
        let row_id_col_index = metadata
            .columns()
            .iter()
            .position(|col| col.name() == ROW_ID_COL_NAME)
            .ok_or_else(|| ErrorCode::Internal("Internal column _row_id is not found"))?;

        let Ok(row_id_col_offset) = input_schema.index_of(&row_id_col_index.to_string()) else {
            return Err(ErrorCode::Internal("Internal column _row_id is not found"));
        };

        // A column that is already in the input schema does not need to be fetched.
        let lazy_columns = row_fetch
            .lazy_columns
            .iter()
            .filter(|index| !input_schema.has_field(&index.to_string()))
            .cloned()
            .collect::<Vec<_>>();

        if lazy_columns.is_empty() {
            // Nothing left to fetch, so no `RowFetch` plan is needed.
            return Ok(input_plan);
        }

        // Describe the fetched columns, noting whether any of them is an inner
        // (path-addressed) column because the projection builder needs to know.
        let mut has_inner_column = false;
        let fetched_fields = lazy_columns
            .iter()
            .map(|index| {
                let col = metadata.column(*index);
                if let ColumnEntry::BaseTableColumn(c) = col {
                    if c.path_indices.is_some() {
                        has_inner_column = true;
                    }
                }
                DataField::new(&index.to_string(), col.data_type())
            })
            .collect();

        // Report a missing table source as an internal error instead of
        // panicking on `unwrap()` in release builds.
        let source_info = metadata
            .get_table_source(&row_fetch.fetch_table_index)
            .cloned()
            .ok_or_else(|| {
                ErrorCode::Internal(format!(
                    "Table source of row fetch table index {} is not found",
                    row_fetch.fetch_table_index
                ))
            })?;
        let table_schema = source_info.source_info.schema();
        let cols_to_fetch = Self::build_projection(
            &metadata,
            &table_schema,
            lazy_columns.iter(),
            has_inner_column,
            true,
            true,
            false,
        );

        Ok(PhysicalPlan::RowFetch(RowFetch {
            plan_id: 0,
            input: Box::new(input_plan),
            source: Box::new(source_info),
            row_id_col_offset,
            cols_to_fetch,
            fetched_fields,
            need_wrap_nullable: row_fetch.need_wrap_nullable,
            stat_info: Some(stat_info),
        }))
    }
}
59 changes: 45 additions & 14 deletions src/query/sql/src/planner/binder/bind_mutation/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ use crate::binder::bind_mutation::mutation_expression::MutationExpression;
use crate::binder::util::TableIdentifier;
use crate::binder::Binder;
use crate::optimizer::ir::Matcher;
use crate::optimizer::ir::RelExpr;
use crate::plans::AggregateFunction;
use crate::plans::BoundColumnRef;
use crate::plans::EvalScalar;
use crate::plans::Plan;
use crate::plans::RelOp;
use crate::plans::RelOperator;
use crate::plans::RowFetch;
use crate::plans::ScalarItem;
use crate::plans::VisitorMut;
use crate::BindContext;
Expand Down Expand Up @@ -282,14 +284,36 @@ impl Binder {
.collect();
let eval_scalar = EvalScalar { items };

mutation.bind_context.aggregate_info.group_items = fields_bindings
.into_iter()
.chain(std::iter::once(row_id))
.map(|column| ScalarItem {
index: column.index,
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }),
})
.collect();
mutation.bind_context.aggregate_info.group_items = vec![ScalarItem {
index: row_id.index,
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: row_id.clone(),
}),
}];

let enable_lazy_read = {
let settings = self.ctx.get_settings();
let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?;
let rel_expr = RelExpr::with_s_expr(s_expr);
let cardinality = rel_expr.derive_cardinality_child(0)?;

lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64
};

if mutation.strategy == MutationStrategy::Direct || !enable_lazy_read {
mutation
.bind_context
.aggregate_info
.group_items
.extend(fields_bindings.iter().map(|column| ScalarItem {
index: column.index,
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: column.clone(),
}),
}));
}

for eval in &mut mutation.matched_evaluators {
if let Some(expr) = &mut eval.condition {
Expand Down Expand Up @@ -319,14 +343,21 @@ impl Binder {
.collect(),
);

let aggr_expr =
let mut input =
self.bind_aggregate(&mut mutation.bind_context, s_expr.unary_child().clone())?;

let input = if eval_scalar.items.is_empty() {
aggr_expr
} else {
aggr_expr.build_unary(Arc::new(eval_scalar.into()))
};
if !eval_scalar.items.is_empty() {
input = input.build_unary(Arc::new(eval_scalar.into()));
}

if mutation.strategy != MutationStrategy::Direct && enable_lazy_read {
input = input.build_unary(RelOperator::RowFetch(RowFetch {
need_wrap_nullable: false,
row_id_index: row_id.index,
lazy_columns: fields_bindings.iter().map(|x| x.index).collect(),
fetch_table_index: mutation.target_table_index,
}));
}

let s_expr = Box::new(input.build_unary(Arc::new(mutation.into())));
let Plan::DataMutation {
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/optimizer/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String {
RelOperator::Sort(_) => "Sort".to_string(),
RelOperator::Limit(_) => "Limit".to_string(),
RelOperator::UnionAll(_) => "UnionAll".to_string(),
RelOperator::RowFetch(_) => "RowFetch".to_string(),
RelOperator::Exchange(op) => {
format!("Exchange: ({})", match op {
Exchange::Hash(scalars) => format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ impl DPhpyOptimizer {
| RelOperator::Aggregate(_)
| RelOperator::Sort(_)
| RelOperator::Limit(_)
| RelOperator::RowFetch(_)
| RelOperator::EvalScalar(_)
| RelOperator::Window(_)
| RelOperator::Udf(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ pub async fn dynamic_sample(
| RelOperator::Exchange(_)
| RelOperator::Window(_)
| RelOperator::Udf(_)
| RelOperator::AsyncFunction(_) => {
| RelOperator::AsyncFunction(_)
| RelOperator::RowFetch(_) => {
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,13 @@ impl SubqueryDecorrelatorOptimizer {
Arc::new(self.optimize_sync(s_expr.right_child())?),
)),

RelOperator::Limit(_) | RelOperator::Udf(_) | RelOperator::AsyncFunction(_) => {
Ok(SExpr::create_unary(
s_expr.plan.clone(),
Arc::new(self.optimize_sync(s_expr.unary_child())?),
))
}
RelOperator::Limit(_)
| RelOperator::Udf(_)
| RelOperator::AsyncFunction(_)
| RelOperator::RowFetch(_) => Ok(SExpr::create_unary(
s_expr.plan.clone(),
Arc::new(self.optimize_sync(s_expr.unary_child())?),
)),

RelOperator::DummyTableScan(_)
| RelOperator::Scan(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ fn find_group_by_keys(
| RelOperator::RecursiveCteScan(_)
| RelOperator::Mutation(_)
| RelOperator::MutationSource(_)
| RelOperator::CompactBlock(_) => {}
| RelOperator::CompactBlock(_)
| RelOperator::RowFetch(_) => {}
}
Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mod r_cte_scan;
mod recluster;
mod replace;
mod revert_table;
mod row_fetch;
mod scalar_expr;
mod scan;
mod set;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub use r_cte_scan::*;
pub use recluster::*;
pub use replace::Replace;
pub use revert_table::RevertTablePlan;
pub use row_fetch::*;
pub use scalar_expr::*;
pub use scan::*;
pub use set::*;
Expand Down
3 changes: 3 additions & 0 deletions src/query/sql/src/planner/plans/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::plans::Limit;
use crate::plans::Mutation;
use crate::plans::OptimizeCompactBlock as CompactBlock;
use crate::plans::ProjectSet;
use crate::plans::RowFetch;
use crate::plans::Scan;
use crate::plans::Sort;
use crate::plans::Udf;
Expand Down Expand Up @@ -127,6 +128,7 @@ pub enum RelOp {
MergeInto,
CompactBlock,
MutationSource,
RowFetch,
}

/// Relational operators
Expand Down Expand Up @@ -155,6 +157,7 @@ pub enum RelOperator {
ExpressionScan(ExpressionScan),
CacheScan(CacheScan),
Udf(Udf),
RowFetch(RowFetch),
RecursiveCteScan(RecursiveCteScan),
AsyncFunction(AsyncFunction),
Mutation(Mutation),
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/operator_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ macro_rules! impl_match_rel_op {
RelOperator::Mutation($rel_op) => $rel_op.$method($($arg),*),
RelOperator::CompactBlock($rel_op) => $rel_op.$method($($arg),*),
RelOperator::MutationSource($rel_op) => $rel_op.$method($($arg),*),
RelOperator::RowFetch($rel_op) => $rel_op.$method($($arg),*),
}
}
}
Expand Down
Loading
Loading