Skip to content

GH-46224: [C++][Acero] Fix the hang in asof join #46300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 14, 2025
Merged
10 changes: 4 additions & 6 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -639,12 +639,10 @@ class InputState : public util::SerialSequencingQueue::Processor {
// hit the end of the batch, need to get the next batch if possible.
++batches_processed_;
latest_ref_row_ = 0;
have_active_batch &= !queue_.TryPop();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this code is apparently NOT doing what it is supposed to do: by here the have_active_batch must be true, meaning queue_ must be non-empty. Then popping the queue always gets a valid std::optional and effectively setting have_active_batch to false. The if branch below won't be executed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed there are two fixes made before in the following if branch #36094 and #36499. Given that this have_active_batch &= !queue_.TryPop() exists since #13028 where it was introduced, and the lack of corresponding tests, I'm not sure how they fixed anything and what the bugs even were. So I just remove them.

if (have_active_batch) {
DCHECK_GT(queue_.Front()->num_rows(), 0); // empty batches disallowed
memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_,
0)); // time changed
}
bool did_pop = queue_.TryPop().has_value();
DCHECK(did_pop);
ARROW_UNUSED(did_pop);
have_active_batch = !queue_.Empty();
}
}
return have_active_batch;
Expand Down
54 changes: 54 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "arrow/compute/cast.h"
#include "arrow/compute/row/row_encoder_internal.h"
#include "arrow/compute/test_util_internal.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
Expand Down Expand Up @@ -1770,5 +1771,58 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) {
DeclarationToStatus(std::move(sink)));
}

// Reproduction of GH-46224: Hang when all left timestamps are greater than right
// timestamps.
TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) {
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
const int rounds = 1;
#else
const int rounds = 42;
#endif
int64_t tolerance = 1;
int64_t num_rows_big_ts = 1;
int64_t num_rows_small_ts = ExecPlan::kMaxBatchSize + 1;
// Make sure the big_ts is outside the horizon of the tolerance regardless of the side.
int64_t big_ts = num_rows_small_ts + tolerance + 1;

// Column of big timestamps.
ASSERT_OK_AND_ASSIGN(auto col_big_ts,
gen::Constant(MakeScalar(big_ts))->Generate(num_rows_big_ts));
// Column of small timestamps from 0 to num_rows_small_ts - 1.
ASSERT_OK_AND_ASSIGN(auto col_small_ts,
gen::Step<int64_t>()->Generate(num_rows_small_ts));

struct Case {
std::shared_ptr<arrow::Array> left_col;
std::shared_ptr<arrow::Array> right_col;
};

for (const auto& c : {
Case{col_big_ts, col_small_ts},
Case{col_small_ts, col_big_ts},
}) {
auto left_schema = arrow::schema({arrow::field("on", int64())});
auto right_schema = arrow::schema({arrow::field("on", int64())});

ExecBatch left_batch({c.left_col}, c.left_col->length());
ExecBatch right_batch({c.right_col}, c.right_col->length());
ASSERT_OK_AND_ASSIGN(auto col_null, MakeArrayOfNull(int64(), c.left_col->length()));
ExecBatch exp_batch({c.left_col, col_null}, c.left_col->length());

// Run moderate number of times to ensure that no hangs occur.
for (int i = 0; i < rounds; ++i) {
AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance);
auto left = Declaration("exec_batch_source",
ExecBatchSourceNodeOptions(left_schema, {left_batch}));
auto right = Declaration("exec_batch_source",
ExecBatchSourceNodeOptions(right_schema, {right_batch}));
auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts};
ASSERT_OK_AND_ASSIGN(auto result,
arrow::acero::DeclarationToExecBatches(std::move(asof_join)));
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}
}
}

} // namespace acero
} // namespace arrow
Loading