feat: Allow cancelling of grouping operations which are CPU bound #16196
Conversation
…io MPSC (RecordBatchReceiverStream)
This PR is limited to solving aggregates with no grouping in streaming; we can extend it to more cases if it doesn't affect performance?
Thanks @zhuqi-lucas -- I'll try running the cancellation benchmark from @carols10cents
🤖: Benchmark completed
Thank you @alamb for the review and benchmark. I am wondering whether it will hurt DataFusion's own running performance, because we add an extra branch (`if` logic) in the aggregate and other core execs. If we only want to support datafusion-cli cancellation, maybe we can add the wrapper logic to datafusion-cli. But from other related issues, it seems some customers use gRPC to drop the stream, so this is not limited to datafusion-cli. Maybe the perfect solution is:
I polished the code so it only affects the no-grouping aggregate; maybe we can compare ClickBench results, so we can be confident to merge if it doesn't affect aggregate performance.
Updated the performance for the current PR:

```sql
SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.315 seconds.
```

The main branch:

```sql
SET datafusion.execution.target_partitions = 1;
SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.567 seconds.
```

No performance regression from the above testing.
```diff
@@ -77,6 +77,11 @@ impl AggregateStream {
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let input = agg.input.execute(partition, Arc::clone(&context))?;

+        // Only wrap no-grouping aggregates in our YieldStream
```
In my own testing with `partition_count = 1`, group by aggregates suffer from the same problem.
Thank you @pepijnve for the review, I will try to reproduce it for group by aggregates.
You are right, I can reproduce it now:

```sql
SELECT
    (value % 10) AS group_key,
    COUNT(*) AS cnt,
    SUM(value) AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;
```
Thank you @pepijnve, I also added grouping support in the latest PR.
From the testing results, it seems the group-by cases have some performance regression. Another solution is to use CoalescePartitionsExec as a wrapper:

```diff
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             0 => internal_err!(
                 "CoalescePartitionsExec requires at least one input partition"
             ),
-            1 => {
-                // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context)
-            }
+            // 1 => {
+            //     // bypass any threading / metrics if there is a single partition
+            //     self.input.execute(0, context)
+            // }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                 // record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        2.. => {
+        1.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0
```
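For background on why routing everything through CoalescePartitionsExec helps: its output is driven through a tokio mpsc channel (the RecordBatchReceiverStream mentioned earlier), and awaiting on a channel receive is a natural yield point. Here is a minimal, non-DataFusion sketch of that effect; the names and numbers are illustrative only:

```rust
use tokio::sync::mpsc;

// Producer runs on its own task; the consumer awaits on the channel, so the
// consumer side always has a yield point between items even if the producer
// is CPU bound.
async fn channel_backed_stream() {
    let (tx, mut rx) = mpsc::channel::<u64>(8);
    tokio::spawn(async move {
        for i in 0..1_000u64 {
            if tx.send(i).await.is_err() {
                break; // receiver dropped: this is how cancellation propagates
            }
        }
    });
    while let Some(_item) = rx.recv().await {
        // each recv().await is a point where the consuming task can be cancelled
    }
}
```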
Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we will always add some overhead to the aggregate. Thanks!
🤖: Benchmark completed
Running the benchmarks again to gather more details
Thank you @alamb, it's surprising that performance shows no regression, and is even faster for clickbench_partitioned. It may be because we yield within each partition's execution, which makes the partitions run more efficiently.
🤖: Benchmark completed
Great, we will incorporate your feedback, answer your questions, and write a list of follow-on work to serve as a basis for the tickets. I will only merge after these are done so we have a clear path to make progress next week and beyond.
Thank you @alamb for the review, I will try to address the comments and answer the question. The performance benchmark result also looks like no regression, thanks! Updated: addressed all comments in the latest PR. Regarding the question: "Instead it seems to add yielding for DataSource exec, which already will yield when reading Parquet from a remote store, for example?" I think we don't handle this perfectly yet, but we add yielding to DataSource and other built-in operators because there are many corner cases that don't have an aggregate; doing it this way fixes all the corner cases covered by the infinite_cancel.rs test cases. I agree we can skip the yield where one already exists; we need to investigate those cases, but for now we just want to fix all the corner cases with no performance regression as the first step. Thanks!
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Thank you @zhuqi-lucas and @ozankabak -- I think this looks good to me
```rust
/// mitigate this by inserting explicit yielding (in as few places as
/// possible to avoid performance degradation). This value represents the
/// yielding period (in batches) at such explicit yielding points. The
/// default value is 64. If set to 0, no DataFusion will not perform
```
I like that this has an "escape valve" too -- if this mechanism isn't working we can disable the new yields via config
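As an illustration of that escape valve, here is a hedged sketch of turning the new yields off from user code. The option key `datafusion.optimizer.yield_period` and the "0 disables explicit yielding" behavior are taken from the config docs and information_schema.slt output quoted in this review; the snippet only assumes the existing `SessionContext::sql` path, not any new API:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Assumption: a period of 0 disables the explicit yield points added by this PR.
    ctx.sql("SET datafusion.optimizer.yield_period = 0").await?;
    Ok(())
}
```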
Thanks @alamb for the final review. @zhuqi-lucas, I think it would be great to create some tickets to track:
Also feel free to open any other issues if I'm missing something here.
So I just re-ran the reproducer from @pepijnve in #16196 (comment) against main and this PR doesn't cancel it:
I did check that aggregating values using range does work, so that is good:
So maybe we need a follow-on PR to fix the cancel test from @pepijnve
Is that the interleave test? Sorry, the link was not clear
Sorry -- it was this reproducer: https://github.com/pepijnve/datafusion_cancel_test. I pointed it at a local checkout of datafusion after this PR was merged.
His repo seems to be creating a plan manually and applying some old version of the rule (which is in that repo, not in DF proper). What are we trying to do here? Am I missing something?
BTW if you wanted to check whether this PR covers interleave-related cases (which is the test case analyzed in that repo), this PR has two tests for it (
@alamb sorry if it came across as bike shedding; I felt there were legitimate concerns with the design that were being ignored. I wrote up all my notes in #16301.
It's an approximation of what you need that intrinsically has incomplete information to work with. That makes it unable to avoid potentially yielding unnecessarily every x batches. The other PR has more information, so I won't repeat it here.
@zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted in my comment above, I suggest the following: as we study interleave-related cases in more detail, I think we should add a test case with an interleave in a plan that doesn't have
@ozankabak indeed, that was what my original test was simulating. The coalesce batches and repartition end up erasing the scenario I was trying to demonstrate. I fully agree that the test setup is contrived, but the point of the test was not to demonstrate that there was a problem with interleave. It was intended to demonstrate brittleness in the approach, because certain legitimate combinations of execution plans still end up not being cancelable in practice. Hacky patch wrt current main that gets you back to the failing situation: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
Great, thanks for the patch. We should use it as one of the new test cases in the follow-on PRs.

Look, I see that you are trying to help and we do want to take it. I suspect we might be facing a "culture" challenge here: typically, the DF community attacks large problems by solving them bit by bit and refining a solution iteratively. This is unlike some other projects which front-load the effort by going through a more comprehensive design process. We also do that for some tasks where this iterative approach is not applicable, but it is not very common. This "bit by bit" approach doesn't always succeed; every now and then we get stuck or go down the wrong path for a while, and then change tack. However, we still typically prefer to "advance the front" and make progress in tangible ways as much as we can (if we see a way). This necessarily results in imperfect solutions being the "state of the code" in some cases, and they survive in the codebase for a while, but we are good at driving things to completion in the long run.

Reflecting all this onto the task at hand, this PR (1) solves many cases already and (2) introduces some machinery that will be useful as we iterate on the full solution. I don't think it is brittle; I think it is imperfect and requires refinement. I am optimistic that we will eventually converge on a good approach that requires minimal operator cooperation (but we won't be able to reduce that to a strict zero) and is close to being optimal in terms of yielding overhead. Where we are currently is not where we will ultimately be; this is just a step in a long process. I hope that helps. Thanks.
We're not going to see eye to eye on this one and that's ok; people don't always have to agree.
Thank you @alamb. @pepijnve, can you add a test case that this PR does not handle? I remember adding all the cases to the test suite; if I missed any, feel free to add a reproducer and I will continue fixing it, thanks! And I think the description here is also outdated, because this PR no longer uses EmissionType::Final to make the choice.
But I agree we need to investigate the spill cases as a follow-up, thanks!
Sure, thank you @ozankabak, let me create a follow-up ticket to track this!
FYI, updated: 6 sub-tasks, including #16196 (comment)
Sorry, just saw this test case, thank you @pepijnve! Also added it to the Epic sub-tasks.
Pull Request Overview
This PR adds a cooperative yielding mechanism to DataFusion’s physical execution plans, allowing long-running operators (like aggregations) to periodically yield control and respond promptly to cancellations.

- Introduces `YieldStreamExec` and `YieldStream` to wrap streams and inject yields every N batches.
- Extends the `ExecutionPlan` trait with `with_cooperative_yields` and updates many executors to support cooperative mode via `wrap_yield_stream`.
- Adds the `InsertYieldExec` optimizer rule, a new `yield_period` configuration, and protobuf support for serializing/deserializing yield operators.
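To make the "inject yields every N batches" idea concrete, here is a minimal, illustrative sketch of such a stream adapter. It assumes only the `futures` crate and is not the `YieldStream` implementation from this PR; names and details are simplified:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Illustrative adapter: after `period` items it returns `Poll::Pending` once
/// (waking itself first), so a CPU-bound loop driving the inner stream hands
/// control back to the executor, which can then observe cancellation.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    count: usize,
}

impl<S: Stream + Unpin> Stream for YieldEvery<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.period > 0 && this.count >= this.period {
            // Yield point: reschedule ourselves and return Pending once.
            this.count = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match this.inner.poll_next_unpin(cx) {
            Poll::Ready(Some(item)) => {
                this.count += 1;
                Poll::Ready(Some(item))
            }
            other => other,
        }
    }
}
```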
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated no comments.

| File | Description |
|---|---|
| datafusion/common/src/config.rs | Add yield_period setting with documentation |
| datafusion/physical-plan/src/yield_stream.rs | New YieldStream and YieldStreamExec operator |
| datafusion/physical-plan/src/execution_plan.rs | Extend ExecutionPlan trait with with_cooperative_yields |
| datafusion/physical-plan/src/work_table.rs | Wrap WorkTableExec streams with wrap_yield_stream |
| datafusion/physical-optimizer/src/insert_yield_exec.rs | New optimizer rule to insert yield nodes |
| datafusion/proto/src/physical_plan/mod.rs | Protobuf support for YieldStreamExec nodes |
| datafusion/common/src/config.rs | Define default and behavior for yield_period |
Comments suppressed due to low confidence (5)

datafusion/common/src/config.rs:733
- The comment 'If set to 0, no DataFusion will not perform any explicit yielding.' contains a double negative; consider rephrasing to 'If set to 0, DataFusion will not perform any explicit yielding.'
  `/// default = 64`

datafusion/sqllogictest/test_files/information_schema.slt:417
- The word 'promply' is misspelled; it should be 'promptly'.
  `datafusion.optimizer.yield_period 64 When DataFusion detects that a plan might not be promply cancellable ...`

datafusion/physical-plan/src/yield_stream.rs:107
- [nitpick] The field is named `frequency` but the accessor is `yield_period`; consider renaming one of them (e.g., use `period` consistently) for clarity.
  `frequency: usize,`

datafusion/physical-plan/src/work_table.rs:228
- [nitpick] Add a unit test for `WorkTableExec` to verify that with `cooperative` enabled the stream is wrapped in a `YieldStream` and yields as expected when cancelled.
  `Ok(wrap_yield_stream(stream, &context, self.cooperative))`

datafusion/datasource/src/source.rs:387
- [nitpick] The method `with_cooperative` could be renamed to `with_cooperative_yields` to match the `ExecutionPlan` trait method of the same name for consistency.
  `pub fn with_cooperative(mut self, cooperative: bool) -> Self {`
BTW I liked this description so much I ported it to a proposed addition in the docs:
Which issue does this PR close?
Rationale for this change
Some AggregateExecs can always make progress and thus they may never notice that the plan was canceled.
🧵 Yield-based Cooperative Scheduling in `AggregateExec`

This PR introduces a lightweight yielding mechanism to `AggregateExec` to improve responsiveness to external signals (like `Ctrl-C`) without adding any measurable performance overhead.

🧭 Motivation

For aggregation queries without any `GROUP BY`, and similarly for queries where the computational work for each input batch is substantial, the operator can keep making progress on CPU-bound work and may never notice that the plan was canceled.
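To make the motivation concrete, here is a small, hypothetical sketch (plain tokio, not DataFusion code) of why a future that never reaches an await point cannot be cancelled cooperatively:

```rust
use std::hint::black_box;
use std::time::Duration;

// Stand-in for a CPU-bound operator: the loop never awaits, so the task
// never hands control back to the scheduler.
async fn cpu_bound_sum(n: u64) -> u64 {
    let mut sum = 0u64;
    for i in 0..n {
        sum = sum.wrapping_add(black_box(i));
    }
    sum
}

#[tokio::main]
async fn main() {
    let task = tokio::spawn(cpu_bound_sum(5_000_000_000));
    tokio::time::sleep(Duration::from_millis(100)).await;
    // abort() only takes effect when the task next yields to the scheduler;
    // with no await points, the worker thread keeps spinning until the loop ends.
    task.abort();
    let _ = task.await;
}
```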
Are these changes tested?
Yes.
Before this PR, such a query would stay stuck until it finished; we couldn't Ctrl-C to stop it.
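For illustration, here is a hypothetical consumer-side loop (the helper name `drain_with_ctrl_c` is made up and is not datafusion-cli's code; it assumes tokio's `signal` feature) that races the record batch stream against Ctrl-C. It can only win that race once the stream actually returns `Pending` between batches, which is what the yielding in this PR provides:

```rust
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Drain a stream, stopping early on Ctrl-C. Without yield points inside
/// CPU-bound operators, `stream.next()` may not return for a long time and
/// the Ctrl-C branch never gets a chance to run.
async fn drain_with_ctrl_c(mut stream: SendableRecordBatchStream) -> Result<()> {
    loop {
        tokio::select! {
            batch = stream.next() => match batch {
                Some(batch) => { let _ = batch?; }
                None => break,
            },
            _ = tokio::signal::ctrl_c() => break,
        }
    }
    Ok(())
}
```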
Are there any user-facing changes?
Some CPU-heavy aggregation plans now cancel much sooner.