
bug: remove busy-wait while sort is ongoing #16322


Merged
merged 3 commits into apache:main from issue_16321 on Jun 12, 2025

Conversation

pepijnve
Contributor

@pepijnve pepijnve commented Jun 7, 2025

Which issue does this PR close?

Rationale for this change

SortPreservingMergeStream works in two phases. It first waits for each input stream to be ready to emit. Once everybody's ready it proceeds to an emit phase.

During the waiting phase, it will poll each stream in a round-robin fashion. If any stream returns Pending the code self-wakes the current task and immediately returns Pending. This results in busy-waiting when waiting for, for instance, a SortExec that's sorting its data or any other pipeline breaker.

While this works, it wastes CPU cycles.
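Concretely, the wasteful pattern described above boils down to something like the following (a minimal sketch for illustration, not the actual merge.rs code):

use std::task::{Context, Poll};

// Sketch of the old behavior: as soon as one input is Pending, wake ourselves
// and yield, so the task is immediately re-scheduled and spins until every
// input is ready.
fn yield_with_self_wake<T>(cx: &mut Context<'_>) -> Poll<T> {
    cx.waker().wake_by_ref(); // re-schedule the current task right away...
    Poll::Pending // ...so this Pending never actually parks it
}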

What changes are included in this PR?

Rather than returning immediately when one stream is pending, poll each stream once. Only return pending when there are still streams left that have not started emitting. This assumes that the pending streams are well behaved and will wake the task when they need to be polled again as required by the Stream contract. Note that this may surface bugs in other streams.

Rotation of uninitiated_partitions has been removed since that's no longer useful. There was a comment in the code about 'upstream buffer size increase', but I'm not sure what that was referring to.
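The strategy is roughly the following (a simplified, self-contained sketch; the struct and method names are made up and this is not the actual merge.rs code):

use std::collections::VecDeque;
use std::task::{Context, Poll};

use futures::stream::{BoxStream, StreamExt};

// Sketch of the initialization phase described above: poll every input that
// has not produced its first batch yet, and return Pending without a
// self-wake as long as any of them is still Pending.
struct InitPhase<T> {
    streams: Vec<BoxStream<'static, T>>,
    // Indices of inputs that have not started emitting yet.
    uninitiated_partitions: VecDeque<usize>,
}

impl<T> InitPhase<T> {
    fn poll_uninitiated(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let mut idx = 0;
        while idx < self.uninitiated_partitions.len() {
            let partition = self.uninitiated_partitions[idx];
            match self.streams[partition].poll_next_unpin(cx) {
                Poll::Pending => {
                    // This stream registered cx.waker() per the Stream
                    // contract; just move on to the next input.
                    idx += 1;
                }
                Poll::Ready(_first_item) => {
                    // Ready(Some(_)) or Ready(None): the input has started
                    // emitting (or finished). A real implementation would hand
                    // the batch to the merge here; this sketch only stops
                    // tracking the partition.
                    self.uninitiated_partitions.swap_remove_back(idx);
                }
            }
        }
        if self.uninitiated_partitions.is_empty() {
            Poll::Ready(())
        } else {
            // Some inputs are still Pending. Yield *without* a self-wake and
            // rely on their wakers, which is what removes the busy-wait.
            Poll::Pending
        }
    }
}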

Are these changes tested?

Only by existing tests and manual testing.

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jun 7, 2025
@pepijnve pepijnve force-pushed the issue_16321 branch 3 times, most recently from 6334f47 to f7405ee Compare June 7, 2025 21:08
@pepijnve
Contributor Author

pepijnve commented Jun 7, 2025

A sort preserving merge specific test case started failing. I’ll dig deeper to better understand what’s going on.

@pepijnve
Contributor Author

pepijnve commented Jun 8, 2025

@berkaysynnada I hope it's ok that I ping you directly; I'm reaching out because I believe you are the author of the test case in question. I believe this PR surfaced a mistake in the CongestedStream implementation of the test case. That Stream was returning Pending without ensuring the waker was set up to be notified when the congestion condition cleared. I'm fairly confident that's not correct since the trait contract explicitly states:

Poll::Pending means that this stream’s next value is not ready yet. Implementations will ensure that the current task will be notified when the next value may be ready.

I've fixed the test implementation which makes the test pass again, but would like to get your opinion on this. Maybe I'm missing something.
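For reference, that contract boils down to the pattern below (an illustrative sketch, not the actual CongestedStream from the test): whoever returns Pending has to stash the waker, and whoever clears the blocking condition has to wake it.

use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use futures::Stream;

// A stream gated on a shared condition. Before returning Pending it stores
// the waker so the task can be notified once the gate opens.
struct Gated {
    gate: Arc<Mutex<GateState>>,
}

#[derive(Default)]
struct GateState {
    open: bool,
    waker: Option<Waker>,
}

impl Stream for Gated {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
        let mut state = self.gate.lock().unwrap();
        if state.open {
            Poll::Ready(None)
        } else {
            // Without this line the task may never be polled again once the
            // gate opens, which is exactly the hang described above.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Whoever opens the gate must also wake the stored waker.
fn open_gate(gate: &Arc<Mutex<GateState>>) {
    let mut state = gate.lock().unwrap();
    state.open = true;
    if let Some(waker) = state.waker.take() {
        waker.wake();
    }
}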

@pepijnve pepijnve force-pushed the issue_16321 branch 3 times, most recently from ad50fbe to 8503407 Compare June 8, 2025 09:18
@Dandandan
Contributor

Dandandan commented Jun 8, 2025

This seems really nice 🚀
On my machine I get roughly a 10% improvement on queries with SPM, which I think makes sense on a 10-core machine (with fewer cores it probably has a larger effect).

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ issue_16321 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  809.46 ms │   733.99 ms │ +1.10x faster │
│ QQuery 2     │  165.80 ms │   148.89 ms │ +1.11x faster │
│ QQuery 3     │  432.56 ms │   386.04 ms │ +1.12x faster │
│ QQuery 4     │  459.88 ms │   467.66 ms │     no change │
│ QQuery 5     │  671.13 ms │   621.34 ms │ +1.08x faster │
│ QQuery 6     │  181.36 ms │   184.77 ms │     no change │
│ QQuery 7     │  959.96 ms │   871.05 ms │ +1.10x faster │
│ QQuery 8     │  672.39 ms │   627.96 ms │ +1.07x faster │
│ QQuery 9     │ 1101.98 ms │  1023.84 ms │ +1.08x faster │
│ QQuery 10    │  638.41 ms │   552.55 ms │ +1.16x faster │
│ QQuery 11    │  126.22 ms │   123.91 ms │     no change │
│ QQuery 12    │  358.25 ms │   329.10 ms │ +1.09x faster │
│ QQuery 13    │  720.99 ms │   656.95 ms │ +1.10x faster │
│ QQuery 14    │  247.23 ms │   251.52 ms │     no change │
│ QQuery 15    │  395.58 ms │   369.79 ms │ +1.07x faster │
│ QQuery 16    │  110.37 ms │   103.96 ms │ +1.06x faster │
│ QQuery 17    │ 1193.78 ms │  1206.00 ms │     no change │
│ QQuery 18    │ 1846.58 ms │  1668.23 ms │ +1.11x faster │
│ QQuery 19    │  412.76 ms │   400.33 ms │     no change │
│ QQuery 20    │  421.08 ms │   392.58 ms │ +1.07x faster │
│ QQuery 21    │ 1363.39 ms │  1275.33 ms │ +1.07x faster │
│ QQuery 22    │  149.67 ms │   132.31 ms │ +1.13x faster │
└──────────────┴────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)          │ 13438.84ms │
│ Total Time (issue_16321)   │ 12528.08ms │
│ Average Time (main)        │   610.86ms │
│ Average Time (issue_16321) │   569.46ms │
│ Queries Faster             │         16 │
│ Queries Slower             │          0 │
│ Queries with No Change     │          6 │
│ Queries with Failure       │          0 │
└────────────────────────────┴────────────┘

@pepijnve
Contributor Author

pepijnve commented Jun 8, 2025

@Dandandan project newbie question: my daily practice at work is to handle code review comments using amend/force-push. I did so out of habit before thinking to ask. Is that ok in this project, or does the community prefer fixup commits and a squash before merging?

@Dandandan
Contributor

@Dandandan project newbie question: my daily practice at work is to handle code review comments using amend/force-push. I did so out of habit before thinking to ask. Is that ok in this project, or does the community prefer fixup commits and a squash before merging?

IMO it doesn't matter too much (personally I like to create individual commits).

We'll squash-merge the commits.

@Dandandan
Contributor

I profiled some queries to verify it's no longer busy on the sorting thread:

Main (TPC-H query 1):
[profiler screenshot]

PR (TPC-H query 1):
[profiler screenshot]

The thread above is the one executing SPM; you can see it does close to nothing after this PR (after reading metadata).

Contributor

@Dandandan Dandandan left a comment


LGTM 🚀 maybe @alamb or @berkaysynnada can have another look

@alamb
Contributor

alamb commented Jun 9, 2025

🤖 ./gh_compare_branch.sh Benchmark Script Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing issue_16321 (c3d5ae9) to 1daa5ed diff
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete

Contributor

@alamb alamb left a comment


Thank you @pepijnve and @Dandandan -- this makes sense to me too. I took a careful look at the code and I found it to be easy to follow and clear ❤️

I think we should wait a few days before merging this to see if others would like to review. I think @jayzhan211 and @ozankabak may also be interested

// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
self.uninitiated_partitions.pop_front();
// The polled stream is ready
Contributor


thank you for these comments 👍

@alamb
Contributor

alamb commented Jun 9, 2025

🤖: Benchmark completed

Details

Comparing HEAD and issue_16321
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ issue_16321 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  1900.64 ms │  1941.03 ms │     no change │
│ QQuery 1     │   698.77 ms │   685.55 ms │     no change │
│ QQuery 2     │  1423.32 ms │  1336.60 ms │ +1.06x faster │
│ QQuery 3     │   708.54 ms │   677.63 ms │     no change │
│ QQuery 4     │  1450.18 ms │  1371.41 ms │ +1.06x faster │
│ QQuery 5     │ 15851.43 ms │ 14804.05 ms │ +1.07x faster │
│ QQuery 6     │  2016.34 ms │  2046.14 ms │     no change │
│ QQuery 7     │  2124.47 ms │  1948.45 ms │ +1.09x faster │
│ QQuery 8     │   845.96 ms │   828.21 ms │     no change │
└──────────────┴─────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 27019.65ms │
│ Total Time (issue_16321)   │ 25639.07ms │
│ Average Time (HEAD)        │  3002.18ms │
│ Average Time (issue_16321) │  2848.79ms │
│ Queries Faster             │          4 │
│ Queries Slower             │          0 │
│ Queries with No Change     │          5 │
│ Queries with Failure       │          0 │
└────────────────────────────┴────────────┘
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ issue_16321 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    14.98 ms │    15.25 ms │     no change │
│ QQuery 1     │    31.43 ms │    32.60 ms │     no change │
│ QQuery 2     │    81.15 ms │    80.01 ms │     no change │
│ QQuery 3     │    97.82 ms │    99.49 ms │     no change │
│ QQuery 4     │   576.70 ms │   584.19 ms │     no change │
│ QQuery 5     │   833.02 ms │   841.56 ms │     no change │
│ QQuery 6     │    23.21 ms │    23.25 ms │     no change │
│ QQuery 7     │    38.30 ms │    34.79 ms │ +1.10x faster │
│ QQuery 8     │   913.51 ms │   898.29 ms │     no change │
│ QQuery 9     │  1197.30 ms │  1126.00 ms │ +1.06x faster │
│ QQuery 10    │   262.45 ms │   248.43 ms │ +1.06x faster │
│ QQuery 11    │   297.77 ms │   282.01 ms │ +1.06x faster │
│ QQuery 12    │   891.35 ms │   868.87 ms │     no change │
│ QQuery 13    │  1321.37 ms │  1247.47 ms │ +1.06x faster │
│ QQuery 14    │   826.48 ms │   809.29 ms │     no change │
│ QQuery 15    │   808.37 ms │   754.75 ms │ +1.07x faster │
│ QQuery 16    │  1722.67 ms │  1620.69 ms │ +1.06x faster │
│ QQuery 17    │  1594.99 ms │  1593.08 ms │     no change │
│ QQuery 18    │  3101.36 ms │  2892.25 ms │ +1.07x faster │
│ QQuery 19    │    83.25 ms │    84.66 ms │     no change │
│ QQuery 20    │  1121.24 ms │  1146.38 ms │     no change │
│ QQuery 21    │  1313.87 ms │  1298.49 ms │     no change │
│ QQuery 22    │  2157.02 ms │  2129.27 ms │     no change │
│ QQuery 23    │  7965.30 ms │  7780.56 ms │     no change │
│ QQuery 24    │   465.54 ms │   454.83 ms │     no change │
│ QQuery 25    │   394.55 ms │   386.33 ms │     no change │
│ QQuery 26    │   537.44 ms │   523.72 ms │     no change │
│ QQuery 27    │  1593.11 ms │  1547.66 ms │     no change │
│ QQuery 28    │ 13504.42 ms │ 11770.24 ms │ +1.15x faster │
│ QQuery 29    │   535.75 ms │   535.55 ms │     no change │
│ QQuery 30    │   791.38 ms │   784.48 ms │     no change │
│ QQuery 31    │   837.24 ms │   809.86 ms │     no change │
│ QQuery 32    │  2631.49 ms │  2501.55 ms │     no change │
│ QQuery 33    │  3317.30 ms │  3148.53 ms │ +1.05x faster │
│ QQuery 34    │  3329.56 ms │  3233.21 ms │     no change │
│ QQuery 35    │  1265.38 ms │  1238.90 ms │     no change │
│ QQuery 36    │   120.28 ms │   120.88 ms │     no change │
│ QQuery 37    │    57.17 ms │    56.21 ms │     no change │
│ QQuery 38    │   122.92 ms │   122.28 ms │     no change │
│ QQuery 39    │   198.52 ms │   198.42 ms │     no change │
│ QQuery 40    │    44.65 ms │    46.41 ms │     no change │
│ QQuery 41    │    45.72 ms │    45.15 ms │     no change │
│ QQuery 42    │    37.71 ms │    37.05 ms │     no change │
└──────────────┴─────────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary          ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 57105.04ms │
│ Total Time (issue_16321)   │ 54052.88ms │
│ Average Time (HEAD)        │  1328.02ms │
│ Average Time (issue_16321) │  1257.04ms │
│ Queries Faster             │         10 │
│ Queries Slower             │          0 │
│ Queries with No Change     │         33 │
│ Queries with Failure       │          0 │
└────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ issue_16321 ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 121.29 ms │   100.08 ms │ +1.21x faster │
│ QQuery 2     │  21.52 ms │    21.01 ms │     no change │
│ QQuery 3     │  33.52 ms │    32.30 ms │     no change │
│ QQuery 4     │  19.89 ms │    18.63 ms │ +1.07x faster │
│ QQuery 5     │  52.00 ms │    49.78 ms │     no change │
│ QQuery 6     │  11.93 ms │    12.03 ms │     no change │
│ QQuery 7     │  96.25 ms │    86.38 ms │ +1.11x faster │
│ QQuery 8     │  25.43 ms │    25.22 ms │     no change │
│ QQuery 9     │  60.11 ms │    54.46 ms │ +1.10x faster │
│ QQuery 10    │  48.60 ms │    43.98 ms │ +1.11x faster │
│ QQuery 11    │  11.51 ms │    11.60 ms │     no change │
│ QQuery 12    │  41.89 ms │    35.09 ms │ +1.19x faster │
│ QQuery 13    │  27.03 ms │    25.77 ms │     no change │
│ QQuery 14    │   9.75 ms │     9.82 ms │     no change │
│ QQuery 15    │  22.33 ms │    19.91 ms │ +1.12x faster │
│ QQuery 16    │  21.29 ms │    18.76 ms │ +1.13x faster │
│ QQuery 17    │  94.81 ms │    96.53 ms │     no change │
│ QQuery 18    │ 212.50 ms │   200.15 ms │ +1.06x faster │
│ QQuery 19    │  26.21 ms │    25.71 ms │     no change │
│ QQuery 20    │  34.31 ms │    31.81 ms │ +1.08x faster │
│ QQuery 21    │ 154.08 ms │   148.59 ms │     no change │
│ QQuery 22    │  16.22 ms │    14.97 ms │ +1.08x faster │
└──────────────┴───────────┴─────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary          ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)          │ 1162.48ms │
│ Total Time (issue_16321)   │ 1082.57ms │
│ Average Time (HEAD)        │   52.84ms │
│ Average Time (issue_16321) │   49.21ms │
│ Queries Faster             │        11 │
│ Queries Slower             │         0 │
│ Queries with No Change     │        11 │
│ Queries with Failure       │         0 │
└────────────────────────────┴───────────┘

@ozankabak
Contributor

ozankabak commented Jun 9, 2025

Since this changes the congestion behavior test, which I'm not deeply familiar with, let's hear from @berkaysynnada on this to make sure we are not losing anything (he will be back tomorrow, so it shouldn't be long). If he is OK with the change, 10% performance improvement would be great.

@pepijnve
Contributor Author

While waiting for the benchmark results here, I have some time to write up my assessment of what was happening and what has changed. This is just to assist reviewers, not to replace review or justify the changes.

Initial test failure

The congestion test set up three streams to merge:

  1. returns Ready(None) and panics if polled again. This checks that None is respected as a terminal result
  2. returns Pending until stream 3 has been polled, then always returns Ready(None)
  3. clears congestion for 2 and always returns Ready(None)

After the initial change the congestion test started failing. The poll sequence I observed was

  1. stream1.poll_next -> None
  2. stream2.poll_next -> Pending
  3. stream3.poll_next -> Ready(None)

and then the test hung. This turned out to be caused by CongestedStream returning pending, but not holding on to the waker to wake it when the congestion clears. The initial phase of SPM was now assuming it would be woken up by any stream that returned pending and that wasn't happening.

CongestedStream fix

I don't believe it's valid for a Stream to return pending but not set up waking, so I fixed that and then got this poll sequence.

  1. stream1.poll_next -> None
  2. stream2.poll_next -> Pending
  3. stream3.poll_next -> Ready(None)
  4. stream2.poll_next -> Ready(None)

and the test passed.

Using swap_remove

Based on initial review feedback I restored the usage of swap_remove which results in the following sequence.

  1. stream1.poll_next -> None
  2. stream3.poll_next -> Ready(None)
  3. stream2.poll_next -> Ready(None)

Due to swap_remove changing the poll order, stream 3 was getting polled before stream 2. As a consequence we never hit the congestion situation and the test case passes, but no longer tests what it says it does.

Testing congestion again

I adapted CongestedStream to keep track of the set of streams that have been polled. Both stream 2 and 3 are now treated as congested unless all streams have been polled. This results in the following sequence.

  1. stream1.poll_next -> None
  2. stream3.poll_next -> Pending
  3. stream2.poll_next -> Ready(None)
  4. stream3.poll_next -> Ready(None)

The test case now hits the congestion point again. As a bonus the test is no longer dependent on the exact order in which the streams are polled which decouples it from the implementation details a bit more.
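In code, the adapted test stream behaves roughly like this (names and structure are illustrative; this is not the actual test code):

use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use futures::Stream;

// Shared congestion state: which partitions have been polled so far, and the
// wakers of partitions currently parked on the congestion.
#[derive(Default)]
struct CongestionState {
    polled: HashSet<usize>,
    wakers: Vec<Waker>,
}

struct CongestedPartition {
    partition: usize,
    total_partitions: usize,
    state: Arc<Mutex<CongestionState>>,
}

impl Stream for CongestedPartition {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
        let mut state = self.state.lock().unwrap();
        state.polled.insert(self.partition);
        if state.polled.len() < self.total_partitions {
            // Still congested: register our waker so we are re-polled once
            // the last partition has been seen, regardless of poll order.
            state.wakers.push(cx.waker().clone());
            return Poll::Pending;
        }
        // Congestion cleared: wake everyone that was parked on it.
        for waker in state.wakers.drain(..) {
            waker.wake();
        }
        Poll::Ready(None)
    }
}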

Contributor

@zhuqi-lucas zhuqi-lucas left a comment


Thank you @pepijnve, good finding, LGTM. Remaining questions for me:

  1. Do we evaluate the overhead when we have high parallelism streams?

It may increase polling overhead.
Previously, as soon as one stream returned Pending, the merge would short-circuit and return Pending, minimizing work per cycle. With the new approach, we now poll all N streams once before giving up, which can add extra CPU cost, especially when you have high parallelism (e.g., 16, 64, 128, 256, 512 streams).

  2. The many files spilling cases for sort merge? For example, 10000 files or larger.
    If we have many files to merge and each file has a large IO delay before returning Pending, we also need to poll each file before returning Pending. I am not sure whether this case will add overhead, because we need to poll all of the files being merged.

Thanks!

@pepijnve
Contributor Author

pepijnve commented Jun 11, 2025

Do we evaluate the overhead when we have high parallelism streams?

I did not myself. Only theoretical reasoning and simulating scenarios in my head. I'm still working on finding a reliable benchmarking environment to actually measure things. I seem to always get too much jitter for it to be useful.

Previously, as soon as one stream returned Pending, the merge would short-circuit and return Pending, minimizing work per cycle. With the new approach, we now poll all N streams once before giving up, which can add extra CPU cost, especially when you have high parallelism

@zhuqi-lucas this is a really interesting topic and tricky tradeoff. The previous code would indeed do one poll per cycle and then do a self-wake yield. The new code polls each stream once and then yields without self-wake.
In terms of the amount of work done there is no difference, not any extra CPU cost. On the contrary, the self-wake yield is removal of pure waste. Unless I'm missing something, total elapsed time should be less.

But the tradeoff is that we're going to extend the elapsed time per cycle. As a consequence it may take a bit longer to yield to the caller, and then the cancellation problem rears its ugly head again.

If this actually matters or not is entirely dependent on the streams being polled.
SortPreservingMergeExec for instance sets things up to use RecordBatchReceiverStream instances as children. Polling those repeatedly until the first record batch arrives is quite wasteful because you're really just spinning in place. Checking each receiver in a loop is going to be super quick, and yielding in between every partition is not useful at all.
If the child streams are blocking though, then it's a different matter. You probably don't want to actually drive the progress of each stream synchronously in a loop.

So... it's a complicated balancing act. Would love to hear how others look at this problem.
PR #16319 ties into this. It's an experiment I'm doing to see if we can avoid exposing the potentially blocking portion of streams to the caller so that the problem described above kind of disappears. It's not yet clear if this can be achieved without a performance penalty.

The many files spilling cases for sort merge? For example, 10000 files or larger.

I just checked, and SortExec is also setting up RecordBatchReceiverStream. The worst case scenario in terms of elapsed time in the poll_next call is that all 10k streams are ready in one cycle. This will trigger 10k cursor initialization which does some non-trivial work converting the record batch. But the current code is doing exactly the same thing today already so it's no worse than the status quo as far as I can tell.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Jun 11, 2025

Previously, as soon as one stream returned Pending, the merge would short-circuit and return Pending, minimizing work per cycle. With the new approach, we now poll all N streams once before giving up, which can add extra CPU cost, especially when you have high parallelism

@zhuqi-lucas this is a really interesting topic and tricky tradeoff. The previous code would indeed do one poll per cycle and then do a self-wake yield. The new code polls each stream once and then yields without self-wake. In terms of the amount of work done there is no difference, not any extra CPU cost. On the contrary, the self-wake yield is removal of pure waste. Unless I'm missing something, total elapsed time should be less.

But the tradeoff is that we're going to extend the elapsed time per cycle. As a consequence it may take a bit longer to yield to the caller, and then the cancellation problem rears its ugly head again.

If this actually matters or not is entirely dependent on the streams being polled. SortPreservingMergeExec for instance sets things up to use RecordBatchReceiverStream instances as children. Polling those repeatedly until the first record batch arrives is quite wasteful because you're really just spinning in place. Checking each receiver in a loop is going to be super quick, and yielding in between every partition is not useful at all. If the child streams are blocking though, then it's a different matter. You probably don't want to actually drive the progress of each stream synchronously in a loop.

So... it's a complicated balancing act. Would love to hear how others look at this problem. PR #16319 ties into this. It's an experiment I'm doing to see if we can avoid exposing the potentially blocking portion of streams to the caller so that the problem described above kind of disappears. It's not yet clear if this can be achieved without a performance penalty.

Thank you @pepijnve, a possible trade-off solution may be:

Trade-off Strategy: Batch Polling

An approach that sits between the two extremes is batch polling.
Instead of polling all streams every time, you divide the N child streams into several groups (e.g., 3 groups).

On each poll cycle, you only poll one group of streams.

On the next cycle, you move to the next group, and so on.

This helps reduce the per-cycle polling cost while still making steady progress across all streams over time.

Especially in high-parallelism scenarios where polling all streams on every wake-up is unnecessarily expensive.
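To make the idea concrete, a rough sketch of batch polling could look like this (illustrative only, not proposed DataFusion code; all names are made up):

use std::task::{Context, Poll};

use futures::stream::{BoxStream, StreamExt};

struct BatchPoller<T> {
    streams: Vec<BoxStream<'static, T>>,
    // Indices of inputs that have not produced a first item yet.
    waiting: Vec<usize>,
    // How many waiting inputs to poll per wake-up.
    group_size: usize,
    // Where the next group starts inside `waiting`.
    cursor: usize,
}

impl<T> BatchPoller<T> {
    fn poll_one_group(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.waiting.is_empty() {
            return Poll::Ready(());
        }
        // Pick the next group of still-waiting inputs, wrapping around.
        self.cursor %= self.waiting.len();
        let end = self.cursor + self.group_size;
        let mut idx = self.cursor;
        while idx < end.min(self.waiting.len()) {
            let partition = self.waiting[idx];
            match self.streams[partition].poll_next_unpin(cx) {
                // Pending: its waker will re-schedule us; look at the next one.
                Poll::Pending => idx += 1,
                // First item (or end of stream) seen: stop tracking it.
                Poll::Ready(_) => {
                    self.waiting.swap_remove(idx);
                }
            }
        }
        self.cursor = idx;
        if self.waiting.is_empty() {
            Poll::Ready(())
        } else {
            // The groups skipped this cycle have not registered any waker for
            // us, so a self-wake is needed to come back for them. Bounding the
            // per-cycle cost this way re-introduces some self-waking; that is
            // the tradeoff being weighed here.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}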

And I believe our benchmarks are not enough; we may need to mock those cases... But it's really hard.

I am OK with merging this PR first. Corner cases will always exist; we can solve them when they happen. Thanks!

@pepijnve
Contributor Author

pepijnve commented Jun 11, 2025

Especially in high-parallelism scenarios where polling all streams on every wake-up is unnecessarily expensive.

@zhuqi-lucas my assumption is that in high-parallelism situations all those streams are distinct tasks with a channel on the end. Is that incorrect? And is checking a channel for readiness such an expensive operation?

What we're doing here is, as far as I understand it, a bespoke version of tokio::select! on 10k channels in the end. That should work, shouldn't it?

@Dandandan
Contributor

Perhaps we can run the sorting benchmarks (tpch_sort) as well just to be sure there are no regressions? AFAIU there shouldn't be but let's see

@zhuqi-lucas
Contributor

Especially in high-parallelism scenarios where polling all streams on every wake-up is unnecessarily expensive.

@zhuqi-lucas my assumption is that in high-parallelism situations all those streams are distinct tasks with a channel on the end. Is that incorrect? And is checking a channel for readiness such an expensive operation?

What we're doing here is, as far as I understand it, a bespoke version of tokio::select! on 10k channels in the end. That should work, shouldn't it?

@pepijnve
Just saw the update about the max 10k channels case; I think it's harmless for performance. I agree we can also run sort-tpch to see the result.

@zhuqi-lucas
Contributor

zhuqi-lucas commented Jun 11, 2025

I just checked, and SortExec is also setting up RecordBatchReceiverStream. The worst case scenario in terms of elapsed time in the poll_next call is that all 10k streams are ready in one cycle. This will trigger 10k cursor initialization which does some non-trivial work converting the record batch. But the current code is doing exactly the same thing today already so it's no worse than the status quo as far as I can tell.

@pepijnve This is a good explanation. So even if there are 10k streams backed by RecordBatchReceiverStream, they are ultimately based on Tokio's channel mechanism, which relies on Linux's epoll, so it's still very efficient.

@pepijnve
Contributor Author

they are ultimately based on Tokio's channel mechanism, which relies on Linux's epoll, so it's still very efficient

I'm pretty sure that's not the case. There are no file descriptors involved in this as far as I can tell, it's just a bunch of user-space mpsc queues and semaphores (see implementation of RecordBatchReceiverStream, tokio::sync::mpsc::Receiver, and tokio::sync::mpsc::Chan). Kernel-based polling isn't involved here.

@zhuqi-lucas
Contributor

they are ultimately based on Tokio's channel mechanism, which relies on Linux's epoll, so it's still very efficient

I'm pretty sure that's not the case. There are no file descriptors involved in this as far as I can tell, it's just a bunch of user-space mpsc queues and semaphores (see implementation of RecordBatchReceiverStream, tokio::sync::mpsc::Receiver, and tokio::sync::mpsc::Chan). Kernel-based polling isn't involved here.

Ok, I see. I haven't checked the sort-merge spilling-files case, but if epoll isn't involved there either, the CPU cost should still be small.

@alamb
Contributor

alamb commented Jun 12, 2025

I just checked, and SortExec is also setting up RecordBatchReceiverStream. The worst case scenario in terms of elapsed time in the poll_next call is that all 10k streams are ready in one cycle. This will trigger 10k cursor initialization which does some non-trivial work converting the record batch. But the current code is doing exactly the same thing today already so it's no worse than the status quo as far as I can tell.

For what it is worth, trying to merge 10k streams will be bad for a lot of reasons (merge is linear in the number of input streams)

From my perspective this PR has concrete measurements that show it is faster, as well as mostly theoretical reasoning that it is no worse.

Therefore I think it is good to go and will merge it. We can adjust / change / further optimize as we move on and get additional information.

@alamb alamb merged commit 4a857f0 into apache:main Jun 12, 2025
28 checks passed
@alamb
Contributor

alamb commented Jun 12, 2025

Thanks again @pepijnve @Dandandan @ozankabak and @zhuqi-lucas

@berkaysynnada
Contributor

Sorry for the late reply. @pepijnve Your diagnosis is spot on, and the proposed fix totally makes sense. I honestly can’t recall why I added the wake there but not in CongestedStream, most likely I just overlooked it. It's a really impactful gain, thank you.
One small note regarding this PR: if the pending rotation somehow breaks, since SortPreservingMergeStream never yields, tests relying on tokio::timeout (like test_spm_congestion) may fail to trigger the timeout signal. That said, I don’t think it’s a big deal to address right now.

@pepijnve
Contributor Author

if the pending rotation somehow breaks, since SortPreservingMergeStream never yields

I'm not sure I understand what you mean @berkaysynnada. Looking at just the initial phase it should still yield after each round of polling if there are any pending streams.

@berkaysynnada
Contributor

if the pending rotation somehow breaks, since SortPreservingMergeStream never yields

I'm not sure I understand what you mean @berkaysynnada. Looking at just the initial phase it should still yield after each round of polling if there are any pending streams.

I mean when we remove this increment (that's how I test that test_spm_congestion covers this rotation):

                         // The polled stream is pending which means we're already set up to
                         // be woken when necessary
                         // Try the next stream
                         idx += 1;

the test never finishes. But as I said, not a big deal as the test is written in that way.

@pepijnve
Contributor Author

I think I've got you covered. Try commenting out https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L1313 and running the test. This doesn't really test the rotation per se, but it keeps all the streams congested. The timeout triggers and the test reports SortPreservingMerge caused a deadlock for me. Is that sufficient?

I believe I had the test hang you described while I was working on this and adapted the test case a bit to make sure the safety net of the test stays in place.

@alamb
Contributor

alamb commented Jun 12, 2025

Is there an additional test we should write perhaps, to add the coverage @berkaysynnada suggests?

@pepijnve
Contributor Author

I don't think it's necessary TBH. I applied this patch (which I think is what @berkaysynnada meant) and the test then fails in the way it's intended to.

Index: datafusion/physical-plan/src/sorts/merge.rs
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
--- a/datafusion/physical-plan/src/sorts/merge.rs	(revision 31c570e3ee7fa830753b2bbab3ec1a635ef16a30)
+++ b/datafusion/physical-plan/src/sorts/merge.rs	(date 1749736195516)
@@ -227,7 +227,8 @@
                         // The polled stream is pending which means we're already set up to
                         // be woken when necessary
                         // Try the next stream
-                        idx += 1;
+                        // idx += 1;
+                        return Poll::Pending;
                     }
                     _ => {
                         // The polled stream is ready

Labels
physical-plan Changes to the physical-plan crate
Development

Successfully merging this pull request may close these issues.

Busy-waiting in SortPreservingMergeStream
6 participants