From b4b0cf8e17543eb52cbd3338ddc59e6e663e34f0 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 2 May 2025 23:39:46 -0700 Subject: [PATCH 1/9] Draft the fix --- cpp/src/arrow/acero/asof_join_node.cc | 35 ++++++++++++++++++---- cpp/src/arrow/acero/asof_join_node_test.cc | 29 ++++++++++++++++++ cpp/src/arrow/acero/exec_plan.h | 1 + 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index c21af3da84f25..f1e340af04318 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -639,12 +639,14 @@ class InputState : public util::SerialSequencingQueue::Processor { // hit the end of the batch, need to get the next batch if possible. ++batches_processed_; latest_ref_row_ = 0; - have_active_batch &= !queue_.TryPop(); - if (have_active_batch) { - DCHECK_GT(queue_.Front()->num_rows(), 0); // empty batches disallowed - memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_, - 0)); // time changed - } + std::ignore = queue_.TryPop(); + have_active_batch = !queue_.Empty(); + // have_active_batch &= !queue_.TryPop(); + // if (have_active_batch) { + // DCHECK_GT(queue_.Front()->num_rows(), 0); // empty batches disallowed + // memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_, + // 0)); // time changed + // } } } return have_active_batch; @@ -1427,9 +1429,27 @@ class AsofJoinNode : public ExecNode { ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); + if (k == 0) { + // static bool hang = true; + // if (hang) { + // hang = false; + // // Hang. 
+ // std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + // } + // std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + // Put into the sequencing queue ARROW_RETURN_NOT_OK(state_.at(k)->InsertBatch(std::move(batch))); + // if (k == 0) { + // static int i = 0; + // if (i == 1) { + // PushProcess(true); + // PushProcess(true); + // } + // i++; + // } PushProcess(true); return Status::OK(); @@ -1440,6 +1460,9 @@ class AsofJoinNode : public ExecNode { std::lock_guard guard(gate_); ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); + // if (k == 0) { + // std::this_thread::sleep_for(std::chrono::milliseconds(6000)); + // } state_.at(k)->set_total_batches(total_batches); } // Trigger a process call diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 271ad6018f2b2..b7d23eb02f719 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -44,6 +44,7 @@ #include "arrow/compute/cast.h" #include "arrow/compute/row/row_encoder_internal.h" #include "arrow/compute/test_util_internal.h" +#include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" @@ -1770,5 +1771,33 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) { DeclarationToStatus(std::move(sink))); } +TEST(AsofJoinTest, DeadLock) { + int64_t n_left = 1; + int64_t n_right = ExecPlan::kMaxBatchSize + 1; + int64_t tolerance = 1; + + auto left_schema = arrow::schema({arrow::field("on", int64())}); + auto right_schema = arrow::schema({arrow::field("on", int64())}); + + ASSERT_OK_AND_ASSIGN(auto left_col, + gen::Constant(MakeScalar(n_right + 1))->Generate(n_left)); + ASSERT_OK_AND_ASSIGN(auto right_col, gen::Step(0ll, 1ll)->Generate(n_right)); + + auto left_table = Table::Make(left_schema, {left_col}); + auto right_table = Table::Make(right_schema, {right_col}); + + for 
(int i = 0; i < 1000; ++i) { + std::cout << i << std::endl; + + AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); + auto left = Declaration("table_source", TableSourceNodeOptions(left_table)); + auto right = Declaration("table_source", TableSourceNodeOptions(right_table)); + auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts}; + ASSERT_OK_AND_ASSIGN(auto result, + arrow::acero::DeclarationToExecBatches(std::move(asof_join))); + std::cout << result.batches[0].length << " rows" << std::endl; + } +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h index dba6c64ddc837..c3d78f61de227 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -55,6 +55,7 @@ class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this; virtual ~ExecPlan() = default; From d1e11ae1da3ff445b3b1722c17fdc85e65ac11c9 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Sat, 3 May 2025 00:01:28 -0700 Subject: [PATCH 2/9] Fix build --- cpp/src/arrow/acero/asof_join_node.cc | 6 ------ cpp/src/arrow/acero/asof_join_node_test.cc | 2 +- cpp/src/arrow/compute/row/doc/row_table.md | 0 3 files changed, 1 insertion(+), 7 deletions(-) create mode 100644 cpp/src/arrow/compute/row/doc/row_table.md diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index f1e340af04318..b29626d875e26 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -641,12 +641,6 @@ class InputState : public util::SerialSequencingQueue::Processor { latest_ref_row_ = 0; std::ignore = queue_.TryPop(); have_active_batch = !queue_.Empty(); - // have_active_batch &= !queue_.TryPop(); - // if (have_active_batch) { - // DCHECK_GT(queue_.Front()->num_rows(), 0); // empty batches disallowed - // memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_, - // 0)); // time changed - // } } } return 
have_active_batch; diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index b7d23eb02f719..db761557194dd 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1781,7 +1781,7 @@ TEST(AsofJoinTest, DeadLock) { ASSERT_OK_AND_ASSIGN(auto left_col, gen::Constant(MakeScalar(n_right + 1))->Generate(n_left)); - ASSERT_OK_AND_ASSIGN(auto right_col, gen::Step(0ll, 1ll)->Generate(n_right)); + ASSERT_OK_AND_ASSIGN(auto right_col, gen::Step()->Generate(n_right)); auto left_table = Table::Make(left_schema, {left_col}); auto right_table = Table::Make(right_schema, {right_col}); diff --git a/cpp/src/arrow/compute/row/doc/row_table.md b/cpp/src/arrow/compute/row/doc/row_table.md new file mode 100644 index 0000000000000..e69de29bb2d1d From f1397a1f9b107cd1ceb59f2aa3f832097e1c49b6 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Sat, 3 May 2025 15:06:15 +0800 Subject: [PATCH 3/9] Delete cpp/src/arrow/compute/row/doc/row_table.md --- cpp/src/arrow/compute/row/doc/row_table.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 cpp/src/arrow/compute/row/doc/row_table.md diff --git a/cpp/src/arrow/compute/row/doc/row_table.md b/cpp/src/arrow/compute/row/doc/row_table.md deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 4b9db70b327af74b4950d2269dd7169895260303 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Tue, 6 May 2025 16:04:36 -0700 Subject: [PATCH 4/9] Remove hack for reproduction --- cpp/src/arrow/acero/asof_join_node.cc | 21 --------------------- cpp/src/arrow/acero/asof_join_node_test.cc | 17 ++++++++++------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index b29626d875e26..498b89d106972 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1423,27 +1423,9 @@ class AsofJoinNode : public ExecNode { 
ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); - if (k == 0) { - // static bool hang = true; - // if (hang) { - // hang = false; - // // Hang. - // std::this_thread::sleep_for(std::chrono::milliseconds(3000)); - // } - // std::this_thread::sleep_for(std::chrono::milliseconds(300)); - } - // Put into the sequencing queue ARROW_RETURN_NOT_OK(state_.at(k)->InsertBatch(std::move(batch))); - // if (k == 0) { - // static int i = 0; - // if (i == 1) { - // PushProcess(true); - // PushProcess(true); - // } - // i++; - // } PushProcess(true); return Status::OK(); @@ -1454,9 +1436,6 @@ class AsofJoinNode : public ExecNode { std::lock_guard guard(gate_); ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); - // if (k == 0) { - // std::this_thread::sleep_for(std::chrono::milliseconds(6000)); - // } state_.at(k)->set_total_batches(total_batches); } // Trigger a process call diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index db761557194dd..6aa1007080ca7 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1783,19 +1783,22 @@ TEST(AsofJoinTest, DeadLock) { gen::Constant(MakeScalar(n_right + 1))->Generate(n_left)); ASSERT_OK_AND_ASSIGN(auto right_col, gen::Step()->Generate(n_right)); - auto left_table = Table::Make(left_schema, {left_col}); - auto right_table = Table::Make(right_schema, {right_col}); + ExecBatch left_batch({left_col}, n_left); + ExecBatch right_batch({right_col}, n_right); - for (int i = 0; i < 1000; ++i) { - std::cout << i << std::endl; + auto exp_batch = ExecBatchFromJSON({int64(), int64()}, + "[[" + std::to_string(n_right + 1) + ", null]]"); + for (int i = 0; i < 42; ++i) { AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); - auto left = Declaration("table_source", TableSourceNodeOptions(left_table)); - auto right = Declaration("table_source", 
TableSourceNodeOptions(right_table)); + auto left = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(left_schema, {left_batch})); + auto right = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(right_schema, {right_batch})); auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts}; ASSERT_OK_AND_ASSIGN(auto result, arrow::acero::DeclarationToExecBatches(std::move(asof_join))); - std::cout << result.batches[0].length << " rows" << std::endl; + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); } } From ecf58a01f3106e86a77e23a4faccfc9886e10483 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Tue, 6 May 2025 16:06:44 -0700 Subject: [PATCH 5/9] Remove hack for reproduction --- cpp/src/arrow/acero/exec_plan.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h index c3d78f61de227..dba6c64ddc837 100644 --- a/cpp/src/arrow/acero/exec_plan.h +++ b/cpp/src/arrow/acero/exec_plan.h @@ -55,7 +55,6 @@ class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this; virtual ~ExecPlan() = default; From af70071bd8938c1be0759ad0b34b20a99eb8ed40 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Tue, 6 May 2025 16:09:50 -0700 Subject: [PATCH 6/9] Describe the test --- cpp/src/arrow/acero/asof_join_node_test.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 6aa1007080ca7..751c4cddf74bf 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1771,7 +1771,9 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) { DeclarationToStatus(std::move(sink))); } -TEST(AsofJoinTest, DeadLock) { +// Reproduction of GH-46224: Hang when all left timestamps are greater than right +// timestamps. 
+TEST(AsofJoinTest, LeftGreaterThanRight) { int64_t n_left = 1; int64_t n_right = ExecPlan::kMaxBatchSize + 1; int64_t tolerance = 1; From 3a273b361d19a8977fa963360e025ae7b61fe35f Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 8 May 2025 02:45:39 +0800 Subject: [PATCH 7/9] Update cpp/src/arrow/acero/asof_join_node.cc Co-authored-by: Antoine Pitrou --- cpp/src/arrow/acero/asof_join_node.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 498b89d106972..1be41107516a5 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -639,7 +639,9 @@ class InputState : public util::SerialSequencingQueue::Processor { // hit the end of the batch, need to get the next batch if possible. ++batches_processed_; latest_ref_row_ = 0; - std::ignore = queue_.TryPop(); + bool did_pop = queue_.TryPop().has_value(); + DCHECK(did_pop); + ARROW_UNUSED(did_pop); have_active_batch = !queue_.Empty(); } } From dd7a66b2971acb4d0c66eeca47383d5a3fd27e53 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Wed, 7 May 2025 12:52:38 -0700 Subject: [PATCH 8/9] Update test according to the review comment --- cpp/src/arrow/acero/asof_join_node_test.cc | 67 +++++++++++++--------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 751c4cddf74bf..1e97f04e24052 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1773,34 +1773,49 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) { // Reproduction of GH-46224: Hang when all left timestamps are greater than right // timestamps. 
-TEST(AsofJoinTest, LeftGreaterThanRight) { - int64_t n_left = 1; - int64_t n_right = ExecPlan::kMaxBatchSize + 1; +TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) { int64_t tolerance = 1; + int64_t num_rows_big_ts = 1; + int64_t num_rows_small_ts = ExecPlan::kMaxBatchSize + 1; + // Make sure the big_ts is outside the horizon of the tolerance regardless of the side. + int64_t big_ts = num_rows_small_ts + tolerance + 1; + + // Column of big timestamps. + ASSERT_OK_AND_ASSIGN(auto col_big_ts, + gen::Constant(MakeScalar(big_ts))->Generate(num_rows_big_ts)); + // Column of small timestamps from 0 to num_rows_small_ts - 1. + ASSERT_OK_AND_ASSIGN(auto col_small_ts, + gen::Step()->Generate(num_rows_small_ts)); + + struct Case { + std::shared_ptr left_col; + std::shared_ptr right_col; + }; - auto left_schema = arrow::schema({arrow::field("on", int64())}); - auto right_schema = arrow::schema({arrow::field("on", int64())}); - - ASSERT_OK_AND_ASSIGN(auto left_col, - gen::Constant(MakeScalar(n_right + 1))->Generate(n_left)); - ASSERT_OK_AND_ASSIGN(auto right_col, gen::Step()->Generate(n_right)); - - ExecBatch left_batch({left_col}, n_left); - ExecBatch right_batch({right_col}, n_right); - - auto exp_batch = ExecBatchFromJSON({int64(), int64()}, - "[[" + std::to_string(n_right + 1) + ", null]]"); - - for (int i = 0; i < 42; ++i) { - AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); - auto left = Declaration("exec_batch_source", - ExecBatchSourceNodeOptions(left_schema, {left_batch})); - auto right = Declaration("exec_batch_source", - ExecBatchSourceNodeOptions(right_schema, {right_batch})); - auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts}; - ASSERT_OK_AND_ASSIGN(auto result, - arrow::acero::DeclarationToExecBatches(std::move(asof_join))); - AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); + for (const auto& c : { + Case{col_big_ts, col_small_ts}, + Case{col_small_ts, col_big_ts}, + }) { + auto 
left_schema = arrow::schema({arrow::field("on", int64())}); + auto right_schema = arrow::schema({arrow::field("on", int64())}); + + ExecBatch left_batch({c.left_col}, c.left_col->length()); + ExecBatch right_batch({c.right_col}, c.right_col->length()); + ASSERT_OK_AND_ASSIGN(auto col_null, MakeArrayOfNull(int64(), c.left_col->length())); + ExecBatch exp_batch({c.left_col, col_null}, c.left_col->length()); + + // Run moderate number of times to ensure that no hangs occur. + for (int i = 0; i < 42; ++i) { + AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); + auto left = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(left_schema, {left_batch})); + auto right = Declaration("exec_batch_source", + ExecBatchSourceNodeOptions(right_schema, {right_batch})); + auto asof_join = arrow::acero::Declaration{"asofjoin", {left, right}, opts}; + ASSERT_OK_AND_ASSIGN(auto result, + arrow::acero::DeclarationToExecBatches(std::move(asof_join))); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); + } } } From 5f5b8956196f706f49a382cb7934b5eb6012a615 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 12 May 2025 10:56:33 -0700 Subject: [PATCH 9/9] Reduce test iterations for sanitizer and valgrind --- cpp/src/arrow/acero/asof_join_node_test.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 1e97f04e24052..59a9b4ebba12c 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1774,6 +1774,11 @@ TEST(AsofJoinTest, DestroyNonStartedAsofJoinNode) { // Reproduction of GH-46224: Hang when all left timestamps are greater than right // timestamps. 
TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) { +#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) + const int rounds = 1; +#else + const int rounds = 42; +#endif int64_t tolerance = 1; int64_t num_rows_big_ts = 1; int64_t num_rows_small_ts = ExecPlan::kMaxBatchSize + 1; @@ -1805,7 +1810,7 @@ TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) { ExecBatch exp_batch({c.left_col, col_null}, c.left_col->length()); // Run moderate number of times to ensure that no hangs occur. - for (int i = 0; i < 42; ++i) { + for (int i = 0; i < rounds; ++i) { AsofJoinNodeOptions opts({{{"on"}, {}}, {{"on"}, {}}}, tolerance); auto left = Declaration("exec_batch_source", ExecBatchSourceNodeOptions(left_schema, {left_batch}));