Skip to content

Commit 9cb9d55

Browse files
committed
refactor(query): support lazy read for update from
1 parent cad4a8d commit 9cb9d55

File tree

4 files changed

+21
-7
lines changed

4 files changed

+21
-7
lines changed

src/query/sql/src/executor/physical_plans/physical_mutation.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use crate::executor::physical_plans::MutationOrganize;
5858
use crate::executor::physical_plans::MutationSplit;
5959
use crate::executor::physical_plans::RowFetch;
6060
use crate::executor::PhysicalPlanBuilder;
61+
use crate::optimizer::ir::RelExpr;
6162
use crate::optimizer::ir::SExpr;
6263
use crate::parse_computed_expr;
6364
use crate::plans::BoundColumnRef;
@@ -283,11 +284,21 @@ impl PhysicalPlanBuilder {
283284
}));
284285
}
285286

287+
let already_enable_lazy_read = {
288+
let settings = self.ctx.get_settings();
289+
let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?;
290+
let rel_expr = RelExpr::with_s_expr(s_expr);
291+
let cardinality = rel_expr.derive_cardinality_child(0)?;
292+
293+
lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64
294+
};
295+
286296
// Construct row fetch plan for lazy columns.
287-
if let Some(lazy_columns) = self
288-
.metadata
289-
.read()
290-
.get_table_lazy_columns(target_table_index)
297+
if !already_enable_lazy_read
298+
&& let Some(lazy_columns) = self
299+
.metadata
300+
.read()
301+
.get_table_lazy_columns(target_table_index)
291302
&& !lazy_columns.is_empty()
292303
{
293304
plan = PhysicalPlan::RowFetch(build_mutation_row_fetch(

src/query/sql/src/executor/physical_plans/physical_row_fetch.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ impl PhysicalPlanBuilder {
8888
return Err(ErrorCode::Internal("Internal column _row_id is not found"));
8989
};
9090

91-
let lazy_columns = metadata
92-
.lazy_columns()
91+
let lazy_columns = row_fetch
92+
.lazy_columns
9393
.iter()
9494
.filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it.
9595
.cloned()
@@ -114,7 +114,8 @@ impl PhysicalPlanBuilder {
114114
})
115115
.collect();
116116

117-
let source = input_plan.try_find_single_data_source();
117+
let metadata = self.metadata.read();
118+
let source = metadata.get_table_source(&row_fetch.fetch_table_index);
118119
debug_assert!(source.is_some());
119120
let source_info = source.cloned().unwrap();
120121
let table_schema = source_info.source_info.schema();

src/query/sql/src/planner/binder/bind_mutation/update.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ impl Binder {
355355
need_wrap_nullable: false,
356356
row_id_index: row_id.index,
357357
lazy_columns: fields_bindings.iter().map(|x| x.index).collect(),
358+
fetch_table_index: mutation.target_table_index,
358359
}));
359360
}
360361

src/query/sql/src/planner/plans/row_fetch.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub struct RowFetch {
3030

3131
pub lazy_columns: ColumnSet,
3232
pub row_id_index: IndexType,
33+
pub fetch_table_index: IndexType,
3334
}
3435

3536
impl Operator for RowFetch {

0 commit comments

Comments
 (0)