feat: Allow cancelling of grouping operations which are CPU bound #16196

Merged
merged 80 commits into apache:main on Jun 9, 2025

Conversation

@zhuqi-lucas (Contributor) commented May 27, 2025

Which issue does this PR close?

Rationale for this change

Some AggregateExecs can always make progress and thus they may never notice that the plan was canceled.

🧵 Yield-based Cooperative Scheduling in AggregateExec

This PR introduces a lightweight yielding mechanism to AggregateExec to improve responsiveness to external signals (like Ctrl-C) without adding any measurable performance overhead.

🧭 Motivation

For aggregation queries without any GROUP BY, such as

SELECT SUM(value) FROM range(1, 50000000000);

and similarly for queries like

SELECT DISTINCT a, b, c, d, ...

where the computational work for each input batch is substantial, the operator can run for a long time without returning control to the Tokio runtime, so external signals such as Ctrl-C are never observed.
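
To illustrate the mechanism, here is a minimal, self-contained sketch (hypothetical names, not the exact code added by this PR) of a stream wrapper that returns Poll::Pending every N ready items so the Tokio runtime regains control and can observe cancellation:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};

/// Hypothetical sketch of a yield wrapper: after `period` consecutive
/// Ready results from the inner stream, return Pending once (after waking
/// ourselves) so the runtime can check for cancellation.
struct YieldEvery<S> {
    inner: S,
    period: usize,
    ready_since_yield: usize,
}

impl<S: Stream + Unpin> Stream for YieldEvery<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.period > 0 && this.ready_since_yield >= this.period {
            // Time to yield: reset the counter, schedule an immediate
            // re-poll, and hand control back to the runtime.
            this.ready_since_yield = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match this.inner.poll_next_unpin(cx) {
            Poll::Ready(item) => {
                this.ready_since_yield += 1;
                Poll::Ready(item)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}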

Are these changes tested?

Yes

Before this PR:

SET datafusion.execution.target_partitions = 1;
SELECT SUM(value) FROM range(1, 50000000000);

It stays stuck until the query finishes; we can't Ctrl-C to stop it.

Are there any user-facing changes?

Some CPU-heavy aggregation plans now cancel much sooner.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label May 27, 2025
@zhuqi-lucas (Contributor Author):

This PR is limited to solving the no-grouping aggregate stream; we can extend it to more cases if that does not affect performance.

@zhuqi-lucas zhuqi-lucas changed the title feat: support inability to yeild cpu for loop when it's not using Tok… feat: support inability to yeild for loop when it's not using Tok… May 27, 2025
@zhuqi-lucas zhuqi-lucas marked this pull request as draft May 27, 2025 15:10
@alamb (Contributor) commented May 27, 2025

Thanks @zhuqi-lucas -- I'll try running the cancellation benchmark from @carols10cents

@alamb (Contributor) commented May 27, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (b18aeaa) to 2d12bf6 diff
Benchmarks: cancellation
Results will be posted here when complete

@alamb (Contributor) commented May 27, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark cancellation.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃    HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QCancellati… │ 31.29ms │     33.69ms │ 1.08x slower │
└──────────────┴─────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Benchmark Summary          ┃         ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Total Time (HEAD)          │ 31.29ms │
│ Total Time (issue_16193)   │ 33.69ms │
│ Average Time (HEAD)        │ 31.29ms │
│ Average Time (issue_16193) │ 33.69ms │
│ Queries Faster             │       0 │
│ Queries Slower             │       1 │
│ Queries with No Change     │       0 │
└────────────────────────────┴─────────┘

@zhuqi-lucas (Contributor Author) commented May 28, 2025

Thank you @alamb for review and benchmark.

I am wondering whether this will hurt DataFusion's own runtime performance, because we add branching (if-checks) in the aggregate and other core execs.

If we only want to support datafusion-cli cancellation, maybe we can add the wrapper logic to datafusion-cli itself.

But another related issue suggests some users drop the stream over gRPC, so this is not limited to datafusion-cli:

#14036 (comment)

Maybe the ideal solution is:

  1. A public drop-stream interface which customers (gRPC, etc.) can call to cancel. I still can't find a way to do this without changing the core exec logic itself...
  2. A wrapper for datafusion-cli, so that pressing Ctrl-C cancels the execution (a rough consumer-side sketch is shown below).
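
As a rough illustration of option 2 (a hedged sketch only; drain_or_cancel is a hypothetical helper, not an API in DataFusion or in this PR), a consumer such as datafusion-cli could race the record batch stream against a Ctrl-C signal and drop the stream to cancel:

use futures::{Stream, StreamExt};

/// Hypothetical consumer-side helper: poll the stream until it finishes,
/// but drop it (cancelling the query) as soon as Ctrl-C is received.
/// This only helps if the stream periodically yields back to the runtime,
/// which is exactly what this PR tries to guarantee.
async fn drain_or_cancel<S>(mut stream: S) -> std::io::Result<()>
where
    S: Stream + Unpin,
{
    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                // Returning here drops `stream`, cancelling the query.
                return Ok(());
            }
            item = stream.next() => {
                if item.is_none() {
                    // Stream finished normally.
                    return Ok(());
                }
            }
        }
    }
}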

@zhuqi-lucas (Contributor Author):

I polished the code so it only affects the no-grouping aggregate; maybe we can run a ClickBench comparison so we can be confident to merge if it does not affect aggregate performance.

@zhuqi-lucas zhuqi-lucas marked this pull request as ready for review May 28, 2025 10:31
@zhuqi-lucas zhuqi-lucas changed the title feat: support inability to yeild for loop when it's not using Tok… feat: support inability to yield for loop when it's not using Tok… May 28, 2025
@zhuqi-lucas (Contributor Author) commented May 28, 2025

Updated performance results for the current PR:

SET datafusion.execution.target_partitions = 1;

SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.315 seconds.

The main branch:

SET datafusion.execution.target_partitions = 1;


SELECT SUM(value)
FROM range(1,50000000000) AS t;
+----------------------+
| sum(t.value)         |
+----------------------+
| -4378597037249509888 |
+----------------------+
1 row(s) fetched.
Elapsed 22.567 seconds.

No performance regression from the above testing.

@@ -77,6 +77,11 @@ impl AggregateStream {
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(&context))?;

// Only wrap no‐grouping aggregates in our YieldStream
@pepijnve (Contributor):

In my own testing with partition_count = 1 group by aggregates suffer from the same problem

@zhuqi-lucas (Contributor Author):

Thank you @pepijnve for the review; I will try to reproduce it for group-by aggregates.

@zhuqi-lucas (Contributor Author):

You are right, I can reproduce it now:

SELECT
  (value % 10)         AS group_key,
  COUNT(*)             AS cnt,
  SUM(value)           AS sum_val
FROM range(1, 5000000000) AS t
GROUP BY (value % 10)
ORDER BY group_key;

@zhuqi-lucas (Contributor Author):

Thank you @pepijnve, I also added grouping support in the latest revision of this PR.

@zhuqi-lucas (Contributor Author):

From the test results, it seems the group-by cases have some performance regression.

@zhuqi-lucas (Contributor Author):

Another solution is to use CoalescePartitionsExec as a wrapper:

diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 114f83068..ffb24463e 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -154,10 +154,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             0 => internal_err!(
                 "CoalescePartitionsExec requires at least one input partition"
             ),
-            1 => {
-                // bypass any threading / metrics if there is a single partition
-                self.input.execute(0, context)
-            }
+            // 1 => {
+            //     // bypass any threading / metrics if there is a single partition
+            //     self.input.execute(0, context)
+            // }
             _ => {
                 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
                 // record the (very) minimal work done so that
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index b81b3c8be..8bb8b2145 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -963,8 +963,7 @@ pub fn execute_stream(
 ) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
-        1 => plan.execute(0, context),
-        2.. => {
+        1.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition
diff --git a/parquet-testing b/parquet-testing
index 6e851ddd7..107b36603 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
+Subproject commit 107b36603e051aee26bd93e04b871034f6c756c0

@zhuqi-lucas (Contributor Author):

Hi @alamb, I believe we can also run the ClickBench benchmark for this PR. But I am not confident about the result, since it seems we always add some overhead to the aggregate. Thanks!

@alamb (Contributor) commented May 28, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16193 (6cf3bf0) to 2d12bf6 diff
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

@alamb (Contributor) commented May 28, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │  1884.53ms │   1932.55ms │    no change │
│ QQuery 1     │   692.42ms │    704.34ms │    no change │
│ QQuery 2     │  1422.09ms │   1424.04ms │    no change │
│ QQuery 3     │   727.80ms │    722.42ms │    no change │
│ QQuery 4     │  1434.99ms │   1447.24ms │    no change │
│ QQuery 5     │ 15295.77ms │  15299.58ms │    no change │
│ QQuery 6     │  1997.15ms │   2013.51ms │    no change │
│ QQuery 7     │  2049.62ms │   2168.78ms │ 1.06x slower │
│ QQuery 8     │   836.82ms │    848.06ms │    no change │
└──────────────┴────────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 26341.19ms │
│ Total Time (issue_16193)   │ 26560.52ms │
│ Average Time (HEAD)        │  2926.80ms │
│ Average Time (issue_16193) │  2951.17ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          1 │
│ Queries with No Change     │          8 │
└────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │    15.54ms │     15.42ms │    no change │
│ QQuery 1     │    33.15ms │     33.70ms │    no change │
│ QQuery 2     │    80.58ms │     80.48ms │    no change │
│ QQuery 3     │    96.06ms │     95.48ms │    no change │
│ QQuery 4     │   588.87ms │    595.63ms │    no change │
│ QQuery 5     │   818.19ms │    815.70ms │    no change │
│ QQuery 6     │    23.64ms │     23.47ms │    no change │
│ QQuery 7     │    37.27ms │     36.33ms │    no change │
│ QQuery 8     │   926.36ms │    910.32ms │    no change │
│ QQuery 9     │  1187.66ms │   1209.81ms │    no change │
│ QQuery 10    │   267.19ms │    257.24ms │    no change │
│ QQuery 11    │   293.30ms │    292.35ms │    no change │
│ QQuery 12    │   911.85ms │    911.14ms │    no change │
│ QQuery 13    │  1247.36ms │   1317.95ms │ 1.06x slower │
│ QQuery 14    │   848.42ms │    844.38ms │    no change │
│ QQuery 15    │   834.03ms │    835.94ms │    no change │
│ QQuery 16    │  1736.24ms │   1752.77ms │    no change │
│ QQuery 17    │  1613.09ms │   1606.14ms │    no change │
│ QQuery 18    │  3084.41ms │   3051.47ms │    no change │
│ QQuery 19    │    83.51ms │     84.57ms │    no change │
│ QQuery 20    │  1125.18ms │   1120.50ms │    no change │
│ QQuery 21    │  1285.82ms │   1308.43ms │    no change │
│ QQuery 22    │  2139.79ms │   2162.32ms │    no change │
│ QQuery 23    │  8003.46ms │   7992.05ms │    no change │
│ QQuery 24    │   463.30ms │    450.76ms │    no change │
│ QQuery 25    │   384.45ms │    381.51ms │    no change │
│ QQuery 26    │   521.56ms │    522.91ms │    no change │
│ QQuery 27    │  1586.93ms │   1590.23ms │    no change │
│ QQuery 28    │ 12567.57ms │  12455.67ms │    no change │
│ QQuery 29    │   526.70ms │    534.84ms │    no change │
│ QQuery 30    │   807.81ms │    815.44ms │    no change │
│ QQuery 31    │   858.67ms │    848.84ms │    no change │
│ QQuery 32    │  2639.73ms │   2641.91ms │    no change │
│ QQuery 33    │  3342.77ms │   3316.54ms │    no change │
│ QQuery 34    │  3312.00ms │   3325.35ms │    no change │
│ QQuery 35    │  1312.86ms │   1309.17ms │    no change │
│ QQuery 36    │   117.39ms │    116.89ms │    no change │
│ QQuery 37    │    55.88ms │     56.08ms │    no change │
│ QQuery 38    │   121.82ms │    119.37ms │    no change │
│ QQuery 39    │   192.09ms │    195.42ms │    no change │
│ QQuery 40    │    44.75ms │     49.63ms │ 1.11x slower │
│ QQuery 41    │    42.75ms │     44.34ms │    no change │
│ QQuery 42    │    38.39ms │     38.13ms │    no change │
└──────────────┴────────────┴─────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 56218.38ms │
│ Total Time (issue_16193)   │ 56166.59ms │
│ Average Time (HEAD)        │  1307.40ms │
│ Average Time (issue_16193) │  1306.20ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          2 │
│ Queries with No Change     │         41 │
└────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 122.53ms │    122.67ms │     no change │
│ QQuery 2     │  21.63ms │     21.72ms │     no change │
│ QQuery 3     │  35.49ms │     33.31ms │ +1.07x faster │
│ QQuery 4     │  19.72ms │     20.20ms │     no change │
│ QQuery 5     │  51.62ms │     52.88ms │     no change │
│ QQuery 6     │  11.84ms │     11.99ms │     no change │
│ QQuery 7     │  97.55ms │     94.51ms │     no change │
│ QQuery 8     │  25.70ms │     25.67ms │     no change │
│ QQuery 9     │  59.26ms │     59.82ms │     no change │
│ QQuery 10    │  56.45ms │     56.10ms │     no change │
│ QQuery 11    │  11.50ms │     11.77ms │     no change │
│ QQuery 12    │  41.66ms │     39.51ms │ +1.05x faster │
│ QQuery 13    │  27.43ms │     26.76ms │     no change │
│ QQuery 14    │   9.52ms │      9.56ms │     no change │
│ QQuery 15    │  23.59ms │     22.84ms │     no change │
│ QQuery 16    │  21.71ms │     21.73ms │     no change │
│ QQuery 17    │  94.44ms │     95.31ms │     no change │
│ QQuery 18    │ 210.48ms │    215.52ms │     no change │
│ QQuery 19    │  26.60ms │     26.03ms │     no change │
│ QQuery 20    │  35.72ms │     36.88ms │     no change │
│ QQuery 21    │ 160.00ms │    160.99ms │     no change │
│ QQuery 22    │  16.65ms │     17.19ms │     no change │
└──────────────┴──────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary          ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 1181.11ms │
│ Total Time (issue_16193)   │ 1182.95ms │
│ Average Time (HEAD)        │   53.69ms │
│ Average Time (issue_16193) │   53.77ms │
│ Queries Faster             │         2 │
│ Queries Slower             │         0 │
│ Queries with No Change     │        20 │
└────────────────────────────┴───────────┘


@alamb (Contributor) commented May 28, 2025

Running the benchmarks again to gather more details


@zhuqi-lucas (Contributor Author):

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 0     │  1906.70ms │   1850.03ms │ no change │
│ QQuery 1     │   691.18ms │    691.76ms │ no change │
│ QQuery 2     │  1411.18ms │   1436.19ms │ no change │
│ QQuery 3     │   696.17ms │    700.91ms │ no change │
│ QQuery 4     │  1438.68ms │   1458.59ms │ no change │
│ QQuery 5     │ 15100.87ms │  14874.14ms │ no change │
│ QQuery 6     │  1996.44ms │   1982.89ms │ no change │
│ QQuery 7     │  2089.84ms │   2063.71ms │ no change │
│ QQuery 8     │   830.88ms │    835.14ms │ no change │
└──────────────┴────────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 26161.93ms │
│ Total Time (issue_16193)   │ 25893.35ms │
│ Average Time (HEAD)        │  2906.88ms │
│ Average Time (issue_16193) │  2877.04ms │
│ Queries Faster             │          0 │
│ Queries Slower             │          0 │
│ Queries with No Change     │          9 │
└────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       HEAD ┃ issue_16193 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    15.18ms │     15.80ms │     no change │
│ QQuery 1     │    33.16ms │     32.97ms │     no change │
│ QQuery 2     │    77.60ms │     80.92ms │     no change │
│ QQuery 3     │    97.69ms │     97.56ms │     no change │
│ QQuery 4     │   589.28ms │    594.05ms │     no change │
│ QQuery 5     │   847.85ms │    836.44ms │     no change │
│ QQuery 6     │    23.38ms │     22.84ms │     no change │
│ QQuery 7     │    39.02ms │     36.26ms │ +1.08x faster │
│ QQuery 8     │   923.80ms │    927.93ms │     no change │
│ QQuery 9     │  1194.31ms │   1194.76ms │     no change │
│ QQuery 10    │   261.49ms │    264.83ms │     no change │
│ QQuery 11    │   298.34ms │    300.14ms │     no change │
│ QQuery 12    │   889.86ms │    893.87ms │     no change │
│ QQuery 13    │  1343.97ms │   1319.24ms │     no change │
│ QQuery 14    │   850.37ms │    847.33ms │     no change │
│ QQuery 15    │   829.14ms │    827.15ms │     no change │
│ QQuery 16    │  1756.84ms │   1712.96ms │     no change │
│ QQuery 17    │  1607.84ms │   1612.75ms │     no change │
│ QQuery 18    │  3257.72ms │   3059.25ms │ +1.06x faster │
│ QQuery 19    │    84.43ms │     81.15ms │     no change │
│ QQuery 20    │  1143.94ms │   1098.25ms │     no change │
│ QQuery 21    │  1299.30ms │   1302.19ms │     no change │
│ QQuery 22    │  2139.01ms │   2154.26ms │     no change │
│ QQuery 23    │  7959.94ms │   7886.41ms │     no change │
│ QQuery 24    │   463.30ms │    458.37ms │     no change │
│ QQuery 25    │   386.27ms │    380.85ms │     no change │
│ QQuery 26    │   524.88ms │    519.71ms │     no change │
│ QQuery 27    │  1581.12ms │   1583.97ms │     no change │
│ QQuery 28    │ 12730.61ms │  12509.52ms │     no change │
│ QQuery 29    │   536.15ms │    528.59ms │     no change │
│ QQuery 30    │   793.18ms │    795.03ms │     no change │
│ QQuery 31    │   857.17ms │    852.83ms │     no change │
│ QQuery 32    │  2653.95ms │   2631.77ms │     no change │
│ QQuery 33    │  3315.12ms │   3322.64ms │     no change │
│ QQuery 34    │  3381.42ms │   3355.04ms │     no change │
│ QQuery 35    │  1317.57ms │   1278.45ms │     no change │
│ QQuery 36    │   129.98ms │    122.68ms │ +1.06x faster │
│ QQuery 37    │    56.20ms │     53.71ms │     no change │
│ QQuery 38    │   119.55ms │    121.04ms │     no change │
│ QQuery 39    │   193.87ms │    194.76ms │     no change │
│ QQuery 40    │    48.93ms │     49.63ms │     no change │
│ QQuery 41    │    44.38ms │     45.77ms │     no change │
│ QQuery 42    │    36.47ms │     37.46ms │     no change │
└──────────────┴────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 56733.55ms │
│ Total Time (issue_16193)   │ 56041.14ms │
│ Average Time (HEAD)        │  1319.38ms │
│ Average Time (issue_16193) │  1303.28ms │
│ Queries Faster             │          3 │
│ Queries Slower             │          0 │
│ Queries with No Change     │         40 │
└────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QQuery 1     │ 120.69ms │    123.19ms │ no change │
│ QQuery 2     │  22.13ms │     21.87ms │ no change │
│ QQuery 3     │  34.29ms │     34.30ms │ no change │
│ QQuery 4     │  19.41ms │     19.60ms │ no change │
│ QQuery 5     │  52.72ms │     51.09ms │ no change │
│ QQuery 6     │  11.87ms │     12.05ms │ no change │
│ QQuery 7     │  96.12ms │     93.63ms │ no change │
│ QQuery 8     │  25.70ms │     25.62ms │ no change │
│ QQuery 9     │  58.58ms │     58.90ms │ no change │
│ QQuery 10    │  55.95ms │     56.01ms │ no change │
│ QQuery 11    │  11.46ms │     11.44ms │ no change │
│ QQuery 12    │  41.40ms │     40.05ms │ no change │
│ QQuery 13    │  27.95ms │     27.54ms │ no change │
│ QQuery 14    │   9.57ms │      9.64ms │ no change │
│ QQuery 15    │  23.71ms │     24.29ms │ no change │
│ QQuery 16    │  21.66ms │     22.40ms │ no change │
│ QQuery 17    │  96.13ms │     95.60ms │ no change │
│ QQuery 18    │ 209.28ms │    219.00ms │ no change │
│ QQuery 19    │  26.43ms │     25.81ms │ no change │
│ QQuery 20    │  37.60ms │     36.09ms │ no change │
│ QQuery 21    │ 161.77ms │    163.00ms │ no change │
│ QQuery 22    │  16.00ms │     16.74ms │ no change │
└──────────────┴──────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary          ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 1180.44ms │
│ Total Time (issue_16193)   │ 1187.87ms │
│ Average Time (HEAD)        │   53.66ms │
│ Average Time (issue_16193) │   53.99ms │
│ Queries Faster             │         0 │
│ Queries Slower             │         0 │
│ Queries with No Change     │        22 │
└────────────────────────────┴───────────┘

Thank you @alamb; it is a pleasant surprise that there is no performance regression, and clickbench_partitioned is even slightly faster. That may be because we yield within each partition's execution, which lets the partitions run more efficiently.

@xudong963 xudong963 changed the title feat: support inability to yield for loop when it's not using Tok… feat: support inability to yield for loop when it's not using Tokio MPSC (RecordBatchReceiverStream) May 29, 2025
@alamb (Contributor) commented Jun 9, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16193
--------------------
Benchmark cancellation.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query        ┃     HEAD ┃ issue_16193 ┃    Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ QCancellati… │ 27.20 ms │    27.81 ms │ no change │
└──────────────┴──────────┴─────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Benchmark Summary          ┃         ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Total Time (HEAD)          │ 27.20ms │
│ Total Time (issue_16193)   │ 27.81ms │
│ Average Time (HEAD)        │ 27.20ms │
│ Average Time (issue_16193) │ 27.81ms │
│ Queries Faster             │       0 │
│ Queries Slower             │       0 │
│ Queries with No Change     │       1 │
│ Queries with Failure       │       0 │
└────────────────────────────┴─────────┘

@ozankabak (Contributor):

Great, we will incorporate your feedback, answer your questions, and write a list of follow-on work to serve as a basis for the tickets. I will only merge after these are done so we have a clear path for making progress next week and beyond.

@zhuqi-lucas (Contributor Author) commented Jun 9, 2025

Thank you @zhuqi-lucas and @ozankabak and @pepijnve -- I think this PR is really nicely commented and structured. It is easy to read and review.

However, I am sorry but I am a bit confused by the implications of this PR now.

From what I can tell, it doesn't insert YieldExec or add Yielding for AggregateExec, which is the operator we have real evidence doesn't yield. Instead it seems to add yielding for DataSource exec, which already will yield when reading Parquet from a remote store, for example 🤔

Thank you @alamb for the review; I will try to address the comments and answer the question. Also, the performance benchmark results show no regression, thanks!

Updated:

Addressed all comments in the latest revision of this PR.

Regarding the question: Instead it seems to add yielding for DataSource exec, which already will yield when reading Parquet from a remote store, for example?

I think we don't handle this perfectly yet, but we add yielding to DataSource and the other built-in operators because there are many corner cases that don't involve an aggregate. Doing it this way fixes all the corner cases covered by the infinite_cancel.rs tests so far.

I agree we can avoid adding a yield where one already exists for some cases; we need to investigate those cases. For now, as a first step, we just want to fix all the corner cases while showing no regression in the performance results. Thanks!

@alamb (Contributor) left a review comment:

Thank you @zhuqi-lucas and @ozankabak -- I think this looks good to me

/// mitigate this by inserting explicit yielding (in as few places as
/// possible to avoid performance degradation). This value represents the
/// yielding period (in batches) at such explicit yielding points. The
/// default value is 64. If set to 0, no DataFusion will not perform
@alamb (Contributor):

I like that this has an "escape valve" too -- if this mechanism isn't working we can disable the new yields via config
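
For example (using the datafusion.optimizer.yield_period option name quoted later in this thread), the new yields could be disabled entirely with:

SET datafusion.optimizer.yield_period = 0;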

@ozankabak ozankabak merged commit 78e4202 into apache:main Jun 9, 2025
29 checks passed
@ozankabak (Contributor) commented Jun 9, 2025

Thanks @alamb for the final review. @zhuqi-lucas, I think it would be great to create some tickets to track:

  1. Adding a few tests (maybe SLT?) that show YieldStreamExec being inserted
  2. Improving the documentation to make it clear that any leaf (source) that already yields just has to implement
    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(self)
    }
    to signal the planner that no YieldStream combinator is necessary.
  3. Improving InsertYieldExec rule by means of an API that exposes input and output pipelining behaviors of operators effectively
  4. Investigating whether any already-existing manual yielding (for example, like the one in RepartitionExec) can now be removed

Also feel free to open any other issues if I'm missing something here

@alamb (Contributor) commented Jun 9, 2025

So I just re-ran the reproducer from @pepijnve in #16196 (comment) against main, and this PR does not cancel it:

     Running `target/debug/datafusion_test`
Running query; will time out after 5 seconds
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
InfiniteStream::poll_next 60000 times
InfiniteStream::poll_next 70000 times
InfiniteStream::poll_next 80000 times
InfiniteStream::poll_next 90000 times
InfiniteStream::poll_next 100000 times
InfiniteStream::poll_next 110000 times
InfiniteStream::poll_next 120000 times
InfiniteStream::poll_next 130000 times
InfiniteStream::poll_next 140000 times
InfiniteStream::poll_next 150000 times
InfiniteStream::poll_next 160000 times
InfiniteStream::poll_next 170000 times
InfiniteStream::poll_next 180000 times
InfiniteStream::poll_next 190000 times
InfiniteStream::poll_next 200000 times
InfiniteStream::poll_next 210000 times
InfiniteStream::poll_next 220000 times
InfiniteStream::poll_next 230000 times
InfiniteStream::poll_next 240000 times
InfiniteStream::poll_next 250000 times
InfiniteStream::poll_next 260000 times
InfiniteStream::poll_next 270000 times
InfiniteStream::poll_next 280000 times
InfiniteStream::poll_next 290000 times
InfiniteStream::poll_next 300000 times
InfiniteStream::poll_next 310000 times
InfiniteStream::poll_next 320000 times
InfiniteStream::poll_next 330000 times
...

I did check that aggregating values using range does work, so that is good:

DataFusion CLI v48.0.0
> SELECT SUM(value) FROM range(1, 50000000000);
^C^C

@alamb (Contributor) commented Jun 9, 2025

So maybe we need a follow-on PR to fix the cancel test from @pepijnve

@ozankabak (Contributor):

Is that the interleave test? Sorry the link was not clear

@alamb (Contributor) commented Jun 9, 2025

Is that the interleave test? Sorry the link was not clear

Sorry -- it was this reproducer https://github.com/pepijnve/datafusion_cancel_test

I pointed it at a local checkout of datafusion after this PR was merged

@ozankabak (Contributor):

His repo seems to be creating a plan manually and applying some old version of the rule (which is in that repo, not in DF proper). What are we trying to do here? Am I missing something?

@ozankabak (Contributor):

BTW if you wanted to check whether this PR covers interleave-related cases (which is the test case analyzed in that repo), this PR has two tests for it (test_infinite_interleave_cancel and test_infinite_interleave_agg_cancel). We can probably add even more in the future to further our understanding of the problem as we keep working on this, it is a good test case

@pepijnve (Contributor) commented Jun 9, 2025

However, I do like a bias of action, and if this PR fixes a real problem, I don't think we should bikeshed it indefinitely

@alamb sorry if it came across as bike shedding; I felt there were legitimate concerns with the design that were being ignored. I wrote up all my notes in #16301.

I was thinking of YieldStream as such a combinator 🤔

It's an approximation of what you need that intrinsically has incomplete information to work with. That makes it so that it can't avoid potentially yielding unnecessarily every x batches. The other PR has more information, so I won't repeat it here.

@ozankabak (Contributor):

@zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted in my comment above, I suggest the following: As we study interleave-related cases in more detail, I think we should add a test case with an interleave in a plan that doesn't have RepartitionExec. This can happen when data comes in already partitioned. I think in such cases we can still trigger non-cancellability with InterleaveExec's current code/logic.

@pepijnve (Contributor) commented Jun 9, 2025

@ozankabak indeed, that was what my original test was simulating. The coalesce batches and repartition end up erasing the scenario I was trying to demonstrate. I fully agree that the test setup is contrived, but the point of the test was not to demonstrate that there was a problem with interleave. It was intended to demonstrate brittleness in the approach because certain legitimate combinations of execution plans still end up not being cancelable in practice.

Hacky patch wrt current main that gets you back to the failing situation https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a

@ozankabak (Contributor) commented Jun 9, 2025

Great, thanks for the patch. We should use it as one of the new test cases in the follow-on PRs.

Look, I see that you are trying to help and we do want to take it. I suspect we might be facing a "culture" challenge here: Typically, DF community attacks large problems by solving them bit by bit and refining a solution iteratively. This is unlike some other projects which front-load the effort by going through a more comprehensive design process. We also do that for some tasks where this iterative approach is not applicable, but it is not very common.

This "bit by bit approach" doesn't always succeed, every now and then it happens that we get stuck or go down the wrong path for a while, and then change tacks. However, we still typically prefer to "advance the front" and make progress in tangible ways as much as we can (if we see a way). This necessarily results in imperfect solutions being the "state of the code" in some cases, and they survive in the codebase for a while, but we are good at driving things to completion in the long run.

Reflecting all this onto the task at hand, this PR (1) solves many cases already and (2) introduces some machinery that will be useful as we iterate on the full solution. I don't think it is brittle, I think it is imperfect and requires refinement. I am optimistic that we will eventually converge on a good approach that requires minimal operator cooperation (but we won't be able to reduce that to a strict zero) and is close to being optimal in terms of yielding overhead. Where we are at currently is not where we will ultimately be, this is just a step in a long process. I hope that helps. Thanks.

@pepijnve (Contributor) commented Jun 9, 2025

Reflecting all this onto the task at hand, this PR (1) solves many cases already and (2) introduces some machinery that will be useful as we iterate on the full solution. I don't think it is brittle, I think it is imperfect and requires refinement.

We're not going to see eye to eye on this one and that's ok; people don't always have to agree.

@zhuqi-lucas (Contributor Author) commented Jun 10, 2025

So maybe we need a follow on PR to fix the cancel test from @pepijnve

Thank you @alamb. @pepijnve, could you add a test case that this PR does not handle? I believe I added all the known cases to the test suite; if I missed any, feel free to add a reproducer and I will continue fixing it, thanks!

And I think the description here is also outdated, because this PR no longer uses EmissionType::Final to make the choice.

#16301 (comment)

Inject yield points externally
PR #16196 takes the approach of injecting yield operators externally using a physical optimizer rule. The rule uses the EmissionType of operators to determine where yield points are required. When it determines an operator requires yield points it ensures all leaf nodes under the operator in question periodically return a Pending result. This is done either by asking the leaf node to do so or by wrapping the leaf node in an operator that creates a wrapper Stream.

This is a nice solution because it does not necessarily require cooperation from operator implementations, but there are downsides as well.

Describing precisely when yield points are required in a declarative fashion is not trivial. EmissionType::Final is a good first heuristic, but for some operators like filter and join build phase this is not sufficient. Because of this difficulty PR #16196 now always injects yield points even when unnecessary.

Determining where yield points are required based on the static ExecutionPlan tree alone is not always sufficient. As an example, a grouped aggregation that spills large amounts of data to disk will emit data from a stream that is created after the original input has been drained. This takes the original yielding wrapper stream out of the picture. If the new stream is then consumed by another 'final' operator, the query may become uncancellable again depending on the newly created stream. This could be solved by ensuring operators wrap streams they produce themselves as well, but that again requires operator cooperation again which somewhat negates the benefits of this approach.
In practice this does work out ok today because SortPreservingMergeStream has a tendency to return Pending a lot for some reason. I still need to study why that's the case.

When evaluating this design from an assignment of responsibilities perspective it is not ideal. It is not the responsibility of the producer of data, that happens to always be ready, to return Pending. Looking at the producer Stream implementation in isolation, it's not obvious why it would do this since it's already returning control and it is not waiting for data itself. The only reason it would do this is because it assumes the consumer is not well behaved and needs help to force a yield.

From a performance perspective, injecting yield points at the leaves is theoretically the worst case scenario when looking at the depth of the call stack that needs to be unwound. By definition the call stack is the deepest at the leaf streams so yielding from there incurs the highest possible cost.

But I agree about the spill cases; we need to investigate them as a follow-up, thanks!

@zhuqi-lucas (Contributor Author):

@zhuqi-lucas, talking about TODO items, in addition to the 4 things I noted in my comment above, I suggest the following: As we study interleave-related cases in more detail, I think we should add a test case with an interleave in a plan that doesn't have RepartitionExec. This can happen when data comes in already partitioned. I think in such cases we can still trigger non-cancellability with InterleaveExec's current code/logic.

Sure, thank you @ozankabak , let me create a follow-up ticket to track this!

@zhuqi-lucas (Contributor Author) commented Jun 10, 2025

FYI: thank you @ozankabak @alamb @pepijnve, I created an EPIC ticket now with 5 sub-tasks; feel free to add more tasks, and we can iterate over all the possible cases and make this solid. Thanks!

#16353

Updated: 6 sub-tasks including the #16196 (comment)

@zhuqi-lucas (Contributor Author) commented Jun 10, 2025

@ozankabak indeed, that was what my original test was simulating. The coalesce batches and repartition end up erasing the scenario I was trying to demonstrate. I fully agree that the test setup is contrived, but the point of the test was not to demonstrate that there was a problem with interleave. It was intended to demonstrate brittleness in the approach because certain legitimate combinations of execution plans still end up not being cancelable in practice.

Hacky patch wrt current main that gets you back to the failing situation https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a

Sorry, I just saw this test case, thank you @pepijnve!

Also added it to the Epic sub-tasks.

@2010YOUY01 2010YOUY01 requested a review from Copilot June 10, 2025 08:12
@Copilot (Copilot AI) left a comment:

Pull Request Overview

This PR adds a cooperative yielding mechanism to DataFusion’s physical execution plans, allowing long-running operators (like aggregations) to periodically yield control and respond promptly to cancellations.

  • Introduces YieldStreamExec and YieldStream to wrap streams and inject yields every N batches.
  • Extends the ExecutionPlan trait with with_cooperative_yields, updates many executors to support cooperative mode via wrap_yield_stream.
  • Adds the InsertYieldExec optimizer rule, a new yield_period configuration, and protobuf support for serializing/deserializing yield operators.

Reviewed Changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated no comments.

Summary per file:

  • datafusion/common/src/config.rs: Add yield_period setting with documentation
  • datafusion/physical-plan/src/yield_stream.rs: New YieldStream and YieldStreamExec operator
  • datafusion/physical-plan/src/execution_plan.rs: Extend ExecutionPlan trait with with_cooperative_yields
  • datafusion/physical-plan/src/work_table.rs: Wrap WorkTableExec streams with wrap_yield_stream
  • datafusion/physical-optimizer/src/insert_yield_exec.rs: New optimizer rule to insert yield nodes
  • datafusion/proto/src/physical_plan/mod.rs: Protobuf support for YieldStreamExec nodes
  • datafusion/common/src/config.rs: Define default and behavior for yield_period
Comments suppressed due to low confidence (5)

datafusion/common/src/config.rs:733

  • The comment 'If set to 0, no DataFusion will not perform any explicit yielding.' contains a double negative; consider rephrasing to 'If set to 0, DataFusion will not perform any explicit yielding.'
/// default = 64

datafusion/sqllogictest/test_files/information_schema.slt:417

  • The word 'promply' is misspelled; it should be 'promptly'.
datafusion.optimizer.yield_period 64 When DataFusion detects that a plan might not be promply cancellable ...

datafusion/physical-plan/src/yield_stream.rs:107

  • [nitpick] The field is named frequency but the accessor is yield_period; consider renaming one of them (e.g., use period consistently) for clarity.
    frequency: usize,

datafusion/physical-plan/src/work_table.rs:228

  • [nitpick] Add a unit test for WorkTableExec to verify that with cooperative enabled the stream is wrapped in a YieldStream and yields as expected when cancelled.
        Ok(wrap_yield_stream(stream, &context, self.cooperative))

datafusion/datasource/src/source.rs:387

  • [nitpick] The method with_cooperative could be renamed to with_cooperative_yields to match the ExecutionPlan trait method with_cooperative_yields for consistency.
    pub fn with_cooperative(mut self, cooperative: bool) -> Self {

@alamb (Contributor) commented Jun 13, 2025

Look, I see that you are trying to help and we do want to take it. I suspect we might be facing a "culture" challenge here: Typically, DF community attacks large problems by solving them bit by bit and refining a solution iteratively. This is unlike some other projects which front-load the effort by going through a more comprehensive design process. We also do that for some tasks where this iterative approach is not applicable, but it is not very common.

BTW I liked this description so much I ported it to a proposed addition in the docs:

Labels
common (Related to common crate), core (Core DataFusion crate), datasource (Changes to the datasource crate), documentation (Improvements or additions to documentation), optimizer (Optimizer rules), physical-plan (Changes to the physical-plan crate), proto (Related to proto crate), sqllogictest (SQL Logic Tests (.slt))
5 participants