bug: remove busy-wait while sort is ongoing #16322
Conversation
A sort-preserving-merge-specific test case started failing. I'll dig deeper to better understand what's going on.
@berkaysynnada I hope it's OK that I ping you directly; reaching out because I believe you are the author of the test case in question. I believe this PR surfaced a mistake in the test implementation.
I've fixed the test implementation, which makes the test pass again, but would like to get your opinion on this. Maybe I'm missing something.
This seems really nice 🚀
@Dandandan project newbie question: my daily practice at work is to handle code review comments using amend/force-push. Did so out of habit before thinking to ask. Is that ok in this project, or does the community prefer fixup commits and squash before merging?
IMO it doesn't matter too much (personally I like to create individual commits). We'll squash-merge the commits.
LGTM 🚀 maybe @alamb or @berkaysynnada can have another look
Thank you @pepijnve and @Dandandan -- this makes sense to me too. I took a careful look at the code and I found it to be easy to follow and clear ❤️
I think we should wait a few days before merging this to see if others would like to review. I think @jayzhan211 and @ozankabak may also be interested
// If the polling result is Poll::Ready(Some(batch)) or Poll::Ready(None),
// we remove this partition from the queue so it is not polled again.
self.uninitiated_partitions.pop_front();
// The polled stream is ready
thank you for these comments 👍
🤖: Benchmark completed
Since this changes the congestion behavior test, which I'm not deeply familiar with, let's hear from @berkaysynnada on this to make sure we are not losing anything (he will be back tomorrow, so it shouldn't be long). If he is OK with the change, a 10% performance improvement would be great.
Waiting for benchmark results here, so I have some time to write up my assessment of what was happening and what has changed. This is just to assist any reviewers, not to replace review or justify the changes.

Initial test failure

The congestion test set up three streams to merge:
After the initial change, the congestion test started failing. The poll sequence I observed was:
and then the test hung. This turned out to be caused by CongestedStream returning Pending but not holding on to the waker to wake the task when the congestion clears. The initial phase of SPM was now assuming it would be woken up by any stream that returned Pending, and that wasn't happening.

CongestedStream fix

I don't believe it's valid for a Stream to return Pending but not set up waking, so I fixed that and then got this poll sequence:
and the test passed.

Using swap_remove

Based on initial review feedback I restored the usage of swap_remove, which results in the following sequence:
Due to swap_remove changing the poll order, stream 3 was getting polled before stream 2. As a consequence we never hit the congestion situation; the test case passes, but no longer tests what it says it does.

Testing congestion again

I adapted CongestedStream to keep track of the set of streams that have been polled. Both streams 2 and 3 are now treated as congested unless all streams have been polled. This results in the following sequence:
The test case now hits the congestion point again. As a bonus, the test is no longer dependent on the exact order in which the streams are polled, which decouples it from the implementation details a bit more.
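For illustration, here is a minimal sketch of what a well-behaved congested stream can look like: it tracks which participating streams have been polled and, crucially, stores the waker before returning Pending. This is not the actual test code; the names (SharedState, polled, wakers) are illustrative, and it assumes the futures crate.

```rust
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

use futures::Stream;

struct SharedState {
    polled: HashSet<usize>, // ids of streams that have been polled so far
    total: usize,           // total number of participating streams
    wakers: Vec<Waker>,     // wakers parked while congested
}

struct CongestedStream<S> {
    id: usize,
    inner: S,
    state: Arc<Mutex<SharedState>>,
}

impl<S: Stream + Unpin> Stream for CongestedStream<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut state = self.state.lock().unwrap();
        state.polled.insert(self.id);
        if state.polled.len() < state.total {
            // Still congested: park the waker *before* returning Pending, so the
            // task is guaranteed to be woken once the congestion clears.
            state.wakers.push(cx.waker().clone());
            return Poll::Pending;
        }
        // Congestion cleared: wake everything that was parked, then delegate.
        for waker in state.wakers.drain(..) {
            waker.wake();
        }
        drop(state);
        Pin::new(&mut self.inner).poll_next(cx)
    }
}
```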
Thank you @pepijnve, good finding, LGTM. Remaining questions for me:
- Do we evaluate the overhead when we have high-parallelism streams?
It may increase polling overhead.
Previously, as soon as one stream returned Pending, the merge would short-circuit and return Pending, minimizing work per cycle. With the new approach, we now poll all N streams once before giving up, which can add extra CPU cost, especially when you have high parallelism (e.g., 16, 64, 128, 256, 512 streams).
- What about the many-files spilling case for sort merge? For example, 10,000 files or more.
If we have many files to merge, and each file has a large IO time before returning Pending, then we also need to poll each file before returning Pending. I am not sure whether this case will add overhead, because we need to poll all the files being merged.
Thanks!
I did not measure it myself, only theoretical reasoning and simulating scenarios in my head. I'm still working on finding a reliable benchmarking environment to actually measure things; I seem to always get too much jitter for it to be useful.
@zhuqi-lucas this is a really interesting topic and tricky tradeoff. The previous code would indeed do one poll per cycle and then do a self-wake yield. The new code polls each stream once and then yields without self-wake. But the tradeoff is that we're going to extend the elapsed time per cycle. As a consequence it may take a bit longer to yield to the caller, and then the cancellation problem rears its ugly head again. If this actually matters or not is entirely dependent on the streams being polled. So... it's a complicated balancing act. Would love to hear how others look at this problem.
I just checked, and SortExec is also setting up |
Thank you @pepijnve, a possible trade-off solution may be:

Trade-off Strategy: Batch Polling

An approach that sits between the two extremes is batch polling.
Instead of polling all streams every time, you divide the N child streams into several groups (e.g., 3 groups).
On each poll cycle, you only poll one group of streams.
On the next cycle, you move to the next group, and so on.
This helps reduce the per-cycle polling cost while still making steady progress across all streams over time.
It is especially relevant in high-parallelism scenarios where polling all streams on every wake-up is unnecessarily expensive. And I believe our benchmarks are not enough; we may need to mock those cases... but it's really hard. I am OK with merging this PR first, because corner cases always exist and we can solve them when they happen. Thanks!
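As a rough illustration of the batch-polling idea described above (this is not code from this PR; BatchPoller, poll_startup, and the u64 element type are made up for the example, and it assumes group_size >= 1):

```rust
use std::task::{Context, Poll};

use futures::stream::BoxStream;
use futures::StreamExt;

struct BatchPoller {
    // Streams that have not produced their first item yet; a slot becomes `None`
    // once its stream is ready to emit and no longer needs startup polling.
    pending: Vec<Option<BoxStream<'static, u64>>>,
    group_size: usize,
    next_group: usize,
}

impl BatchPoller {
    /// One startup poll cycle: poll only the current group of streams, then
    /// advance to the next group for the following cycle.
    fn poll_startup(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let groups = (self.pending.len() + self.group_size - 1) / self.group_size;
        let start = (self.next_group * self.group_size).min(self.pending.len());
        let end = (start + self.group_size).min(self.pending.len());
        self.next_group = (self.next_group + 1) % groups.max(1);

        for slot in &mut self.pending[start..end] {
            if let Some(stream) = slot {
                if stream.poll_next_unpin(cx).is_ready() {
                    *slot = None; // this stream has started emitting
                }
            }
        }

        if self.pending.iter().all(Option::is_none) {
            Poll::Ready(()) // every stream has started; move on to merging
        } else {
            // Self-wake so the next cycle polls the next group; per-cycle cost
            // stays bounded by `group_size` at the price of extra wake-ups.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

Note that the sketch still self-wakes to advance to the next group, so it trades extra wake-ups for a bounded per-cycle cost; how that trade plays out in practice is exactly the open question in this thread.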
@zhuqi-lucas my assumption is that in high-parallelism situations all those streams are distinct tasks with a channel on the end. Is that incorrect? And is checking a channel for readiness such an expensive operation? What we're doing here is, as far as I understand it, a bespoke version of
Perhaps we can run the sorting benchmarks (tpch_sort) as well just to be sure there are no regressions? AFAIU there shouldn't be, but let's see.
@pepijnve
@pepijnve This is a good explanation. So even if there are 10k streams backed by RecordBatchReceiverStream, they are ultimately based on Tokio's channel mechanism, which relies on Linux's epoll, so it's still very efficient.
I'm pretty sure that's not the case. There are no file descriptors involved in this as far as I can tell; it's just a bunch of user-space mpsc queues and semaphores (see the implementation of
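To make the cost of such a readiness check concrete, here is a minimal sketch of a channel-backed stream in the spirit of RecordBatchReceiverStream (not its actual implementation; ChannelStream is a made-up name): poll_recv either returns a buffered item or registers the task's waker, entirely in user space, so polling it once per cycle is cheap.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio::sync::mpsc::Receiver;

struct ChannelStream<T> {
    rx: Receiver<T>,
}

impl<T> Stream for ChannelStream<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Either yields an already-buffered item or stores the waker so the
        // sender side wakes this task when it pushes the next item.
        self.rx.poll_recv(cx)
    }
}
```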
OK, I see. I had not checked the sort-merge spilling-files case; if that's not the case here, the cost should be small for the CPU.
For what it is worth, trying to merge 10k streams will be bad for a lot of reasons (merge is linear in the number of input streams). From my perspective, this PR has concrete measurements that show it is faster, and mostly theoretical conclusions that it is no worse as well. Therefore I think it is good to go and will merge it. We can adjust / change / further optimize as we move on and get any additional information.
Thanks again @pepijnve @Dandandan @ozankabak and @zhuqi-lucas
Sorry for the late reply. @pepijnve Your diagnosis is spot on, and the proposed fix totally makes sense. I honestly can’t recall why I added the wake there but not in CongestedStream, most likely I just overlooked it. It's a really impactful gain, thank you.
I'm not sure I understand what you mean @berkaysynnada. Looking at just the initial phase, it should still yield after each round of polling if there are any pending streams.
I mean when we remove this incremental (that's how I test that test_spm_congestion covers this rotation):
the test never finishes. But as I said, it's not a big deal, given the way the test is written.
I think I've got you covered. Try commenting out https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs#L1313 and running the test. This doesn't really test the rotation per se, but it keeps all the streams congested. The timeout triggers and the test reports the expected failure. I believe I had the test hang you described while I was working on this, and adapted the test case a bit to make sure the safety net of the test stays in place.
Is there an additional test we should write perhaps, to add the coverage @berkaysynnada suggests? |
I don't think it's necessary TBH. I applied this patch (which I think is what @berkaysynnada meant) and the test then fails in the way it's intended to.
Which issue does this PR close?
Rationale for this change
SortPreservingMergeStream works in two phases. It first waits for each input stream to be ready to emit. Once everybody's ready it proceeds to an emit phase.

During the waiting phase, it will poll each stream in a round-robin fashion. If any stream returns Pending, the code self-wakes the current task and immediately returns Pending. This results in busy-waiting when waiting for, for instance, a SortExec that's sorting its data, or any other pipeline breaker. While this works, it wastes CPU cycles.
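As a rough sketch of the busy-wait pattern described above (simplified, not the actual SortPreservingMergeStream code; the function name and the BoxStream element type are illustrative):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll};

use futures::stream::BoxStream;
use futures::StreamExt;

/// Waiting-phase behavior before this PR (simplified): poll inputs from the
/// front of the queue; on the first Pending, rotate the stream to the back,
/// self-wake, and return Pending immediately.
fn poll_waiting_phase_old(
    uninitiated: &mut VecDeque<BoxStream<'static, u64>>,
    cx: &mut Context<'_>,
) -> Poll<()> {
    while let Some(mut stream) = uninitiated.pop_front() {
        match stream.poll_next_unpin(cx) {
            // Ready: this input can emit (the real code also buffers the first batch).
            Poll::Ready(_) => {}
            Poll::Pending => {
                // Put it back, wake ourselves, and bail out. The executor re-polls
                // the task right away, which is the busy-wait this PR removes.
                uninitiated.push_back(stream);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
    }
    Poll::Ready(()) // every input has started emitting; proceed to the emit phase
}
```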
What changes are included in this PR?
Rather than returning immediately when one stream is pending, poll each stream once. Only return Pending when there are still streams left that have not started emitting. This assumes that the pending streams are well behaved and will wake the task when they need to be polled again, as required by the Stream contract. Note that this may surface bugs in other streams.
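A correspondingly simplified sketch of the new waiting-phase behavior (again with illustrative names, not the actual code):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll};

use futures::stream::BoxStream;
use futures::StreamExt;

/// Waiting-phase behavior after this PR (simplified): poll every remaining input
/// exactly once per cycle, and return Pending without a self-wake while some
/// inputs have not started emitting yet.
fn poll_waiting_phase_new(
    uninitiated: &mut VecDeque<BoxStream<'static, u64>>,
    cx: &mut Context<'_>,
) -> Poll<()> {
    let mut still_pending = VecDeque::new();
    while let Some(mut stream) = uninitiated.pop_front() {
        if stream.poll_next_unpin(cx).is_pending() {
            still_pending.push_back(stream);
        }
        // Ready streams simply leave the startup queue here; the real code keeps
        // them (and their first batch) for the emit phase.
    }
    *uninitiated = still_pending;

    if uninitiated.is_empty() {
        Poll::Ready(()) // everyone has started; proceed to the emit phase
    } else {
        // No self-wake: each Pending stream has registered our waker and will
        // reschedule this task when it makes progress (the Stream contract).
        Poll::Pending
    }
}
```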
Rotation of uninitiated_partitions has been removed since that's no longer useful. There was a comment in the code about 'upstream buffer size increase', but I'm not sure what that was referring to.

Are these changes tested?
Only by existing tests and manual testing
Are there any user-facing changes?
No