Skip to content

Commit 536187a

Browse files
authored
chore(query): check schema mismatch in projection render result (#17654)
* chore(query): check schema mismatch in projection render result * chore(query): check schema mismatch in projection render result * chore(query): add ScalarExpr::TypedConstantExpr * chore(query): add ScalarExpr::TypedConstantExpr * update * update
1 parent d4e0806 commit 536187a

File tree

26 files changed

+205
-93
lines changed

26 files changed

+205
-93
lines changed

src/query/expression/src/expression.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub enum RawExpr<Index: ColumnIndex = usize> {
4242
Constant {
4343
span: Span,
4444
scalar: Scalar,
45+
data_type: Option<DataType>,
4546
},
4647
ColumnRef {
4748
span: Span,
@@ -361,9 +362,14 @@ impl<Index: ColumnIndex> RawExpr<Index> {
361362
f: impl Fn(&Index) -> ToIndex + Copy,
362363
) -> RawExpr<ToIndex> {
363364
match self {
364-
RawExpr::Constant { span, scalar } => RawExpr::Constant {
365+
RawExpr::Constant {
366+
span,
367+
scalar,
368+
data_type,
369+
} => RawExpr::Constant {
365370
span: *span,
366371
scalar: scalar.clone(),
372+
data_type: data_type.clone(),
367373
},
368374
RawExpr::ColumnRef {
369375
span,

src/query/expression/src/type_check.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,17 @@ pub fn check<Index: ColumnIndex>(
4444
fn_registry: &FunctionRegistry,
4545
) -> Result<Expr<Index>> {
4646
match expr {
47-
RawExpr::Constant { span, scalar } => Ok(Expr::Constant {
47+
RawExpr::Constant {
48+
span,
49+
scalar,
50+
data_type,
51+
} => Ok(Expr::Constant {
4852
span: *span,
4953
scalar: scalar.clone(),
50-
data_type: scalar.as_ref().infer_data_type(),
54+
data_type: data_type
55+
.as_ref()
56+
.cloned()
57+
.unwrap_or_else(|| scalar.as_ref().infer_data_type()),
5158
}),
5259
RawExpr::ColumnRef {
5360
span,

src/query/expression/src/utils/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ pub fn cast_scalar(
9393
let raw_expr = RawExpr::Cast {
9494
span,
9595
is_try: false,
96-
expr: Box::new(RawExpr::Constant { span, scalar }),
96+
expr: Box::new(RawExpr::Constant {
97+
span,
98+
scalar,
99+
data_type: None,
100+
}),
97101
dest_type,
98102
};
99103
let expr = crate::type_check::check(&raw_expr, fn_registry)?;

src/query/functions/src/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ fn transform_expr(ast: AExpr, columns: &[(&str, DataType)]) -> RawExpr {
113113
AExpr::Literal { span, value } => RawExpr::Constant {
114114
span,
115115
scalar: transform_literal(value),
116+
data_type: None,
116117
},
117118
AExpr::ColumnRef {
118119
span,
@@ -374,6 +375,7 @@ fn transform_expr(ast: AExpr, columns: &[(&str, DataType)]) -> RawExpr {
374375
RawExpr::Constant {
375376
span,
376377
scalar: Scalar::String(key.name),
378+
data_type: Some(DataType::String),
377379
},
378380
]),
379381
MapAccessor::DotNumber { key } => {

src/query/service/src/pipelines/builders/builder_project.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,19 @@ impl PipelineBuilder {
4141
}
4242

4343
let mut projections = Vec::with_capacity(result_columns.len());
44-
4544
for column_binding in result_columns {
4645
let index = column_binding.index;
4746
projections.push(input_schema.index_of(index.to_string().as_str())?);
4847

49-
// TODO: open this check after fix grouping sets datatype mismatch
50-
// #[cfg(debug_assertions)]
51-
// {
52-
// let f = input_schema.field_with_name(index.to_string().as_str())?;
53-
// assert_eq!(
54-
// f.data_type(),
55-
// column_binding.data_type.as_ref(),
56-
// "Result projection schema mismatch"
57-
// );
58-
// }
48+
#[cfg(debug_assertions)]
49+
{
50+
let f = input_schema.field_with_name(index.to_string().as_str())?;
51+
assert_eq!(
52+
f.data_type(),
53+
column_binding.data_type.as_ref(),
54+
"Result projection schema mismatch"
55+
);
56+
}
5957
}
6058
let num_input_columns = input_schema.num_fields();
6159
pipeline.add_transformer(|| {

src/query/service/src/pipelines/processors/transforms/hash_join/util.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub(crate) fn inlist_filter(
7777
let array = RawExpr::Constant {
7878
span: None,
7979
scalar: build_column.as_scalar().unwrap().to_owned(),
80+
data_type: None,
8081
};
8182

8283
let args = vec![array, raw_probe_key];
@@ -120,6 +121,7 @@ pub(crate) fn dedup_build_key_column(
120121
list.push(RawExpr::Constant {
121122
span: None,
122123
scalar: value.to_owned(),
124+
data_type: None,
123125
})
124126
}
125127
let array = RawExpr::FunctionCall {
@@ -245,10 +247,12 @@ pub(crate) fn min_max_filter(
245247
let min = RawExpr::Constant {
246248
span: None,
247249
scalar: min,
250+
data_type: None,
248251
};
249252
let max = RawExpr::Constant {
250253
span: None,
251254
scalar: max,
255+
data_type: None,
252256
};
253257
// Make gte and lte function
254258
let gte_func = RawExpr::FunctionCall {

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ async fn test_ft_cluster_stats_with_stats() -> databend_common_exception::Result
385385
RawExpr::Constant {
386386
span: None,
387387
scalar: Scalar::Number(NumberScalar::UInt64(1)),
388+
data_type: None,
388389
},
389390
],
390391
};

src/query/sql/src/executor/physical_plans/physical_window.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ impl PhysicalPlanBuilder {
220220
expr: Box::new(RawExpr::Constant {
221221
span: w.span,
222222
scalar: scalar.clone(),
223+
data_type: None,
223224
}),
224225
dest_type: common_ty.clone(),
225226
};

src/query/sql/src/planner/binder/project.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl Binder {
135135
..
136136
}) = item.scalar.clone()
137137
{
138-
if typ == SubqueryType::Any || typ == SubqueryType::Exists {
138+
if typ == SubqueryType::Any {
139139
ScalarExpr::SubqueryExpr(SubqueryExpr {
140140
span,
141141
typ,
@@ -172,14 +172,19 @@ impl Binder {
172172
child: SExpr,
173173
) -> Result<SExpr> {
174174
bind_context.set_expr_context(ExprContext::SelectClause);
175-
175+
let mut columns = columns.to_vec();
176176
let mut scalars = scalars
177177
.iter()
178178
.map(|(_, item)| {
179179
if bind_context.in_grouping {
180180
let mut scalar = item.scalar.clone();
181181
let mut grouping_checker = GroupingChecker::new(bind_context);
182182
grouping_checker.visit(&mut scalar)?;
183+
184+
if let Some(x) = columns.iter_mut().find(|x| x.index == item.index) {
185+
x.data_type = Box::new(scalar.data_type()?);
186+
}
187+
183188
Ok(ScalarItem {
184189
scalar,
185190
index: item.index,
@@ -198,12 +203,9 @@ impl Binder {
198203

199204
scalars.sort_by_key(|s| s.index);
200205
let eval_scalar = EvalScalar { items: scalars };
201-
202206
let new_expr = SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(child));
203-
204207
// Set output columns
205-
bind_context.columns = columns.to_vec();
206-
208+
bind_context.columns = columns;
207209
Ok(new_expr)
208210
}
209211

src/query/sql/src/planner/format/display.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ pub fn format_scalar(scalar: &ScalarExpr) -> String {
243243
}
244244
}
245245
ScalarExpr::ConstantExpr(constant) => constant.value.to_string(),
246+
ScalarExpr::TypedConstantExpr(constant, _) => constant.value.to_string(),
246247
ScalarExpr::WindowFunction(win) => win.display_name.clone(),
247248
ScalarExpr::AggregateFunction(agg) => agg.display_name.clone(),
248249
ScalarExpr::LambdaFunction(lambda) => {

0 commit comments

Comments
 (0)