-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat: Allow cancelling of grouping operations which are CPU bound #16196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
b18aeaa
2a965aa
67ca44b
82a179d
da3c2d5
6cf3bf0
3251990
a8da370
311849d
2a2ead9
3b69287
3ff9252
5547c3c
aeac0ef
6d56b78
4ddd1e5
b2ffec7
4587c3b
098b1ec
bc65c1a
da58d0b
021fb92
5027087
8509d0a
97c1bb7
5c3a14c
97923b8
118f801
e7a678a
54260aa
cb344af
89a4e93
896fd59
2887f87
2de4afb
485d55a
e31a997
b65ab60
1bf2ad3
8ab64b3
c7774a7
7f64f5c
3eb785c
5737309
74eb5c2
23d7a4b
e7c7eea
7df28e5
d16e6e5
f6ccecb
8c74620
5e2dde8
aa73a2d
8d9f091
b825ea6
be88233
4018f8f
3b70846
11035e5
b05cb85
e136a03
29bc2c7
187a945
3cc4095
4614048
0b8427c
8047534
7f4c93e
ca7f061
5806bdb
b45d3dc
a21c6ce
c820e36
697bfae
884386e
d3952d9
56361a4
50959ad
edc018d
ee9bb89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,11 @@ impl AggregateStream { | |
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); | ||
let input = agg.input.execute(partition, Arc::clone(&context))?; | ||
|
||
// Only wrap no‐grouping aggregates in our YieldStream | ||
// (grouped aggregates tend to produce small streams | ||
// and can rely on Tokio's own task‐yielding) | ||
let input = Box::pin(YieldStream::new(input)) as SendableRecordBatchStream; | ||
|
||
let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; | ||
let filter_expressions = match agg.mode { | ||
AggregateMode::Partial | ||
|
@@ -170,6 +175,65 @@ impl AggregateStream { | |
} | ||
} | ||
|
||
/// A stream that yields batches of data, yielding control back to the executor | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub struct YieldStream { | ||
inner: SendableRecordBatchStream, | ||
batches_processed: usize, | ||
buffer: Option<Result<RecordBatch>>, | ||
} | ||
|
||
impl YieldStream { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a neat structure -- think it is more generally useful. Perhaps we could put it into somewhere that it is more likely discoverable, such as https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you @alamb , good suggestion, addressed it in latest PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also added some unit test for it. |
||
pub fn new(inner: SendableRecordBatchStream) -> Self { | ||
Self { | ||
inner, | ||
batches_processed: 0, | ||
buffer: None, | ||
} | ||
} | ||
} | ||
|
||
// Stream<Item = Result<RecordBatch>> to poll_next_unpin | ||
impl Stream for YieldStream { | ||
type Item = Result<RecordBatch>; | ||
|
||
fn poll_next( | ||
mut self: std::pin::Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Option<Self::Item>> { | ||
const YIELD_BATCHES: usize = 64; | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let this = &mut *self; | ||
|
||
if let Some(batch) = this.buffer.take() { | ||
return Poll::Ready(Some(batch)); | ||
} | ||
|
||
match this.inner.poll_next_unpin(cx) { | ||
Poll::Ready(Some(Ok(batch))) => { | ||
this.batches_processed += 1; | ||
if this.batches_processed >= YIELD_BATCHES { | ||
this.batches_processed = 0; | ||
// We need to buffer the batch when we return Poll::Pending, | ||
// so that we can return it on the next poll. | ||
// Otherwise, the next poll will miss the batch and return None. | ||
this.buffer = Some(Ok(batch)); | ||
cx.waker().wake_by_ref(); | ||
Poll::Pending | ||
} else { | ||
Poll::Ready(Some(Ok(batch))) | ||
} | ||
} | ||
other => other, | ||
} | ||
} | ||
} | ||
|
||
// RecordBatchStream schema() | ||
impl RecordBatchStream for YieldStream { | ||
fn schema(&self) -> Arc<arrow_schema::Schema> { | ||
self.inner.schema() | ||
} | ||
} | ||
|
||
impl Stream for AggregateStream { | ||
type Item = Result<RecordBatch>; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my own testing with
partition_count = 1
group by aggregates suffer from the same problemThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @pepijnve for review, i will try to reproduce it for group by aggregates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, i can reproduce it now:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @pepijnve , i also added the grouping support in latest PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From testing result, it seems the grouping by cases have some performance regression.