feat: Allow cancelling of grouping operations which are CPU bound #16196

Merged 80 commits on Jun 9, 2025
Changes from 5 commits
Commits
80 commits
b18aeaa
feat: support inability to yeild cpu for loop when it's not using Tok…
zhuqi-lucas May 27, 2025
2a965aa
Fix fuzz test
zhuqi-lucas May 28, 2025
67ca44b
polish code
zhuqi-lucas May 28, 2025
82a179d
add comments
zhuqi-lucas May 28, 2025
da3c2d5
fix corner case when huge data
zhuqi-lucas May 28, 2025
6cf3bf0
Also add grouping case
zhuqi-lucas May 28, 2025
3251990
Address comments
zhuqi-lucas May 30, 2025
a8da370
Merge remote-tracking branch 'upstream/main' into issue_16193
zhuqi-lucas May 30, 2025
311849d
fmt
zhuqi-lucas May 30, 2025
2a2ead9
Move YieldStream into physical-plan crate
alamb May 30, 2025
3b69287
Merge remote-tracking branch 'apache/main' into issue_16193
alamb May 30, 2025
3ff9252
Use existing RecordBatchStreamAdapter
alamb May 30, 2025
5547c3c
Add timeout testing for cancellation
zhuqi-lucas May 31, 2025
aeac0ef
fmt
zhuqi-lucas May 31, 2025
6d56b78
add license
zhuqi-lucas May 31, 2025
4ddd1e5
Support sort exec for cancellation
zhuqi-lucas May 31, 2025
b2ffec7
poc: unified yield exec for leaf node
zhuqi-lucas Jun 2, 2025
4587c3b
polish code phase 1
zhuqi-lucas Jun 2, 2025
098b1ec
Add license
zhuqi-lucas Jun 2, 2025
bc65c1a
Fix testing
zhuqi-lucas Jun 2, 2025
da58d0b
Support final path
zhuqi-lucas Jun 2, 2025
021fb92
fix test
zhuqi-lucas Jun 2, 2025
5027087
polish code
zhuqi-lucas Jun 2, 2025
8509d0a
fix testing and address suggestions
zhuqi-lucas Jun 2, 2025
97c1bb7
fix
zhuqi-lucas Jun 2, 2025
5c3a14c
remove buffer
zhuqi-lucas Jun 3, 2025
97923b8
address comments
zhuqi-lucas Jun 3, 2025
118f801
fmt
zhuqi-lucas Jun 3, 2025
e7a678a
Fix test
zhuqi-lucas Jun 3, 2025
54260aa
fix
zhuqi-lucas Jun 3, 2025
cb344af
fix
zhuqi-lucas Jun 3, 2025
89a4e93
fix slt
zhuqi-lucas Jun 3, 2025
896fd59
fix tpch sql
zhuqi-lucas Jun 3, 2025
2887f87
Add flag for yield insert and disable default
zhuqi-lucas Jun 4, 2025
2de4afb
recover testing
zhuqi-lucas Jun 4, 2025
485d55a
fix
zhuqi-lucas Jun 4, 2025
e31a997
Update doc
zhuqi-lucas Jun 4, 2025
b65ab60
Address comments
zhuqi-lucas Jun 4, 2025
1bf2ad3
fix fmt
zhuqi-lucas Jun 4, 2025
8ab64b3
Support config for yield frequency
zhuqi-lucas Jun 4, 2025
c7774a7
add built-in yield support
zhuqi-lucas Jun 5, 2025
7f64f5c
Merge remote-tracking branch 'upstream/main' into issue_16193
zhuqi-lucas Jun 5, 2025
3eb785c
Add LazyMemoryExec built-in Yield
zhuqi-lucas Jun 5, 2025
5737309
Update datafusion/datasource/src/source.rs
zhuqi-lucas Jun 5, 2025
74eb5c2
Update datafusion/core/tests/physical_optimizer/enforce_distribution.rs
zhuqi-lucas Jun 5, 2025
23d7a4b
Update datafusion/physical-optimizer/src/optimizer.rs
zhuqi-lucas Jun 5, 2025
e7c7eea
Update datafusion/physical-plan/src/memory.rs
zhuqi-lucas Jun 5, 2025
7df28e5
Update datafusion/physical-plan/src/memory.rs
zhuqi-lucas Jun 5, 2025
d16e6e5
Update datafusion/proto/src/physical_plan/mod.rs
zhuqi-lucas Jun 5, 2025
f6ccecb
Address comments
zhuqi-lucas Jun 5, 2025
8c74620
Add interleave reproducer
zhuqi-lucas Jun 5, 2025
5e2dde8
remove unused config
zhuqi-lucas Jun 5, 2025
aa73a2d
Add join with aggr case
zhuqi-lucas Jun 6, 2025
8d9f091
Merge remote-tracking branch 'upstream/main' into issue_16193
zhuqi-lucas Jun 6, 2025
b825ea6
fix clippy
zhuqi-lucas Jun 6, 2025
be88233
add reproducer for filter and add workaround in rule
zhuqi-lucas Jun 6, 2025
4018f8f
Add reproducer and comment it
zhuqi-lucas Jun 6, 2025
3b70846
adjust test
zhuqi-lucas Jun 6, 2025
11035e5
Harden tests, add failing tests for TDD, minor code refactors
ozankabak Jun 6, 2025
b05cb85
Add remaining test
zhuqi-lucas Jun 6, 2025
e136a03
Add sort merge case
zhuqi-lucas Jun 6, 2025
29bc2c7
change rule solution
zhuqi-lucas Jun 6, 2025
187a945
fix test
zhuqi-lucas Jun 6, 2025
3cc4095
add more built-in case
zhuqi-lucas Jun 6, 2025
4614048
fix
zhuqi-lucas Jun 6, 2025
0b8427c
Merge remote-tracking branch 'upstream/main' into issue_16193
zhuqi-lucas Jun 6, 2025
8047534
fix user defined exec
zhuqi-lucas Jun 6, 2025
7f4c93e
Reduce test diff
ozankabak Jun 6, 2025
ca7f061
Format imports
ozankabak Jun 6, 2025
5806bdb
Update documentation
ozankabak Jun 6, 2025
b45d3dc
Update information schema per docs
ozankabak Jun 6, 2025
a21c6ce
Only retain the with_cooperative_yields API for now
ozankabak Jun 6, 2025
c820e36
Format imports
ozankabak Jun 6, 2025
697bfae
Remove unnecessary clones
ozankabak Jun 6, 2025
884386e
Merge branch 'apache_main' into issue_16193
ozankabak Jun 7, 2025
d3952d9
Fix logical conflict
ozankabak Jun 7, 2025
56361a4
Exercise DRY, refactor common code to a helper function. Use period i…
ozankabak Jun 7, 2025
50959ad
Update datafusion/physical-plan/src/yield_stream.rs
zhuqi-lucas Jun 9, 2025
edc018d
Address new comments
zhuqi-lucas Jun 9, 2025
ee9bb89
Address more comments.
zhuqi-lucas Jun 9, 2025
64 changes: 64 additions & 0 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -77,6 +77,11 @@ impl AggregateStream {
        let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
        let input = agg.input.execute(partition, Arc::clone(&context))?;

        // Only wrap no‐grouping aggregates in our YieldStream

Contributor

In my own testing with partition_count = 1, group-by aggregates suffer from the same problem.

Contributor Author

Thank you @pepijnve for the review. I will try to reproduce it for group-by aggregates.

Contributor Author

You are right, I can reproduce it now:

SELECT
  (value % 10)         AS group_key,
  COUNT(*)             AS cnt,
  SUM(value)           AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;
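
One way to picture why a query like this is hard to cancel: a caller can only drop (cancel) a future between polls, so a job that never returns `Poll::Pending` finishes inside a single poll no matter how small the caller's budget is. The following is a minimal std-only sketch of that idea, not DataFusion code. `SumJob`, `NoopWake`, and `run_with_poll_budget` are hypothetical names introduced for illustration, and `yield_every` is assumed to be non-zero when set.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for a hand-rolled polling loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical CPU-bound job: sums 0..total. If `yield_every` is set
/// (assumed non-zero), it returns `Poll::Pending` after that many items.
struct SumJob {
    next: u64,
    total: u64,
    sum: u64,
    yield_every: Option<u64>,
}

impl Future for SumJob {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = &mut *self;
        let mut done_this_poll = 0u64;
        while this.next < this.total {
            if let Some(n) = this.yield_every {
                if done_this_poll == n {
                    // Ask to be polled again, then hand control back.
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
            }
            this.sum += this.next;
            this.next += 1;
            done_this_poll += 1;
        }
        Poll::Ready(this.sum)
    }
}

/// Polls `job` at most `max_polls` times, simulating a caller that gives up
/// (drops the future) once its budget is spent.
/// Returns (result if the job finished, number of polls performed).
fn run_with_poll_budget(mut job: SumJob, max_polls: usize) -> (Option<u64>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut job = Pin::new(&mut job);
    for polls in 1..=max_polls {
        if let Poll::Ready(v) = job.as_mut().poll(&mut cx) {
            return (Some(v), polls);
        }
    }
    (None, max_polls) // budget exhausted: the job is dropped unfinished
}
```

With `yield_every: None` the job completes in its first poll even when the budget is small; with `yield_every: Some(100)` and a budget of 3 polls the caller regains control and can abandon the job early.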

Contributor Author

Thank you @pepijnve, I also added grouping support in the latest PR.

Contributor Author

From the test results, it seems the group-by cases show some performance regression.

        // (grouped aggregates tend to produce small streams
        // and can rely on Tokio's own task‐yielding)
        let input = Box::pin(YieldStream::new(input)) as SendableRecordBatchStream;

        let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?;
        let filter_expressions = match agg.mode {
            AggregateMode::Partial
@@ -170,6 +175,65 @@ impl AggregateStream {
    }
}

/// A stream wrapper that passes record batches through and periodically
/// yields control back to the executor
pub struct YieldStream {
    inner: SendableRecordBatchStream,
    batches_processed: usize,
    buffer: Option<Result<RecordBatch>>,
}

impl YieldStream {

Contributor

This is a neat structure. I think it is more generally useful. Perhaps we could put it somewhere it is more likely to be discovered, such as https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src

Contributor Author

Thank you @alamb, good suggestion. Addressed in the latest PR.

Contributor Author

I also added some unit tests for it.

    pub fn new(inner: SendableRecordBatchStream) -> Self {
        Self {
            inner,
            batches_processed: 0,
            buffer: None,
        }
    }
}

// Implement Stream<Item = Result<RecordBatch>> so callers can drive
// YieldStream with poll_next_unpin
impl Stream for YieldStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        const YIELD_BATCHES: usize = 64;
        let this = &mut *self;

        // Return the batch held back by the previous yield, if any.
        if let Some(batch) = this.buffer.take() {
            return Poll::Ready(Some(batch));
        }

        match this.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                this.batches_processed += 1;
                if this.batches_processed >= YIELD_BATCHES {
                    this.batches_processed = 0;
                    // We need to buffer the batch when we return Poll::Pending,
                    // so that we can return it on the next poll.
                    // Otherwise, the batch would be dropped and never reach the consumer.
                    this.buffer = Some(Ok(batch));
                    // Wake immediately so the executor polls this stream again.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Ready(Some(Ok(batch)))
                }
            }
            // Pending, errors, and end-of-stream pass through unchanged.
            other => other,
        }
    }
}
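
For comparison with the buffered version above, here is a hedged std-only sketch of a variant that checks the counter before polling the inner stream, so no item needs to be held back across the yield. It is not taken from this PR. `MiniStream` is a hypothetical stand-in for `futures::Stream`, and `Counter`, `YieldEvery`, `CountingWake`, and `drain` are illustrative names. The sketch assumes `period` is greater than zero.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Minimal stand-in for `futures::Stream`, so the sketch needs only std.
trait MiniStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Always-ready source: produces 0..len and never returns Pending itself.
struct Counter {
    next: usize,
    len: usize,
}

impl MiniStream for Counter {
    type Item = usize;
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.next < self.len {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Returns Pending once after every `period` items (assumed > 0). The check
/// happens before the inner poll, so nothing has to be buffered.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    since_yield: usize,
}

impl<S: MiniStream> MiniStream for YieldEvery<S> {
    type Item = S::Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.since_yield >= self.period {
            self.since_yield = 0;
            cx.waker().wake_by_ref(); // request an immediate re-poll
            return Poll::Pending;
        }
        let polled = self.inner.poll_next(cx);
        if let Poll::Ready(Some(_)) = &polled {
            self.since_yield += 1;
        }
        polled
    }
}

/// Waker that counts how many times it was woken.
struct CountingWake(AtomicUsize);
impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drains a wrapped counter; returns (items, pending count, wake count).
fn drain(len: usize, period: usize) -> (Vec<usize>, usize, usize) {
    let wake = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&wake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldEvery {
        inner: Counter { next: 0, len },
        period,
        since_yield: 0,
    };
    let (mut items, mut pendings) = (Vec::new(), 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => items.push(v),
            Poll::Ready(None) => break,
            Poll::Pending => pendings += 1,
        }
    }
    (items, pendings, wake.0.load(Ordering::SeqCst))
}
```

Calling `drain(10, 4)` delivers all ten items in order, with one `Pending` (and one wake) after each full group of four. The trade-off against the buffered design is that the yield happens one poll later, in exchange for not storing a batch in the wrapper.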

// RecordBatchStream requires schema(); delegate to the wrapped stream
impl RecordBatchStream for YieldStream {
    fn schema(&self) -> Arc<arrow_schema::Schema> {
        self.inner.schema()
    }
}

impl Stream for AggregateStream {
    type Item = Result<RecordBatch>;

2 changes: 1 addition & 1 deletion parquet-testing
Submodule parquet-testing updated 67 files
+6 −0 README.md
+2 −1 data/README.md
+ data/datapage_v2_empty_datapage.snappy.parquet
+67 −0 data/geospatial/README.md
+ data/geospatial/crs-arbitrary-value.parquet
+ data/geospatial/crs-default.parquet
+163 −0 data/geospatial/crs-gen.py
+ data/geospatial/crs-geography.parquet
+ data/geospatial/crs-projjson.parquet
+ data/geospatial/crs-srid.parquet
+213 −0 data/geospatial/geospatial-gen.py
+ data/geospatial/geospatial-with-nan.parquet
+ data/geospatial/geospatial.parquet
+332 −0 data/geospatial/geospatial.yaml
+52 −0 variant/README.md
+ variant/array_empty.metadata
+ variant/array_empty.value
+ variant/array_nested.metadata
+ variant/array_nested.value
+ variant/array_primitive.metadata
+ variant/array_primitive.value
+73 −0 variant/data_dictionary.json
+ variant/long_string.metadata
+ variant/long_string.value
+ variant/object_empty.metadata
+ variant/object_empty.value
+ variant/object_nested.metadata
+ variant/object_nested.value
+ variant/object_primitive.metadata
+ variant/object_primitive.value
+ variant/primitive_binary.metadata
+ variant/primitive_binary.value
+ variant/primitive_boolean_false.metadata
+1 −0 variant/primitive_boolean_false.value
+ variant/primitive_boolean_true.metadata
+1 −0 variant/primitive_boolean_true.value
+ variant/primitive_date.metadata
+ variant/primitive_date.value
+ variant/primitive_decimal16.metadata
+ variant/primitive_decimal16.value
+ variant/primitive_decimal4.metadata
+ variant/primitive_decimal4.value
+ variant/primitive_decimal8.metadata
+ variant/primitive_decimal8.value
+ variant/primitive_double.metadata
+1 −0 variant/primitive_double.value
+ variant/primitive_float.metadata
+1 −0 variant/primitive_float.value
+ variant/primitive_int16.metadata
+1 −0 variant/primitive_int16.value
+ variant/primitive_int32.metadata
+ variant/primitive_int32.value
+ variant/primitive_int64.metadata
+1 −0 variant/primitive_int64.value
+ variant/primitive_int8.metadata
+1 −0 variant/primitive_int8.value
+0 −0 variant/primitive_null.metadata
+0 −0 variant/primitive_null.value
+ variant/primitive_string.metadata
+ variant/primitive_string.value
+ variant/primitive_timestamp.metadata
+ variant/primitive_timestamp.value
+ variant/primitive_timestampntz.metadata
+ variant/primitive_timestampntz.value
+173 −0 variant/regen.py
+ variant/short_string.metadata
+1 −0 variant/short_string.value