GH-46224: [C++][Acero] Fix the hang in asof join #46300
@github-actions crossbow submit -g cpp -g r
Revision: aa5a9be Submitted crossbow builds: ursacomputing/crossbow @ actions-d733af0ab3
@github-actions crossbow submit -g cpp -g r
Revision: 6ea4624 Submitted crossbow builds: ursacomputing/crossbow @ actions-a2fa793a58
@@ -639,12 +639,8 @@ class InputState : public util::SerialSequencingQueue::Processor {
// hit the end of the batch, need to get the next batch if possible.
++batches_processed_;
latest_ref_row_ = 0;
have_active_batch &= !queue_.TryPop();
IMO this code is apparently NOT doing what it is supposed to do: at this point have_active_batch must be true, meaning queue_ must be non-empty. Popping the queue therefore always returns a valid std::optional, which effectively sets have_active_batch to false, so the if branch below is never executed.
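To make the point concrete, here is a minimal sketch of that statement in isolation. ToyQueue and AdvancePastBatchEnd are hypothetical names invented for illustration, and the queue holds plain ints as a stand-in; only the have_active_batch flag and a TryPop() call returning a std::optional are taken from the diff.

```cpp
#include <cassert>
#include <deque>
#include <optional>

// Hypothetical stand-in for the input queue. Only TryPop() mirrors the
// call seen in the diff; everything else is an assumption of this sketch.
class ToyQueue {
 public:
  void Push(int v) { items_.push_back(v); }
  bool Empty() const { return items_.empty(); }

  // Returns the front element if there is one, std::nullopt otherwise.
  std::optional<int> TryPop() {
    if (items_.empty()) return std::nullopt;
    int v = items_.front();
    items_.pop_front();
    return v;
  }

 private:
  std::deque<int> items_;
};

// Replays the statement from the diff on a non-empty queue and returns
// the resulting value of have_active_batch.
bool AdvancePastBatchEnd(ToyQueue& queue) {
  bool have_active_batch = !queue.Empty();
  assert(have_active_batch);  // precondition described in the comment above

  // TryPop() on a non-empty queue yields an engaged optional, so
  // `!queue.TryPop()` is false and the flag is cleared unconditionally.
  have_active_batch &= !queue.TryPop();
  return have_active_batch;
}
```

Under these assumptions, AdvancePastBatchEnd returns false even when more elements remain in the queue after the pop, which is why a branch guarded by have_active_batch would not run.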
I noticed that two earlier fixes were made in the following if branch: #36094 and #36499. Given that this have_active_batch &= !queue_.TryPop() has existed since #13028, where it was introduced, and that there are no corresponding tests, I'm not sure how they fixed anything or what the bugs even were. So I just removed them.
Hi, @westonpace @icexelloss @pitrou would you help to review? Thanks.
@github-actions crossbow submit -g cpp -g python -g r
Revision: af70071 Submitted crossbow builds: ursacomputing/crossbow @ actions-ccf7708cfd
I'm curious, does this only happen when the right side is concerned? Is the update logic not symmetric?
Right, it is not symmetric as one might naturally expect (the same holds for hash join as well: the right side builds the hash table and the left side probes it). The left and right sides differ in the following aspects:
So it's normal that some issues are right side only. But it does remind me that the method I was fixing, namely Thanks for the question! |
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Hi @pitrou, comments all addressed. Would you take a look again? Thanks.
@github-actions crossbow submit -g cpp
Revision: dd7a66b Submitted crossbow builds: ursacomputing/crossbow @ actions-7d095c8a25
@github-actions crossbow submit -g cpp
Revision: 5f5b895 Submitted crossbow builds: ursacomputing/crossbow @ actions-973e5e92ed
Thanks @pitrou for reviewing. Is there anything else to address?
LGTM
After merging your PR, Conbench analyzed the 4 benchmarking runs that have been run so far on merge-commit fe29b7d. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 19 possible false positives for unstable benchmarks that are known to sometimes produce them.
Rationale for this change
A hang in asof join is reported in #46224. To explain the cause of the hang, I will first briefly describe the basic processing of asof join.
There is a dedicated processing thread for the asof join, whose lifespan is bound to the asof join node. This thread waits on a command queue. Once a command is pushed into the queue, the thread pops it and does one round of processing. A command is issued each time a batch is received from either the left or the right side input of the asof join node. Each round of processing advances the accumulated left and right inputs as much as possible to see whether they are sufficient to output a batch: if yes, i.e., either side of the asof join can be concluded by the latest input timestamp, the batch is emitted; otherwise the internal state is adjusted according to the latest input timestamp.
Notably, the processing is designed to advance the inputs "as much as possible". However, there is a bug (seemingly since day 1) when advancing the right side input in a tricky condition: if all right side rows are at times before the least time of the left rows, then advancing the right side input won't cross batches in one round of processing. This is OK (i.e., no hang) as long as enough commands are issued into the queue to trigger enough rounds of processing. However, if unlucky enough, say, the left input batches come late, then the processing will consume some commands issued by the right side input but advance nothing (because the left side input is empty), leaving too few commands to fully advance both inputs.
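The scenario can be illustrated with a single-threaded toy model. This is only a sketch under simplifying assumptions, not the Acero implementation: ToyAsofJoin, ProcessOnce, RunLateLeftScenario and the cross_batches switch are hypothetical names, each delivered batch stands for one command, and each call to ProcessOnce stands for one round of processing.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <optional>
#include <vector>

// Toy model only: all names here are hypothetical, not Acero's API.
using Batch = std::vector<int64_t>;  // row timestamps in ascending order

struct ToyAsofJoin {
  std::optional<int64_t> min_left_time;  // empty until a left batch arrives
  std::deque<Batch> right_batches;       // right batches not yet advanced past
  bool cross_batches;                    // false: buggy behavior, true: fixed

  // One round of processing, triggered by exactly one command.
  void ProcessOnce() {
    // No left rows yet: the command is consumed but nothing advances.
    if (!min_left_time) return;
    while (!right_batches.empty()) {
      const Batch& front = right_batches.front();
      assert(!front.empty());  // the toy model assumes non-empty batches
      // Stop once the front batch reaches the minimal left time.
      if (front.back() >= *min_left_time) break;
      right_batches.pop_front();  // all rows are older than the left side
      if (!cross_batches) break;  // bug: stop at the batch boundary
    }
  }
};

// Delivers `num_right` right batches first and one late left batch last,
// running one round per delivered batch. Returns how many right batches
// are still pending once all commands have been consumed.
std::size_t RunLateLeftScenario(bool cross_batches, int num_right) {
  ToyAsofJoin join{std::nullopt, {}, cross_batches};
  for (int i = 0; i < num_right; ++i) {
    const int64_t t = static_cast<int64_t>(i) * 10;
    join.right_batches.push_back(Batch{t, t + 1});
    join.ProcessOnce();  // left side still empty, so nothing advances
  }
  join.min_left_time = 1000;  // later than every right row
  join.ProcessOnce();         // the only round that can make progress
  return join.right_batches.size();
}
```

In this model, RunLateLeftScenario(false, 3) leaves two right batches pending with no command left to trigger another round, which corresponds to the hang, while RunLateLeftScenario(true, 3) drains all three in the single remaining round.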
What changes are included in this PR?
Fix the issue that advancing the right side input won't cross batches when the minimal left time is greater than all the right times.
Are these changes tested?
Yes, added a dedicated case.
Are there any user-facing changes?
None.