File tree Expand file tree Collapse file tree 2 files changed +19
-1
lines changed
physical-plan/src/aggregates Expand file tree Collapse file tree 2 files changed +19
-1
lines changed Original file line number Diff line number Diff line change @@ -408,6 +408,19 @@ async fn oom_with_tracked_consumer_pool() {
408
408
. await
409
409
}
410
410
411
+ #[ tokio:: test]
412
+ async fn oom_grouped_hash_aggregate ( ) {
413
+ TestCase :: new ( )
414
+ . with_query ( "SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host" )
415
+ . with_expected_errors ( vec ! [
416
+ "Failed to allocate additional" ,
417
+ "GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))" ,
418
+ ] )
419
+ . with_memory_limit ( 1_000 )
420
+ . run ( )
421
+ . await
422
+ }
423
+
411
424
/// For regression case: if spilled `StringViewArray`'s buffer will be referenced by
412
425
/// other batches which are also need to be spilled, then the spill writer will
413
426
/// repeatedly write out the same buffer, and after reading back, each batch's size
Original file line number Diff line number Diff line change @@ -529,7 +529,12 @@ impl GroupedHashAggregateStream {
529
529
} )
530
530
. collect ( ) ;
531
531
532
- let name = format ! ( "GroupedHashAggregateStream[{partition}]" ) ;
532
+ let agg_fn_names = aggregate_exprs
533
+ . iter ( )
534
+ . map ( |expr| expr. human_display ( ) )
535
+ . collect :: < Vec < _ > > ( )
536
+ . join ( ", " ) ;
537
+ let name = format ! ( "GroupedHashAggregateStream[{partition}] ({agg_fn_names})" ) ;
533
538
let reservation = MemoryConsumer :: new ( name)
534
539
. with_can_spill ( true )
535
540
. register ( context. memory_pool ( ) ) ;
You can’t perform that action at this time.
0 commit comments