diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index c21af3da84f25..1be41107516a5 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -639,12 +639,10 @@ class InputState : public util::SerialSequencingQueue::Processor { // hit the end of the batch, need to get the next batch if possible. ++batches_processed_; latest_ref_row_ = 0; - have_active_batch &= !queue_.TryPop(); - if (have_active_batch) { - DCHECK_GT(queue_.Front()->num_rows(), 0); // empty batches disallowed - memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_, - 0)); // time changed - } + bool did_pop = queue_.TryPop().has_value(); + DCHECK(did_pop); + ARROW_UNUSED(did_pop); + have_active_batch = !queue_.Empty(); } } return have_active_batch; diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 271ad6018f2b2..59a9b4ebba12c 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -44,6 +44,7 @@ #include "arrow/compute/cast.h" #include "arrow/compute/row/row_encoder_internal.h" #include "arrow/compute/test_util_internal.h" +#include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" @@ -1770,5 +1771,58 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) { DeclarationToStatus(std::move(sink))); } +// Reproduction of GH-46224: Hang when all left timestamps are greater than right +// timestamps. +TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) { +#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) + const int rounds = 1; +#else + const int rounds = 42; +#endif + int64_t tolerance = 1; + int64_t num_rows_big_ts = 1; + int64_t num_rows_small_ts = ExecPlan::kMaxBatchSize + 1; + // Make sure the big_ts is outside the horizon of the tolerance regardless of the side. 
+ int64_t big_ts = num_rows_small_ts + tolerance + 1; + + // Column of big timestamps. + ASSERT_OK_AND_ASSIGN(auto col_big_ts, + gen::Constant(MakeScalar(big_ts))->Generate(num_rows_big_ts)); + // Column of small timestamps from 0 to num_rows_small_ts - 1. + ASSERT_OK_AND_ASSIGN(auto col_small_ts, + gen::Step()->Generate(num_rows_small_ts)); + + struct Case { + std::shared_ptr left_col; + std::shared_ptr right_col; + }; + + for (const auto& c : { + Case{col_big_ts, col_small_ts}, + Case{col_small_ts, col_big_ts}, + }) { + auto left_schema = arrow::schema({arrow::field("on", int64())}); + auto right_schema = arrow::schema({arrow::field("on", int64())}); + + ExecBatch left_batch({c.left_col}, c.left_col->length()); + ExecBatch right_batch({c.right_col}, c.right_col->length()); + ASSERT_OK_AND_ASSIGN(auto col_null, MakeArrayOfNull(int64(), c.left_col->length())); + ExecBatch exp_batch({c.left_col, col_null}, c.left_col->length()); + + // Run a moderate number of times to ensure that no hangs occur. + for (int i = 0; i < rounds; ++i) { + AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); + auto left = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(left_schema, {left_batch})); + auto right = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(right_schema, {right_batch})); + auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts}; + ASSERT_OK_AND_ASSIGN(auto result, + arrow::acero::DeclarationToExecBatches(std::move(asof_join))); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); + } + } +} + } // namespace acero } // namespace arrow