Commit 0b77991

refactor(query): prune unused flatten result columns (#13935)
* refactor(query): prune unused flatten result columns
* fix tests
* fix tests
* fix
* rewrite tuple
* fix
* fix tests
* fix tests
* fix
* fix
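
The flatten table function returns a tuple of six fields (seq, key, path, index, value, this). Before this change the whole tuple was always materialized; with it, only the fields referenced by the parent plan are kept. A minimal illustrative query (the table definition and the resulting flatten(1, 5) rewrite are taken from the new explain test added below):

-- Only f.seq and f.value are referenced, so the planner can rewrite the
-- set-returning function to flatten(1, 5)(t.b) and skip building the
-- other four tuple fields.
create table t(a int, b variant);

SELECT t.a, f.seq, f.value FROM t, LATERAL FLATTEN(input => t.b) f;
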
1 parent 4e6515b commit 0b77991

File tree

8 files changed: +422 −134 lines changed

src/query/functions/src/srfs/mod.rs

Lines changed: 1 addition & 4 deletions
@@ -12,13 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_expression::FunctionRegistry;
-
 mod array;
 mod variant;
 
-pub use variant::FlattenGenerator;
-pub use variant::FlattenMode;
+use common_expression::FunctionRegistry;
 
 pub fn register(registry: &mut FunctionRegistry) {
     array::register(registry);

src/query/functions/src/srfs/variant.rs

Lines changed: 252 additions & 89 deletions
Large diffs are not rendered by default.

src/query/service/src/pipelines/processors/transforms/transform_srf.rs

Lines changed: 5 additions & 2 deletions
@@ -233,11 +233,14 @@ impl BlockingTransform for TransformSRF
                     }
                 }
             } else {
+                let data_type = srf_expr.data_type();
+                let inner_tys = data_type.as_tuple().unwrap();
+                let inner_vals = vec![ScalarRef::Null; inner_tys.len()];
                 row_result = Value::Column(
                     ColumnBuilder::repeat(
-                        &ScalarRef::Tuple(vec![ScalarRef::Null]),
+                        &ScalarRef::Tuple(inner_vals),
                         self.num_rows[i],
-                        srf_expr.data_type(),
+                        data_type,
                     )
                     .build(),
                 );

src/query/sql/src/executor/physical_plans/physical_eval_scalar.rs

Lines changed: 59 additions & 1 deletion
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use common_exception::Result;
 use common_expression::ConstantFolder;
 use common_expression::DataField;
@@ -25,6 +27,10 @@ use crate::executor::physical_plan::PhysicalPlan;
 use crate::executor::physical_plan_builder::PhysicalPlanBuilder;
 use crate::optimizer::ColumnSet;
 use crate::optimizer::SExpr;
+use crate::plans::FunctionCall;
+use crate::plans::ProjectSet;
+use crate::plans::RelOperator;
+use crate::plans::ScalarExpr;
 use crate::IndexType;
 use crate::TypeCheck;
 
@@ -91,7 +97,17 @@ impl PhysicalPlanBuilder {
         if used.is_empty() {
            self.build(s_expr.child(0)?, required).await
        } else {
-            let input = self.build(s_expr.child(0)?, required).await?;
+            let child = s_expr.child(0)?;
+            let input = if let RelOperator::ProjectSet(project_set) = child.plan() {
+                let new_project_set =
+                    self.prune_flatten_columns(eval_scalar, project_set, &required);
+                let mut new_child = child.clone();
+                new_child.plan = Arc::new(new_project_set.into());
+                self.build(&new_child, required).await?
+            } else {
+                self.build(child, required).await?
+            };
+
             let eval_scalar = crate::plans::EvalScalar { items: used };
             self.create_eval_scalar(&eval_scalar, column_projections, input, stat_info)
         }
@@ -149,4 +165,46 @@ impl PhysicalPlanBuilder {
             stat_info: Some(stat_info),
         }))
     }
+
+    // The flatten function returns a tuple, which contains 6 columns.
+    // Only keep columns required by parent plan, other columns can be pruned
+    // to reduce the memory usage.
+    fn prune_flatten_columns(
+        &mut self,
+        eval_scalar: &crate::plans::EvalScalar,
+        project_set: &ProjectSet,
+        required: &ColumnSet,
+    ) -> ProjectSet {
+        let mut project_set = project_set.clone();
+        for srf_item in &mut project_set.srfs {
+            if let ScalarExpr::FunctionCall(srf_func) = &srf_item.scalar {
+                if srf_func.func_name == "flatten" {
+                    // Store the columns required by the parent plan in params.
+                    let mut params = Vec::new();
+                    for item in &eval_scalar.items {
+                        if !required.contains(&item.index) {
+                            continue;
+                        }
+                        if let ScalarExpr::FunctionCall(func) = &item.scalar {
+                            if func.func_name == "get" && !func.arguments.is_empty() {
+                                if let ScalarExpr::BoundColumnRef(column_ref) = &func.arguments[0] {
+                                    if column_ref.column.index == srf_item.index {
+                                        params.push(func.params[0]);
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    srf_item.scalar = ScalarExpr::FunctionCall(FunctionCall {
+                        span: srf_func.span,
+                        func_name: srf_func.func_name.clone(),
+                        params,
+                        arguments: srf_func.arguments.clone(),
+                    });
+                }
+            }
+        }
+        project_set
+    }
 }
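
The kept field positions are recorded in the flatten call's params and surface in the plan as flatten(positions). A sketch of the expected plan shape for a narrower projection, assuming the same table t(a int, b variant) as in the new explain test and the field numbering shown there (not an output captured from a real run):

-- Only f.value survives pruning, so prune_flatten_columns should record
-- params = [5], and the ProjectSet node would be expected to show
-- set returning functions: flatten(5)(t.b (#1)).
EXPLAIN SELECT f.value FROM t, LATERAL FLATTEN(input => t.b) f;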

src/query/sql/src/planner/binder/project_set.rs

Lines changed: 17 additions & 17 deletions
@@ -20,7 +20,6 @@ use common_ast::ast::Lambda;
 use common_ast::ast::Literal;
 use common_ast::ast::Window;
 use common_ast::Visitor;
-use common_exception::ErrorCode;
 use common_exception::Result;
 use common_exception::Span;
 use common_expression::FunctionKind;
@@ -148,12 +147,6 @@ impl Binder {
            let srf_expr = srf_scalar.as_expr()?;
            let return_types = srf_expr.data_type().as_tuple().unwrap();
 
-            if return_types.len() > 1 {
-                return Err(ErrorCode::Unimplemented(
-                    "set-returning functions with more than one return type are not supported yet",
-                ));
-            }
-
             // Add result column to metadata
             let column_index = self
                 .metadata
@@ -173,20 +166,27 @@ impl Binder {
             };
             items.push(item);
 
-            // Flatten the tuple fields of the srfs to the top level columns
-            // TODO(andylokandy/leisky): support multiple return types
-            let flatten_result = ScalarExpr::FunctionCall(FunctionCall {
-                span: srf.span(),
-                func_name: "get".to_string(),
-                params: vec![1],
-                arguments: vec![ScalarExpr::BoundColumnRef(BoundColumnRef {
+            // If tuple has more than one field, return the tuple column,
+            // otherwise, extract the tuple field to top level column.
+            let result_column = if return_types.len() > 1 {
+                ScalarExpr::BoundColumnRef(BoundColumnRef {
                     span: srf.span(),
                     column,
-                })],
-            });
+                })
+            } else {
+                ScalarExpr::FunctionCall(FunctionCall {
+                    span: srf.span(),
+                    func_name: "get".to_string(),
+                    params: vec![1],
+                    arguments: vec![ScalarExpr::BoundColumnRef(BoundColumnRef {
+                        span: srf.span(),
+                        column,
+                    })],
+                })
+            };
 
             // Add the srf to bind context, so we can replace the srfs later.
-            bind_context.srfs.insert(srf.to_string(), flatten_result);
+            bind_context.srfs.insert(srf.to_string(), result_column);
         }
 
         let project_set = ProjectSet { srfs: items };
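
With the Unimplemented guard removed, the binder now distinguishes single-field from multi-field SRF results: a single-field tuple is still unwrapped with get(1), while a multi-field tuple is bound as one column whose fields are extracted (and possibly pruned) later. The second query in the new project_set.test exercises both paths:

-- unnest returns a single-field tuple, so the binder still emits
-- get(1)(unnest (#3)); json_each returns a multi-field tuple, so the
-- whole column json_each (#2) is bound directly.
EXPLAIN SELECT json_each(t.b), unnest(t.b) FROM t;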

src/query/sql/src/planner/binder/table.rs

Lines changed: 26 additions & 21 deletions
@@ -367,7 +367,7 @@ impl Binder {
                     plan.items.len()
                 )));
             }
-            // Delete result tuple column
+            // Delete srf result tuple column, extract tuple inner columns instead
             let _ = bind_context.columns.pop();
             let scalar = &plan.items[0].scalar;
 
@@ -462,28 +462,34 @@
                 .bind_project_set(&mut bind_context, &srfs, child)
                 .await?;
 
-            if let Some((_, flatten_scalar)) = bind_context.srfs.remove(&srf.to_string()) {
-                // Add result column to metadata
-                let data_type = flatten_scalar.data_type()?;
-                let index = self
-                    .metadata
-                    .write()
-                    .add_derived_column(srf.to_string(), data_type.clone());
-                let column_binding = ColumnBindingBuilder::new(
-                    srf.to_string(),
-                    index,
-                    Box::new(data_type),
-                    Visibility::Visible,
-                )
-                .build();
-                bind_context.add_column_binding(column_binding);
+            if let Some((_, srf_result)) = bind_context.srfs.remove(&srf.to_string()) {
+                let column_binding =
+                    if let ScalarExpr::BoundColumnRef(column_ref) = &srf_result {
+                        column_ref.column.clone()
+                    } else {
+                        // Add result column to metadata
+                        let data_type = srf_result.data_type()?;
+                        let index = self
+                            .metadata
+                            .write()
+                            .add_derived_column(srf.to_string(), data_type.clone());
+                        ColumnBindingBuilder::new(
+                            srf.to_string(),
+                            index,
+                            Box::new(data_type),
+                            Visibility::Visible,
+                        )
+                        .build()
+                    };
 
                 let eval_scalar = EvalScalar {
                     items: vec![ScalarItem {
-                        scalar: flatten_scalar,
-                        index,
+                        scalar: srf_result,
+                        index: column_binding.index,
                     }],
                 };
+                // Add srf result column
+                bind_context.add_column_binding(column_binding);
 
                 let flatten_expr =
                     SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(srf_expr));
@@ -505,9 +511,8 @@ impl Binder {
 
                 return Ok((new_expr, bind_context));
             } else {
-                return Err(
-                    ErrorCode::Internal("srf flatten result is missing").set_span(*span)
-                );
+                return Err(ErrorCode::Internal("lateral join bind project_set failed")
+                    .set_span(*span));
             }
         } else {
             return Err(ErrorCode::InvalidArgument(format!(

tests/sqllogictests/suites/mode/standalone/explain/project_set.test

Lines changed: 48 additions & 0 deletions
@@ -84,5 +84,53 @@ ProjectSet
     ├── push downs: [filters: [], limit: NONE]
     └── estimated rows: 10.00
 
+statement ok
+drop table if exists t;
+
+statement ok
+create table t(a int, b variant);
+
+query T
+EXPLAIN SELECT t.a, f.seq, f.value FROM t, LATERAL FLATTEN(input => t.b) f
+----
+EvalScalar
+├── output columns: [t.a (#0), seq (#3), value (#7)]
+├── expressions: [get(1)(flatten (#2)), get(5)(flatten (#2))]
+├── estimated rows: 0.00
+└── ProjectSet
+    ├── output columns: [t.a (#0), flatten (#2)]
+    ├── estimated rows: 0.00
+    ├── set returning functions: flatten(1, 5)(t.b (#1))
+    └── TableScan
+        ├── table: default.project_set.t
+        ├── output columns: [a (#0), b (#1)]
+        ├── read rows: 0
+        ├── read bytes: 0
+        ├── partitions total: 0
+        ├── partitions scanned: 0
+        ├── push downs: [filters: [], limit: NONE]
+        └── estimated rows: 0.00
+
+query T
+EXPLAIN SELECT json_each(t.b), unnest(t.b) FROM t
+----
+EvalScalar
+├── output columns: [json_each (#2), unnest(t.b) (#4)]
+├── expressions: [get(1)(unnest (#3))]
+├── estimated rows: 0.00
+└── ProjectSet
+    ├── output columns: [json_each (#2), unnest (#3)]
+    ├── estimated rows: 0.00
+    ├── set returning functions: json_each(t.b (#1)), unnest(t.b (#1))
+    └── TableScan
+        ├── table: default.project_set.t
+        ├── output columns: [b (#1)]
+        ├── read rows: 0
+        ├── read bytes: 0
+        ├── partitions total: 0
+        ├── partitions scanned: 0
+        ├── push downs: [filters: [], limit: NONE]
+        └── estimated rows: 0.00
+
 statement ok
 drop database project_set

tests/sqllogictests/suites/query/lateral.test

Lines changed: 14 additions & 0 deletions
@@ -156,5 +156,19 @@ GROUP BY p.id ORDER BY p.id
 12712555 2
 98127771 2
 
+query IT
+SELECT u.user_id, f.value from
+user_activities u,
+LATERAL unnest(u.activities) f
+----
+1 "reading"
+1 "swimming"
+1 "cycling"
+2 "painting"
+2 "running"
+3 "cooking"
+3 "climbing"
+3 "writing"
+
 statement ok
 drop database test_lateral
