@@ -95,8 +95,7 @@ pub fn materialize_topk(p: LogicalPlan) -> Result<LogicalPlan, DataFusionError>
95
95
let out_name = out_schema. field ( out_i) . name ( ) ;
96
96
97
97
//let mut e = Expr::Column(f.qualified_column());
98
- let mut e =
99
- p. post_projection [ p. input_columns [ out_i] ] . clone ( ) ;
98
+ let mut e = p. post_projection [ out_i] . clone ( ) ;
100
99
if out_name != in_field. name ( ) {
101
100
e = Expr :: Alias ( Box :: new ( e) , out_name. clone ( ) )
102
101
}
@@ -420,3 +419,75 @@ fn make_sort_expr(
420
419
_ => col,
421
420
}
422
421
}
422
+
423
+ #[ cfg( test) ]
424
+ mod tests {
425
+ use datafusion:: {
426
+ arrow:: datatypes:: Field ,
427
+ logical_plan:: { col, sum, LogicalPlanBuilder } ,
428
+ } ;
429
+
430
+ use super :: * ;
431
+
432
+ #[ test]
433
+ fn topk_projection_column_switched ( ) {
434
+ // A regression test for materialize_topk switching around projection expressions when their
435
+ // order does not match the aggregate node's aggregation expression order. (Also, when
436
+ // materialize_topk had this bug, the Projection node's DFSchema was left unchanged, making
437
+ // it inconsistent with the expressions.)
438
+
439
+ let table_schema = Schema :: new ( vec ! [
440
+ Field :: new( "group_field" , DataType :: Int64 , true ) ,
441
+ Field :: new( "agg_sortby" , DataType :: Int64 , true ) ,
442
+ Field :: new( "agg_1" , DataType :: Int64 , true ) ,
443
+ Field :: new( "agg_2" , DataType :: Int64 , true ) ,
444
+ ] ) ;
445
+
446
+ let scan_node = LogicalPlanBuilder :: scan_empty ( Some ( "table" ) , & table_schema, None )
447
+ . unwrap ( )
448
+ . build ( )
449
+ . unwrap ( ) ;
450
+
451
+ let cluster_send =
452
+ ClusterSendNode :: new ( Arc :: new ( scan_node) , vec ! [ vec![ ] ] , None ) . into_plan ( ) ;
453
+
454
+ let plan = LogicalPlanBuilder :: from ( cluster_send)
455
+ . aggregate (
456
+ vec ! [ col( "group_field" ) ] ,
457
+ vec ! [ sum( col( "agg_sortby" ) ) , sum( col( "agg_1" ) ) , sum( col( "agg_2" ) ) ] ,
458
+ )
459
+ . unwrap ( )
460
+ . project ( vec ! [
461
+ col( "group_field" ) ,
462
+ col( "SUM(table.agg_sortby)" ) ,
463
+ col( "SUM(table.agg_2)" ) ,
464
+ col( "SUM(table.agg_1)" ) ,
465
+ ] )
466
+ . expect ( "project to be valid" )
467
+ . sort ( vec ! [ col( "SUM(table.agg_sortby)" ) . sort( false , false ) ] )
468
+ . unwrap ( )
469
+ . limit ( 10 )
470
+ . unwrap ( )
471
+ . build ( )
472
+ . unwrap ( ) ;
473
+
474
+ let before_schema = plan. schema ( ) . clone ( ) ;
475
+
476
+ let plan = materialize_topk ( plan) . expect ( "materialize_topk to succeed" ) ;
477
+
478
+ let after_schema = plan. schema ( ) . clone ( ) ;
479
+
480
+ // Of course the schema shouldn't change.
481
+ assert_eq ! ( before_schema, after_schema) ;
482
+
483
+ // We are testing that topk materialization doesn't switch the field order (of
484
+ // SUM(table.agg_2) and SUM(table.agg_1)) in the projection above it.
485
+ let expected = "\
486
+ Projection: #table.group_field, #SUM(table.agg_sortby), #SUM(table.agg_2), #SUM(table.agg_1)\
487
+ \n ClusterAggregateTopK, limit = 10, groupBy = [#table.group_field], aggr = [SUM(#table.agg_sortby), SUM(#table.agg_1), SUM(#table.agg_2)], sortBy = [SortColumn { agg_index: 0, asc: false, nulls_first: false }]\
488
+ \n TableScan: table projection=None";
489
+ let formatted = format ! ( "{:?}" , plan) ;
490
+
491
+ assert_eq ! ( expected, formatted) ;
492
+ }
493
+ }
0 commit comments