@@ -37,7 +37,7 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
37
37
LogicalPlan :: Limit { n, input } => {
38
38
let schema: & Arc < DFSchema > = input. schema ( ) ;
39
39
40
- let lift_up_result = lift_up_expensive_projections ( input, HashSet :: new ( ) ) ;
40
+ let lift_up_result = lift_up_expensive_projections ( input, ColumnRecorder :: default ( ) ) ;
41
41
pal_debug ! ( "lift_up_res: {:?}" , lift_up_result) ;
42
42
match lift_up_result {
43
43
Ok ( ( inner_plan, None ) ) => Ok ( LogicalPlan :: Limit {
@@ -107,8 +107,11 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result<LogicalPlan> {
107
107
}
108
108
}
109
109
110
+ #[ derive( Default ) ]
110
111
struct ColumnRecorder {
111
- columns : HashSet < Column > ,
112
+ /// We use indexmap IndexSet because we want iteration order to be deterministic and
113
+ /// specifically, to match left-to-right insertion order.
114
+ columns : indexmap:: IndexSet < Column > ,
112
115
}
113
116
114
117
impl ExpressionVisitor for ColumnRecorder {
@@ -180,21 +183,16 @@ fn looks_expensive(ex: &Expr) -> Result<bool> {
180
183
181
184
fn lift_up_expensive_projections (
182
185
plan : & LogicalPlan ,
183
- used_columns : HashSet < Column > ,
186
+ used_columns : ColumnRecorder ,
184
187
) -> Result < ( LogicalPlan , Option < Vec < Expr > > ) > {
185
188
match plan {
186
189
LogicalPlan :: Sort { expr, input } => {
187
- let mut recorder = ColumnRecorder {
188
- columns : used_columns,
189
- } ;
190
+ let mut recorder = used_columns;
190
191
for ex in expr {
191
192
recorder = ex. accept ( recorder) ?;
192
193
}
193
194
194
- let used_columns = recorder. columns ;
195
-
196
- let ( new_input, lifted_projection) =
197
- lift_up_expensive_projections ( & input, used_columns) ?;
195
+ let ( new_input, lifted_projection) = lift_up_expensive_projections ( & input, recorder) ?;
198
196
pal_debug ! (
199
197
"Sort sees result:\n {:?};;;{:?};;;" ,
200
198
new_input,
@@ -213,9 +211,7 @@ fn lift_up_expensive_projections(
213
211
input,
214
212
schema,
215
213
} => {
216
- let mut column_recorder = ColumnRecorder {
217
- columns : HashSet :: new ( ) ,
218
- } ;
214
+ let mut column_recorder = ColumnRecorder :: default ( ) ;
219
215
220
216
let mut this_projection_exprs = Vec :: < usize > :: new ( ) ;
221
217
@@ -241,7 +237,7 @@ fn lift_up_expensive_projections(
241
237
already_retained_cols. push ( ( col. clone ( ) , Some ( alias. clone ( ) ) ) ) ;
242
238
}
243
239
244
- if used_columns. contains ( & field. qualified_column ( ) ) {
240
+ if used_columns. columns . contains ( & field. qualified_column ( ) ) {
245
241
pal_debug ! (
246
242
"Expr {}: used_columns contains field {:?}" ,
247
243
i,
@@ -510,6 +506,44 @@ mod tests {
510
506
Ok ( ( ) )
511
507
}
512
508
509
+ /// Tests that multiple columns are retained in a deterministic order (and as a nice-to-have,
510
+ /// they should be in the left-to-right order of appearance).
511
+ #[ test]
512
+ fn limit_sorted_plan_with_expensive_expr_retaining_multiple_columns ( ) -> Result < ( ) > {
513
+ let table_scan = test_table_scan_abcd ( ) ?;
514
+
515
+ let case_expr = when ( col ( "d" ) . eq ( lit ( 3 ) ) , col ( "c" ) + lit ( 2 ) ) . otherwise ( lit ( 5 ) ) ?;
516
+
517
+ let plan = LogicalPlanBuilder :: from ( table_scan)
518
+ . project ( [
519
+ col ( "a" ) . alias ( "a1" ) ,
520
+ col ( "b" ) . alias ( "b1" ) ,
521
+ case_expr. alias ( "c1" ) ,
522
+ ] ) ?
523
+ . sort ( [ col ( "a1" ) . sort ( true , true ) ] ) ?
524
+ . limit ( 50 ) ?
525
+ . build ( ) ?;
526
+
527
+ let expected = "Limit: 50\
528
+ \n Sort: #a1 ASC NULLS FIRST\
529
+ \n Projection: #test.a AS a1, #test.b AS b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
530
+ \n TableScan: test projection=None";
531
+
532
+ let formatted = format ! ( "{:?}" , plan) ;
533
+ assert_eq ! ( formatted, expected) ;
534
+
535
+ // We are testing that test.d deterministically comes before test.c in the inner Projection.
536
+ let optimized_expected = "Projection: #a1, #b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\
537
+ \n Limit: 50\
538
+ \n Sort: #a1 ASC NULLS FIRST\
539
+ \n Projection: #test.a AS a1, #test.b AS b1, #test.d, #test.c\
540
+ \n TableScan: test projection=None";
541
+
542
+ assert_optimized_plan_eq ( & plan, optimized_expected) ;
543
+
544
+ Ok ( ( ) )
545
+ }
546
+
513
547
/// Tests that we re-alias fields in the lifted up projection.
514
548
#[ test]
515
549
fn limit_sorted_plan_with_nonaliased_expensive_expr_optimized ( ) -> Result < ( ) > {
@@ -659,4 +693,15 @@ mod tests {
659
693
pub fn test_table_scan ( ) -> Result < LogicalPlan > {
660
694
test_table_scan_with_name ( "test" )
661
695
}
696
+
697
+ pub fn test_table_scan_abcd ( ) -> Result < LogicalPlan > {
698
+ let name = "test" ;
699
+ let schema = Schema :: new ( vec ! [
700
+ Field :: new( "a" , DataType :: UInt32 , false ) ,
701
+ Field :: new( "b" , DataType :: UInt32 , false ) ,
702
+ Field :: new( "c" , DataType :: UInt32 , false ) ,
703
+ Field :: new( "d" , DataType :: UInt32 , false ) ,
704
+ ] ) ;
705
+ LogicalPlanBuilder :: scan_empty ( Some ( name) , & schema, None ) ?. build ( )
706
+ }
662
707
}
0 commit comments