From d4ed6193fa5b9a959b90b8257f0f8c2ed9680a64 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 17 Apr 2025 13:17:38 +0000 Subject: [PATCH 01/14] added tests --- .../ut/ut_with_sdk/autoscaling_ut.cpp | 2 - .../persqueue/ut/ut_with_sdk/topic_ut.cpp | 203 +++++++++++++++++- 2 files changed, 192 insertions(+), 13 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index d560b8181d03..48b3de7f3d43 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -1225,7 +1225,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); } - /* TODO uncomment this { // must be ignored, because wrong sessionid auto status = commit("random session", 0); @@ -1236,7 +1235,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto stats = getConsumerState(0); UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); } - */ } else { UNIT_ASSERT(false); } diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 5cc8256aaa09..9d625fa888d1 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -163,13 +163,58 @@ Y_UNIT_TEST_SUITE(WithSDK) { } } - Y_UNIT_TEST(CommitWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1); + void PrepareFlatTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopic(); setup.Write("message-1"); setup.Write("message-2"); setup.Write("message-3"); + } + + void PrepareAutopartitionedTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopicWithAutoscale(); + + // Creating partition hierarchy + // 0 ──┬──> 1 ──┬──> 3 + // │ └──> 4 + // └──> 2 + // + // Each partition has 3 messages + + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + setup.Write("message-0-3", 0); + + { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + } + + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + setup.Write("message-1-3", 1); + + setup.Write("message-2-1", 2); + setup.Write("message-2-2", 2); + setup.Write("message-2-3", 2); + + { + ui64 txId = 1007; + SplitPartition(setup, ++txId, 1, "0"); + } + + setup.Write("message-3-1", 3); + setup.Write("message-3-2", 3); + setup.Write("message-3-3", 3); + + setup.Write("message-4-1", 4); + setup.Write("message-4-2", 4); + setup.Write("message-4-3", 4); + } + + Y_UNIT_TEST(CommitWithWrongSessionId) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); { auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1, "wrong-read-session-id"); @@ -182,11 +227,7 @@ Y_UNIT_TEST_SUITE(WithSDK) { Y_UNIT_TEST(CommitToPastWithWrongSessionId) { TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1); - - setup.Write("message-1"); - setup.Write("message-2"); - setup.Write("message-3"); + PrepareFlatTopic(setup); { auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 2); @@ -205,8 +246,7 @@ Y_UNIT_TEST_SUITE(WithSDK) { } } - /* TODO Uncomment this test - Y_UNIT_TEST(CommitToParentPartitionWithWrongSessionId) { + Y_UNIT_TEST(Commit_ToParentPartitionWithWrongSessionId) { TTopicSdkTestSetup setup = CreateSetup(); setup.CreateTopicWithAutoscale(); @@ -248,7 +288,148 @@ Y_UNIT_TEST_SUITE(WithSDK) { Cerr << ">>>>> END" << Endl << Flush; } - */ + + Y_UNIT_TEST(Commit_WithoutSession_ParentNotFinished) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto getCommittedOffset = [&](size_t partition) { + auto desc = setup.DescribeConsumer(); + return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); + }; + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto getCommittedOffset = [&](size_t partition) { + auto desc = setup.DescribeConsumer(); + return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); + }; + + { + // Commit child partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto getCommittedOffset = [&](size_t partition) { + auto desc = setup.DescribeConsumer(); + return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); + }; + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { + return false; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto getCommittedOffset = [&](size_t partition) { + auto desc = setup.DescribeConsumer(); + return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); + }; + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { + return false; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); + } + } } } // namespace NKikimr From 19cdeda2fc6bbd9e0bc9d2a26a04b1c31c0df979 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 17 Apr 2025 15:48:43 +0000 Subject: [PATCH 02/14] more tests --- .../persqueue/ut/ut_with_sdk/topic_ut.cpp | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 9d625fa888d1..67bb0dfcaccb 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -430,6 +430,45 @@ Y_UNIT_TEST_SUITE(WithSDK) { UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); } } + + Y_UNIT_TEST(Commit_ToNewChild_WithoutCommitToParent) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopicWithAutoscale(); + + setup.Write("message-0-0", 0); + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + + auto getCommittedOffset = [&](size_t partition) { + auto desc = setup.DescribeConsumer(); + return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); + }; + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + for (auto & m: x.GetMessages()) { + if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + + setup.Write("message-1-0", 1); + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { + m.Commit(); + } + } + return false; + }); + + Sleep(TDuration::Seconds(2)); + + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(0)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); + UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); + } + } + } } // namespace NKikimr From f1271f92d3784cfd41521902826bcd57569b36ca Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 17 Apr 2025 16:08:22 +0000 Subject: [PATCH 03/14] fix --- ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 67bb0dfcaccb..48542edf5143 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -456,13 +456,16 @@ Y_UNIT_TEST_SUITE(WithSDK) { setup.Write("message-1-2", 1); } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { m.Commit(); + return false; } } - return false; + + return true; }); - Sleep(TDuration::Seconds(2)); + Sleep(TDuration::Seconds(3)); + // Commit hasn`t applyed because messages from the parent partitions has`t been committed UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(0)); UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); From 27a3c6f3842970eb94fca7f812ec514081694296 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 17 Apr 2025 16:15:01 +0000 Subject: [PATCH 04/14] fix --- ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 48542edf5143..2ac114270513 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -445,6 +445,8 @@ Y_UNIT_TEST_SUITE(WithSDK) { }; { + bool committed = false; + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { for (auto & m: x.GetMessages()) { if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) { @@ -456,6 +458,7 @@ Y_UNIT_TEST_SUITE(WithSDK) { setup.Write("message-1-2", 1); } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { m.Commit(); + committed = true; return false; } } @@ -463,6 +466,8 @@ Y_UNIT_TEST_SUITE(WithSDK) { return true; }); + UNIT_ASSERT(committed); + Sleep(TDuration::Seconds(3)); // Commit hasn`t applyed because messages from the parent partitions has`t been committed From c8bd4c1179f7306a8c31d9548186528044ea9451 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 17 Apr 2025 16:40:07 +0000 Subject: [PATCH 05/14] fix --- ydb/services/persqueue_v1/actors/read_session_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 611befb9196a..16352cc0407f 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1278,7 +1278,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartit } std::unordered_set notCommitedToFinishParents; - for (auto& parent: topic.PartitionGraph->GetPartition(record.GetPartition())->DirectParents) { + for (auto& parent: partitionNode->DirectParents) { for (auto& [_, actorInfo]: Partitions) { // TODO: map if (actorInfo.Partition.Partition == parent->Id && !actorInfo.IsLastOffsetCommitted()) { notCommitedToFinishParents.emplace(actorInfo.Partition.Partition); @@ -1286,7 +1286,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartit } } - const auto& parentPartitions = topic.PartitionGraph->GetPartition(partitionId.Partition)->AllParents; + const auto& parentPartitions = partitionNode->AllParents; const auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase()); const TActorId actorId = ctx.Register(new TPartitionActor( ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(), From 29d43e34b3b8a2464766e99476b0dd137da0d7d9 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 06:22:41 +0000 Subject: [PATCH 06/14] move tests to commitoffset_ut.cpp --- .../ut/ut_with_sdk/autoscaling_ut.cpp | 493 --------------- .../ut/ut_with_sdk/commitoffset_ut.cpp | 581 ++++++++++++++++++ .../persqueue/ut/ut_with_sdk/topic_ut.cpp | 315 ---------- ydb/core/persqueue/ut/ut_with_sdk/ya.make | 1 + 4 files changed, 582 insertions(+), 808 deletions(-) create mode 100644 ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 48b3de7f3d43..39dac6d578cf 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -622,98 +622,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession2->Close(); } - Y_UNIT_TEST(PartitionSplit_OffsetCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0); - setup.Write("message-2", 0); - setup.Write("message-3", 0); - setup.Write("message-4", 0); - setup.Write("message-5", 0); - setup.Write("message-6", 0); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1); - setup.Write("message-8", 1); - setup.Write("message-9", 1); - setup.Write("message-10", 1); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - setup.Write("message-11", 3); - setup.Write("message-12", 3); - - auto assertCommittedOffset = [&](size_t partition, size_t expectedOffset, const std::string& msg = "") { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - UNIT_ASSERT_VALUES_EQUAL_C(expectedOffset, stats->GetCommittedOffset(), "Partition " << partition << ": " << msg); - }; - - { - static constexpr size_t commited = 2; - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited); - UNIT_ASSERT(status.IsSuccess()); - - assertCommittedOffset(0, 6, "Must be commited to the partition end because it is the parent"); - assertCommittedOffset(1, commited); - assertCommittedOffset(3, 0); - } - - { - static constexpr size_t commited = 3; - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited); - UNIT_ASSERT(status.IsSuccess()); - - assertCommittedOffset(0, commited); - assertCommittedOffset(1, 0, "Must be commited to the partition begin because it is the child"); - assertCommittedOffset(3, 0); - } - } - - Y_UNIT_TEST(CommitTopPast) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - setup.Write("message_1", 0); - setup.Write("message_2", 0); - - ui64 txId = 1023; - SplitPartition(setup, ++txId, 0, "a"); - - auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit"); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions."); - } - Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) { auto autoscalingTestTopic = "autoscalit-topic"; TTopicSdkTestSetup setup = CreateSetup(); @@ -932,407 +840,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); } - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-2", 6); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-1", 7); - setup.Write("message-8", 1, "producer-1", 8); - setup.Write("message-9", 1, "producer-1", 9); - setup.Write("message-10", 1, "producer-2", 10); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - auto count = 0; - const auto expected = 10; - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - ++count; - auto& message = messages[i]; - Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush; - message.Commit(); - } - - return true; - }); - - UNIT_ASSERT(result.Timeout); - UNIT_ASSERT_VALUES_EQUAL(count, expected); - - auto description = setup.DescribeConsumer(); - UNIT_ASSERT(description.GetPartitions().size() == 5); - - auto stats1 = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats1); - UNIT_ASSERT(stats1->GetCommittedOffset() == 4); - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_ChildFirst) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-2", 6); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-1", 7); - setup.Write("message-8", 1, "producer-1", 8); - setup.Write("message-9", 1, "producer-1", 9); - setup.Write("message-10", 1, "producer-2", 10); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - - auto count = 0; - const auto expected = 10; - - std::vector partition0Messages; - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - count++; - int partitionId = message.GetPartitionSession()->GetPartitionId(); - Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush; - if (partitionId == 1) { - // Commit messages from partition 1 immediately - message.Commit(); - } else if (partitionId == 0) { - // Store messages from partition 0 for later - partition0Messages.push_back(message); - } - } - - return true; - }); - - UNIT_ASSERT(result.Timeout); - UNIT_ASSERT_VALUES_EQUAL(count, expected); - - Sleep(TDuration::Seconds(5)); - - { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - - // Messages in the parent partition hasn't been committed - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 0); - } - - for (auto& message : partition0Messages) { - message.Commit(); - } - - Sleep(TDuration::Seconds(5)); - - { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 4); - } - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionResetAfterCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - auto seqNo = 1; - - setup.Write("message-1", 0, "producer-1", seqNo++); - setup.Write("message-2", 0, "producer-1", seqNo++); - setup.Write("message-3", 0, "producer-1", seqNo++); - setup.Write("message-4", 0, "producer-1", seqNo++); - setup.Write("message-5", 0, "producer-1", seqNo++); - setup.Write("message-6", 0, "producer-2", seqNo++); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-2", seqNo++); - setup.Write("message-8", 1, "producer-2", seqNo++); - - std::vector counters; - counters.resize(seqNo - 1); - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - message.Commit(); - Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush; - auto count = ++counters[message.GetSeqNo() - 1]; - - // check we get this SeqNo two times - if (message.GetSeqNo() == 6 && count == 1) { - Sleep(TDuration::MilliSeconds(300)); - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); - UNIT_ASSERT(status.IsSuccess()); - } - } - - return true; - }); - - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[0], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters[0] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[1], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 1 message has been read " << counters[1] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[2], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 2 message has been read " << counters[2] << " times") ; - - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[3], TStringBuilder() << "Message 1 must be read two times, but 3 message has been read " << counters[3] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[4], TStringBuilder() << "Message 1 must be read two times, but 4 message has been read " << counters[4] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[5], TStringBuilder() << "Message 1 must be read two times, but 5 message has been read " << counters[5] << " times") ; - - { - auto s = result.StartPartitionSessionEvents[0]; - UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); - UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset()); - UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset()); - } - { - auto s = result.StartPartitionSessionEvents[3]; - UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); - UNIT_ASSERT_VALUES_EQUAL(3, s.GetCommittedOffset()); - UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset()); - } - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_SplitedTopic) { - TTopicSdkTestSetup setup = CreateSetup(); - TTopicClient client = setup.MakeClient(); - - setup.CreateTopicWithAutoscale(); - - auto commit = [&](const std::string& sessionId, ui64 offset) { - return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); - }; - - auto getConsumerState = [&](ui32 partition) { - auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER); - - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - return stats; - }; - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-1", 6); - setup.Write("message-7", 0, "producer-1", 7); - setup.Write("message-8", 0, "producer-2", 8); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); - UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); - } - - setup.Write("message-9", 1, "producer-2", 9); - setup.Write("message-10", 1, "producer-2", 10); - - auto commitSent = false; - TString readSessionId = ""; - - setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush; - - if (commitSent) { - // read session not changed - UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); - } - - // check we NOT get this SeqNo two times - if (message.GetSeqNo() == 6) { - if (!commitSent) { - commitSent = true; - - readSessionId = message.GetPartitionSession()->GetReadSessionId(); - - { - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because commit to past - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because wrong sessionid - auto status = commit("random session", 0); - UNIT_ASSERT(!status.IsSuccess()); - - Sleep(TDuration::MilliSeconds(500)); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - } else { - UNIT_ASSERT(false); - } - } else { - message.Commit(); - } - } - - return true; - }); - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - auto commit = [&](const std::string& sessionId, ui64 offset) { - return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); - }; - - auto getConsumerState = [&](ui32 partition) { - auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER); - - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - return stats; - }; - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-1", 6); - setup.Write("message-7", 0, "producer-1", 7); - setup.Write("message-8", 0, "producer-2", 8); - - auto commitSent = false; - TString readSessionId = ""; - - setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - - if (commitSent) { - // read session not changed - UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); - } - - // check we NOT get this SeqNo two times - if (message.GetSeqNo() == 6) { - if (!commitSent) { - commitSent = true; - readSessionId = message.GetPartitionSession()->GetReadSessionId(); - - Sleep(TDuration::MilliSeconds(300)); - - { - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because commit to past - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because wrong sessionid - auto status = commit("random session", 0); - UNIT_ASSERT(!status.IsSuccess()); - - Sleep(TDuration::MilliSeconds(500)); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - } else { - UNIT_ASSERT(false); - } - } else { - message.Commit(); - } - } - - return true; - }); - } - Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) { TTopicSdkTestSetup setup = CreateSetup(); TTopicClient client = setup.MakeClient(); diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp new file mode 100644 index 000000000000..8daab9999d14 --- /dev/null +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -0,0 +1,581 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include + +namespace NKikimr { + +using namespace NYdb::NTopic; +using namespace NYdb::NTopic::NTests; +using namespace NSchemeShardUT_Private; +using namespace NKikimr::NPQ::NTest; + +Y_UNIT_TEST_SUITE(CommitOffset) { + + void PrepareFlatTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopic(); + + setup.Write("message-1"); + setup.Write("message-2"); + setup.Write("message-3"); + } + + void PrepareAutopartitionedTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopicWithAutoscale(); + + // Creating partition hierarchy + // 0 ──┬──> 1 ──┬──> 3 + // │ └──> 4 + // └──> 2 + // + // Each partition has 3 messages + + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + setup.Write("message-0-3", 0); + + { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + } + + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + setup.Write("message-1-3", 1); + + setup.Write("message-2-1", 2); + setup.Write("message-2-2", 2); + setup.Write("message-2-3", 2); + + { + ui64 txId = 1007; + SplitPartition(setup, ++txId, 1, "0"); + } + + setup.Write("message-3-1", 3); + setup.Write("message-3-2", 3); + setup.Write("message-3-3", 3); + + setup.Write("message-4-1", 4); + setup.Write("message-4-2", 4); + setup.Write("message-4-3", 4); + } + + ui64 GetCommittedOffset(TTopicSdkTestSetup& setup, size_t partition) { + auto description = setup.DescribeConsumer(); + auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); + UNIT_ASSERT(stats); + return stats->GetCommittedOffset(); + } + + Y_UNIT_TEST(Commit_Flat_WithWrongSession) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + } + } + + Y_UNIT_TEST(Commit_Flat_WithWrongSession_ToPast) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 2); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_TopPast) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit"); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions."); + } + + Y_UNIT_TEST(Commit_WithWrongSession_ToParent) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + setup.CreateTopicWithAutoscale(); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 0), "Offset doesn`t changed"); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_ParentNotFinished) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit child partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { + return false; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { + return false; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_FromSession_ToNewChild_WithoutCommitToParent) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopicWithAutoscale(); + + setup.Write("message-0-0", 0); + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + + { + bool committed = false; + + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + for (auto & m: x.GetMessages()) { + if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + + setup.Write("message-1-0", 1); + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { + m.Commit(); + committed = true; + return false; + } + } + + return true; + }); + + UNIT_ASSERT(committed); + + Sleep(TDuration::Seconds(3)); + + // Commit hasn`t applyed because messages from the parent partitions has`t been committed + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + } + } + + Y_UNIT_TEST(PartitionSplit_OffsetCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + static constexpr size_t commited = 2; + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited); + UNIT_ASSERT(status.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Must be commited to the partition end because it is the parent"); + UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + } + + { + static constexpr size_t commited = 3; + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited); + UNIT_ASSERT(status.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL_C(0, GetCommittedOffset(setup, 1), "Must be commited to the partition begin because it is the child"); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + + } + } + + Y_UNIT_TEST(DistributedTxCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto count = 0; + const auto expected = 15; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + ++count; + auto& message = messages[i]; + Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush; + message.Commit(); + } + + return true; + }); + + UNIT_ASSERT(result.Timeout); + UNIT_ASSERT_VALUES_EQUAL(count, expected); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(DistributedTxCommit_ChildFirst) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto count = 0; + const auto expected = 15; + + std::vector partition0Messages; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + count++; + int partitionId = message.GetPartitionSession()->GetPartitionId(); + Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush; + if (partitionId == 1) { + // Commit messages from partition 1 immediately + message.Commit(); + } else if (partitionId == 0) { + // Store messages from partition 0 for later + partition0Messages.push_back(message); + } + } + + return true; + }); + + UNIT_ASSERT(result.Timeout); + UNIT_ASSERT_VALUES_EQUAL(count, expected); + + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + + for (auto& message : partition0Messages) { + message.Commit(); + } + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(DistributedTxCommit_CheckSessionResetAfterCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + std::unordered_map counters; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + message.Commit(); + + auto count = ++counters[message.GetData()]; + + // check we get this SeqNo two times + if (message.GetData() == "message-0-3" && count == 1) { + Sleep(TDuration::MilliSeconds(300)); + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(status.IsSuccess()); + } + } + + return true; + }); + + UNIT_ASSERT_VALUES_EQUAL_C(1, counters["message-0-1"], "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters["message-0-1"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ; + + { + auto s = result.StartPartitionSessionEvents[0]; + UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset()); + } + { + auto s = result.StartPartitionSessionEvents[3]; + UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(1, s.GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset()); + } + } + + Y_UNIT_TEST(DistributedTxCommit_CheckOffsetCommitForDifferentCases) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto commit = [&](const std::string& sessionId, ui64 offset) { + return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); + }; + + auto commitSent = false; + TString readSessionId = ""; + + setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + if (commitSent) { + // read session not changed + UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); + } + + // check we NOT get this SeqNo two times + if (message.GetData() == "message-0-2") { + if (!commitSent) { + commitSent = true; + + readSessionId = message.GetPartitionSession()->GetReadSessionId(); + + { + auto status = commit(readSessionId, 3); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because commit to past + auto status = commit(readSessionId, 0); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because wrong sessionid + auto status = commit("random session", 0); + UNIT_ASSERT(!status.IsSuccess()); + Sleep(TDuration::MilliSeconds(500)); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + } else { + UNIT_ASSERT(false); + } + } else { + message.Commit(); + } + } + + return true; + }); + } + + Y_UNIT_TEST(DistributedTxCommit_Flat_CheckOffsetCommitForDifferentCases) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + auto commit = [&](const std::string& sessionId, ui64 offset) { + return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); + }; + + auto commitSent = false; + TString readSessionId = ""; + + setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + + if (commitSent) { + // read session not changed + UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); + } + + // check we NOT get this message two times + if (message.GetData() == "message-0-2") { + if (!commitSent) { + commitSent = true; + + readSessionId = message.GetPartitionSession()->GetReadSessionId(); + + { + auto status = commit(readSessionId, 3); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because commit to past + auto status = commit(readSessionId, 0); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because wrong sessionid + auto status = commit("random session", 0); + UNIT_ASSERT(!status.IsSuccess()); + Sleep(TDuration::MilliSeconds(500)); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + } else { + UNIT_ASSERT(false); + } + } else { + message.Commit(); + } + } + + return true; + }); + } +} + +} // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 2ac114270513..aadf1ad16276 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -162,321 +162,6 @@ Y_UNIT_TEST_SUITE(WithSDK) { UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset()); } } - - void PrepareFlatTopic(TTopicSdkTestSetup& setup) { - setup.CreateTopic(); - - setup.Write("message-1"); - setup.Write("message-2"); - setup.Write("message-3"); - } - - void PrepareAutopartitionedTopic(TTopicSdkTestSetup& setup) { - setup.CreateTopicWithAutoscale(); - - // Creating partition hierarchy - // 0 ──┬──> 1 ──┬──> 3 - // │ └──> 4 - // └──> 2 - // - // Each partition has 3 messages - - setup.Write("message-0-1", 0); - setup.Write("message-0-2", 0); - setup.Write("message-0-3", 0); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - } - - setup.Write("message-1-1", 1); - setup.Write("message-1-2", 1); - setup.Write("message-1-3", 1); - - setup.Write("message-2-1", 2); - setup.Write("message-2-2", 2); - setup.Write("message-2-3", 2); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - } - - setup.Write("message-3-1", 3); - setup.Write("message-3-2", 3); - setup.Write("message-3-3", 3); - - setup.Write("message-4-1", 4); - setup.Write("message-4-2", 4); - setup.Write("message-4-3", 4); - } - - Y_UNIT_TEST(CommitWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareFlatTopic(setup); - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(0, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - } - - Y_UNIT_TEST(CommitToPastWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareFlatTopic(setup); - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 2); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - } - - Y_UNIT_TEST(Commit_ToParentPartitionWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - setup.Write("message-1", 0); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - } - - setup.Write("message-2", 1); - - Cerr << ">>>>> BEGIN 0" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - Cerr << ">>>>> BEGIN 1" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 1, 1); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(1).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - Cerr << ">>>>> BEGIN 2" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL_C(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset(), "Offset doesn`t changed"); - } - Cerr << ">>>>> END" << Endl << Flush; - - } - - Y_UNIT_TEST(Commit_WithoutSession_ParentNotFinished) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareAutopartitionedTopic(setup); - - auto getCommittedOffset = [&](size_t partition) { - auto desc = setup.DescribeConsumer(); - return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); - }; - - { - // Commit parent partition to non end - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - - { - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - } - - Y_UNIT_TEST(Commit_WithoutSession_ToPastParentPartition) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareAutopartitionedTopic(setup); - - auto getCommittedOffset = [&](size_t partition) { - auto desc = setup.DescribeConsumer(); - return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); - }; - - { - // Commit child partition to non end - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - - { - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - } - - Y_UNIT_TEST(Commit_WithSession_ParentNotFinished) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareAutopartitionedTopic(setup); - - auto getCommittedOffset = [&](size_t partition) { - auto desc = setup.DescribeConsumer(); - return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); - }; - - { - // Commit parent partition to non end - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - - { - auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { - return false; - }); - - // Commit parent to middle - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - } - - Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { - TTopicSdkTestSetup setup = CreateSetup(); - PrepareAutopartitionedTopic(setup); - - auto getCommittedOffset = [&](size_t partition) { - auto desc = setup.DescribeConsumer(); - return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); - }; - - { - // Commit parent partition to non end - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - - { - auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { - return false; - }); - - // Commit parent to middle - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); - UNIT_ASSERT(result.IsSuccess()); - - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(3, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - UNIT_ASSERT_VALUES_EQUAL(1, getCommittedOffset(3)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(4)); - } - } - - Y_UNIT_TEST(Commit_ToNewChild_WithoutCommitToParent) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - setup.Write("message-0-0", 0); - setup.Write("message-0-1", 0); - setup.Write("message-0-2", 0); - - auto getCommittedOffset = [&](size_t partition) { - auto desc = setup.DescribeConsumer(); - return desc.GetPartitions().at(partition).GetPartitionConsumerStats()->GetCommittedOffset(); - }; - - { - bool committed = false; - - auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - for (auto & m: x.GetMessages()) { - if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - setup.Write("message-1-0", 1); - setup.Write("message-1-1", 1); - setup.Write("message-1-2", 1); - } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { - m.Commit(); - committed = true; - return false; - } - } - - return true; - }); - - UNIT_ASSERT(committed); - - Sleep(TDuration::Seconds(3)); - - // Commit hasn`t applyed because messages from the parent partitions has`t been committed - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(0)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(1)); - UNIT_ASSERT_VALUES_EQUAL(0, getCommittedOffset(2)); - } - } - } } // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ut_with_sdk/ya.make b/ydb/core/persqueue/ut/ut_with_sdk/ya.make index eb370dff0244..c837ecdfa649 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/ya.make +++ b/ydb/core/persqueue/ut/ut_with_sdk/ya.make @@ -31,6 +31,7 @@ YQL_LAST_ABI_VERSION() SRCS( autoscaling_ut.cpp balancing_ut.cpp + commitoffset_ut.cpp mirrorer_ut.cpp topic_ut.cpp ) From aa6bba87ee5206ea491a21bd4de28e163d04ff1b Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 10:25:34 +0000 Subject: [PATCH 07/14] one case hase been fixed --- ydb/core/persqueue/partition.cpp | 142 +++++++++++------- ydb/core/persqueue/partition.h | 2 +- .../ut/ut_with_sdk/commitoffset_ut.cpp | 10 +- .../actors/commit_offset_actor.cpp | 38 +++-- 4 files changed, 126 insertions(+), 66 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9a4ea983ba22..2999ad4626e1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2343,6 +2343,74 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, return true; } +std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) { + const TString& consumer = operation.GetConsumer(); + + if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "' has been removed"); + return {false, false}; + } + + if (!UsersInfoStorage->GetIfExists(consumer)) { + PQ_LOG_D("Partition " << Partition << + " Unknown consumer '" << consumer << "'"); + return {false, false}; + } + + TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); + + PQ_LOG_D("CommitOperation Partition " << Partition << + " Consumer '" << consumer << "'" << + " RequestSessionId '" << operation.GetReadSessionId() << + "' CurrentSessionId '" << userInfo.Session << + "' OffsetBegin " << operation.GetCommitOffsetsBegin() << + "' OffsetEnd " << operation.GetCommitOffsetsEnd() << + " OnlyCheckCommitedToFinish " << operation.GetOnlyCheckCommitedToFinish() << + " KillReadSession " << operation.GetKillReadSession() + ); + + if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (session already dead) " << + " RequestSessionId '" << operation.GetReadSessionId() << + "' CurrentSessionId '" << userInfo.Session << + "'"); + return {false, false}; + } else if (operation.GetOnlyCheckCommitedToFinish()) { + if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { + return {false, false}; + } else { + return {true, false}; + } + } else { + if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (invalid range) " << + " Begin " << operation.GetCommitOffsetsBegin() << + " End " << operation.GetCommitOffsetsEnd()); + return {false, true}; + } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (gap) " << + " Offset " << userInfo.Offset << + " Begin " << operation.GetCommitOffsetsBegin()); + return {false, true}; + } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (behind the last offset) " << + " EndOffset " << EndOffset << + " End " << operation.GetCommitOffsetsEnd()); + return {false, true}; + } + return {true, true}; + } +} + TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe& predicateOut) { if (tx.ForcePredicateFalse) { @@ -2361,60 +2429,13 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr return EProcessResult::Blocked; } - if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "' has been removed"); - result = false; - break; - } - - if (!UsersInfoStorage->GetIfExists(consumer)) { - PQ_LOG_D("Partition " << Partition << - " Unknown consumer '" << consumer << "'"); - result = false; - break; - } + auto [r, real] = ValidatePartitionOperation(operation); + result = r; - bool isAffectedConsumer = AffectedUsers.contains(consumer); - TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); - - if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (session already dead) " << - " RequestSessionId '" << operation.GetReadSessionId() << - " CurrentSessionId '" << userInfo.Session << - "'"); - result = false; - } else if (operation.GetOnlyCheckCommitedToFinish()) { - if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { - result = false; - } - } else { - if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (invalid range) " << - " Begin " << operation.GetCommitOffsetsBegin() << - " End " << operation.GetCommitOffsetsEnd()); - result = false; - } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (gap) " << - " Offset " << userInfo.Offset << - " Begin " << operation.GetCommitOffsetsBegin()); - result = false; - } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (behind the last offset) " << - " EndOffset " << EndOffset << - " End " << operation.GetCommitOffsetsEnd()); - result = false; - } + if (real) { + if (!r) { + bool isAffectedConsumer = AffectedUsers.contains(consumer); - if (!result) { if (!isAffectedConsumer) { AffectedUsers.erase(consumer); } @@ -2422,7 +2443,11 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } consumers.insert(consumer); } + if (!r) { + break; + } } + if (result) { TxAffectedConsumers.insert(consumers.begin(), consumers.end()); } @@ -2586,6 +2611,9 @@ void TPartition::CommitTransaction(TSimpleSharedPtr& t) Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); for (auto& operation : t->Tx->Operations) { + + Cerr << ">>>>> CommitTransaction " << Endl << Flush; + if (operation.GetOnlyCheckCommitedToFinish()) { continue; } @@ -2913,6 +2941,16 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE "incorrect offset range (begin > end)"); return EProcessResult::ContinueDrop; } + + auto [r, _] = ValidatePartitionOperation(operation); + if (!r) { + ScheduleReplyPropose(tx, + NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, + NKikimrPQ::TError::BAD_REQUEST, + "incorrect request"); + return EProcessResult::ContinueDrop; + } + consumers.insert(user); } SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 358260cc2459..e01c941a821d 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped { void ConsumeBlobQuota(); void UpdateAfterWriteCounters(bool writeComplete); - + std::pair ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation); void UpdateUserInfoEndOffset(const TInstant& now); void UpdateWriteBufferIsFullState(const TInstant& now); diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp index 8daab9999d14..35b7be828385 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -97,9 +97,11 @@ Y_UNIT_TEST_SUITE(CommitOffset) { } { + Cerr << ">>>>>> BEGIN" << Endl << Flush; auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); + Cerr << ">>>>>> END" << Endl << Flush; } } @@ -137,13 +139,15 @@ Y_UNIT_TEST_SUITE(CommitOffset) { { auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); } { auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 0), "Offset doesn`t changed"); + UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Offset doesn`t changed"); + UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 1), "Offset doesn`t changed"); } } @@ -441,8 +445,8 @@ Y_UNIT_TEST_SUITE(CommitOffset) { }); UNIT_ASSERT_VALUES_EQUAL_C(1, counters["message-0-1"], "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters["message-0-1"] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ; { auto s = result.StartPartitionSessionEvents[0]; diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp index 471162c5f9ae..415e99d676fe 100644 --- a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp +++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp @@ -106,24 +106,42 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc if (partitionNode->AllParents.size() == 0 && partitionNode->DirectChildren.size() == 0) { SendCommit(topicInitInfo, commitRequest, ctx); } else { - auto killReadSession = commitRequest->read_session_id().empty(); + auto hasReadSession = !commitRequest->read_session_id().empty(); + auto killReadSession = !hasReadSession; + const TString& readSessionId = commitRequest->read_session_id(); + std::vector commits; for (auto& parent: partitionNode->AllParents) { - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = parent->Id, + .Offset = Max(), + .KillReadSession = killReadSession, + .OnlyCheckCommitedToFinish = false, + .ReadSessionId = readSessionId + }; commits.push_back(commit); } - for (auto& child: partitionNode->AllChildren) { - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0, .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; - commits.push_back(commit); + if (!hasReadSession) { + for (auto& child: partitionNode->AllChildren) { + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = child->Id, + .Offset = 0, + .KillReadSession = true, + .OnlyCheckCommitedToFinish = false + }; + commits.push_back(commit); + } } - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = partitionNode->Id, .Offset = commitRequest->offset(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; - - if (!commitRequest->read_session_id().empty()) { - commit.ReadSessionId = commitRequest->read_session_id(); - } + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = partitionNode->Id, + .Offset = commitRequest->offset(), + .KillReadSession = killReadSession, + .OnlyCheckCommitedToFinish = false, + .ReadSessionId = readSessionId + }; commits.push_back(commit); Kqp = std::make_unique(Request().GetDatabaseName().GetOrElse(TString()), ClientId, topic, commits); From cdd697c53c82f69b71a92b34687b2b29608228e6 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 10:59:20 +0000 Subject: [PATCH 08/14] fix immediate --- ydb/core/persqueue/partition.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 2999ad4626e1..2277525bc087 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -3015,6 +3015,11 @@ void TPartition::ExecImmediateTx(TTransaction& t) "incorrect offset range (commit to the future)"); return; } + + if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) { + continue; // this is stale request, answer ok for it + } + pendingUserInfo.Offset = operation.GetCommitOffsetsEnd(); } CommitWriteOperations(t); From ae4c39b1a2bb898cb88773ee595c559bccb9a01d Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 11:29:50 +0000 Subject: [PATCH 09/14] fix --- ydb/core/persqueue/partition.cpp | 23 +++++-------- .../ut/ut_with_sdk/commitoffset_ut.cpp | 32 +++++++++++++++++-- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 2277525bc087..c3c050f25686 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2360,16 +2360,6 @@ std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TP TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); - PQ_LOG_D("CommitOperation Partition " << Partition << - " Consumer '" << consumer << "'" << - " RequestSessionId '" << operation.GetReadSessionId() << - "' CurrentSessionId '" << userInfo.Session << - "' OffsetBegin " << operation.GetCommitOffsetsBegin() << - "' OffsetEnd " << operation.GetCommitOffsetsEnd() << - " OnlyCheckCommitedToFinish " << operation.GetOnlyCheckCommitedToFinish() << - " KillReadSession " << operation.GetKillReadSession() - ); - if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "'" << @@ -2611,9 +2601,6 @@ void TPartition::CommitTransaction(TSimpleSharedPtr& t) Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate); for (auto& operation : t->Tx->Operations) { - - Cerr << ">>>>> CommitTransaction " << Endl << Flush; - if (operation.GetOnlyCheckCommitedToFinish()) { continue; } @@ -2942,7 +2929,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - auto [r, _] = ValidatePartitionOperation(operation); + auto [r, real] = ValidatePartitionOperation(operation); if (!r) { ScheduleReplyPropose(tx, NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, @@ -2951,7 +2938,9 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - consumers.insert(user); + if (real) { + consumers.insert(user); + } } SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); WriteKeysSizeEstimate += consumers.size(); @@ -2975,6 +2964,10 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } for (const auto& operation : record.GetData().GetOperations()) { + if (operation.GetOnlyCheckCommitedToFinish()) { + continue; + } + if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp index 35b7be828385..860efedb339a 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -207,7 +207,7 @@ Y_UNIT_TEST_SUITE(CommitOffset) { } } - Y_UNIT_TEST(Commit_WithSession_ParentNotFinished) { + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_SameSession) { TTopicSdkTestSetup setup = CreateSetup(); PrepareAutopartitionedTopic(setup); @@ -224,8 +224,8 @@ Y_UNIT_TEST_SUITE(CommitOffset) { } { - auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { - return false; + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + return x.GetMessages().back().GetData() != "message-3-3"; }); // Commit parent to middle @@ -240,6 +240,32 @@ Y_UNIT_TEST_SUITE(CommitOffset) { } } + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + TTopicClient client(setup.MakeDriver()); + + auto createReadSession = [&](size_t partitionId) { + return client.CreateReadSession( + TReadSessionSettings() + .AutoPartitioningSupport(true) + .AppendTopics(TTopicReadSettings(TEST_TOPIC).AppendPartitionIds(partitionId)) + .ConsumerName(TEST_CONSUMER)); + }; + + auto session0 = createReadSession(0); + auto session1 = createReadSession(1); + + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, session1->GetSessionId()); + UNIT_ASSERT(!result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { TTopicSdkTestSetup setup = CreateSetup(); PrepareAutopartitionedTopic(setup); From 7d3ba0561e32522cac53a3ef1fce5f1a6e47c00f Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 11:44:56 +0000 Subject: [PATCH 10/14] fix --- ydb/core/persqueue/partition.cpp | 34 ++++++++++++++++---------------- ydb/core/persqueue/partition.h | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index c3c050f25686..adca023a8a1a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2343,19 +2343,19 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, return true; } -std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) { +std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) { const TString& consumer = operation.GetConsumer(); if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "' has been removed"); - return {false, false}; + return {TStringBuilder() << "Consumer '" << consumer << "' has been removed", false}; } if (!UsersInfoStorage->GetIfExists(consumer)) { PQ_LOG_D("Partition " << Partition << " Unknown consumer '" << consumer << "'"); - return {false, false}; + return {TStringBuilder() << "Unknown consumer '" << consumer << "'", false}; } TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); @@ -2367,12 +2367,12 @@ std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TP " RequestSessionId '" << operation.GetReadSessionId() << "' CurrentSessionId '" << userInfo.Session << "'"); - return {false, false}; + return {"Bad request (session already dead)", false}; } else if (operation.GetOnlyCheckCommitedToFinish()) { if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { - return {false, false}; + return {TStringBuilder() << "There are uncommitted messages in partition " << Partition.OriginalPartitionId, false}; } else { - return {true, false}; + return {"", false}; } } else { if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { @@ -2381,23 +2381,23 @@ std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TP " Bad request (invalid range) " << " Begin " << operation.GetCommitOffsetsBegin() << " End " << operation.GetCommitOffsetsEnd()); - return {false, true}; + return {"Bad request (invalid range)", true}; } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "'" << " Bad request (gap) " << " Offset " << userInfo.Offset << " Begin " << operation.GetCommitOffsetsBegin()); - return {false, true}; + return {"Bad request (gap)", true}; } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "'" << " Bad request (behind the last offset) " << " EndOffset " << EndOffset << " End " << operation.GetCommitOffsetsEnd()); - return {false, true}; + return {"Bad request (behind the last offset", true}; } - return {true, true}; + return {"", true}; } } @@ -2419,11 +2419,11 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr return EProcessResult::Blocked; } - auto [r, real] = ValidatePartitionOperation(operation); - result = r; + auto [error, real] = ValidatePartitionOperation(operation); + result = error.empty(); if (real) { - if (!r) { + if (!result) { bool isAffectedConsumer = AffectedUsers.contains(consumer); if (!isAffectedConsumer) { @@ -2433,7 +2433,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } consumers.insert(consumer); } - if (!r) { + if (!result) { break; } } @@ -2929,12 +2929,12 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - auto [r, real] = ValidatePartitionOperation(operation); - if (!r) { + auto [error, real] = ValidatePartitionOperation(operation); + if (!error.empty()) { ScheduleReplyPropose(tx, NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, NKikimrPQ::TError::BAD_REQUEST, - "incorrect request"); + error); return EProcessResult::ContinueDrop; } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index e01c941a821d..53c30c19c5c4 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped { void ConsumeBlobQuota(); void UpdateAfterWriteCounters(bool writeComplete); - std::pair ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation); + std::pair ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation); void UpdateUserInfoEndOffset(const TInstant& now); void UpdateWriteBufferIsFullState(const TInstant& now); From 7586cdea98d9a637b8eea8bb1e9bba7281ed22a0 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Fri, 18 Apr 2025 12:21:28 +0000 Subject: [PATCH 11/14] rename --- ydb/core/persqueue/partition.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index adca023a8a1a..69fee1875e1a 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2419,10 +2419,10 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr return EProcessResult::Blocked; } - auto [error, real] = ValidatePartitionOperation(operation); + auto [error, needCheckConsumer] = ValidatePartitionOperation(operation); result = error.empty(); - if (real) { + if (needCheckConsumer) { if (!result) { bool isAffectedConsumer = AffectedUsers.contains(consumer); @@ -2929,7 +2929,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - auto [error, real] = ValidatePartitionOperation(operation); + auto [error, needCheckConsumer] = ValidatePartitionOperation(operation); if (!error.empty()) { ScheduleReplyPropose(tx, NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, @@ -2938,7 +2938,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - if (real) { + if (needCheckConsumer) { consumers.insert(user); } } From 230ac03fdcdf30a8b3d9b229bc3d81136766fa94 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Sun, 20 Apr 2025 11:08:09 +0000 Subject: [PATCH 12/14] fix --- ydb/core/persqueue/partition.cpp | 3 ++ .../ut/ut_with_sdk/commitoffset_ut.cpp | 36 +++++++++++++------ .../ut/ut_utils/topic_sdk_test_setup.cpp | 20 +++++++---- .../topic/ut/ut_utils/topic_sdk_test_setup.h | 4 ++- 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 69fee1875e1a..615e6359a4d6 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2361,6 +2361,9 @@ std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ: TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { + if (!IsActive() && operation.GetCommitOffsetsEnd() >= EndOffset && userInfo.Offset == i64(EndOffset)) { + return {"", false}; + } PQ_LOG_D("Partition " << Partition << " Consumer '" << consumer << "'" << " Bad request (session already dead) " << diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp index 860efedb339a..854ac14b3736 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -245,18 +245,11 @@ Y_UNIT_TEST_SUITE(CommitOffset) { PrepareAutopartitionedTopic(setup); TTopicClient client(setup.MakeDriver()); - auto createReadSession = [&](size_t partitionId) { - return client.CreateReadSession( - TReadSessionSettings() - .AutoPartitioningSupport(true) - .AppendTopics(TTopicReadSettings(TEST_TOPIC).AppendPartitionIds(partitionId)) - .ConsumerName(TEST_CONSUMER)); - }; - - auto session0 = createReadSession(0); - auto session1 = createReadSession(1); + auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0); + auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1); - auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, session1->GetSessionId()); + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, + r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId()); UNIT_ASSERT(!result.IsSuccess()); UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); @@ -266,6 +259,27 @@ Y_UNIT_TEST_SUITE(CommitOffset) { UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); } + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession_ParentCommittedToEnd) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + TTopicClient client(setup.MakeDriver()); + + setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); + + auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0); + auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1); + + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, + r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { TTopicSdkTestSetup setup = CreateSetup(); PrepareAutopartitionedTopic(setup); diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 1110c0f038b1..0055dd8543cf 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con session->Close(TDuration::Seconds(5)); } -TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout) { +TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition, const TDuration timeout) { TTopicClient client(MakeDriver()); - auto reader = client.CreateReadSession( - TReadSessionSettings() - .AutoPartitioningSupport(true) - .AppendTopics(TTopicReadSettings(topic)) - .ConsumerName(consumer)); + auto topicSettings = TTopicReadSettings(topic); + if (partition) { + topicSettings.AppendPartitionIds(partition.value()); + } + + auto settins = TReadSessionSettings() + .AutoPartitioningSupport(true) + .AppendTopics(topicSettings) + .ConsumerName(consumer); + + auto reader = client.CreateReadSession(settins); TInstant deadlineTime = TInstant::Now() + timeout; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 9502b9fd4ff7..190fb3bd6241 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -32,7 +32,9 @@ class TTopicSdkTestSetup { std::vector StartPartitionSessionEvents; }; - ReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout = TDuration::Seconds(5)); + ReadResult Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5)); TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); TString GetEndpoint() const; From 02572056b38858752afca0dc21d4e215463323f5 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 21 Apr 2025 09:01:36 +0000 Subject: [PATCH 13/14] fix --- ydb/core/persqueue/partition.cpp | 136 +++++++----------- ydb/core/persqueue/partition.h | 1 - ydb/core/persqueue/ut/partition_ut.cpp | 7 +- .../ut/ut_with_sdk/commitoffset_ut.cpp | 2 - 4 files changed, 61 insertions(+), 85 deletions(-) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 615e6359a4d6..dc5c0e69bdec 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2343,67 +2343,6 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr& t, return true; } -std::pair TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) { - const TString& consumer = operation.GetConsumer(); - - if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "' has been removed"); - return {TStringBuilder() << "Consumer '" << consumer << "' has been removed", false}; - } - - if (!UsersInfoStorage->GetIfExists(consumer)) { - PQ_LOG_D("Partition " << Partition << - " Unknown consumer '" << consumer << "'"); - return {TStringBuilder() << "Unknown consumer '" << consumer << "'", false}; - } - - TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); - - if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { - if (!IsActive() && operation.GetCommitOffsetsEnd() >= EndOffset && userInfo.Offset == i64(EndOffset)) { - return {"", false}; - } - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (session already dead) " << - " RequestSessionId '" << operation.GetReadSessionId() << - "' CurrentSessionId '" << userInfo.Session << - "'"); - return {"Bad request (session already dead)", false}; - } else if (operation.GetOnlyCheckCommitedToFinish()) { - if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { - return {TStringBuilder() << "There are uncommitted messages in partition " << Partition.OriginalPartitionId, false}; - } else { - return {"", false}; - } - } else { - if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (invalid range) " << - " Begin " << operation.GetCommitOffsetsBegin() << - " End " << operation.GetCommitOffsetsEnd()); - return {"Bad request (invalid range)", true}; - } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (gap) " << - " Offset " << userInfo.Offset << - " Begin " << operation.GetCommitOffsetsBegin()); - return {"Bad request (gap)", true}; - } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (behind the last offset) " << - " EndOffset " << EndOffset << - " End " << operation.GetCommitOffsetsEnd()); - return {"Bad request (behind the last offset", true}; - } - return {"", true}; - } -} - TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe& predicateOut) { if (tx.ForcePredicateFalse) { @@ -2422,13 +2361,62 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr return EProcessResult::Blocked; } - auto [error, needCheckConsumer] = ValidatePartitionOperation(operation); - result = error.empty(); + if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "' has been removed"); + result = false; + break; + } + + if (!UsersInfoStorage->GetIfExists(consumer)) { + PQ_LOG_D("Partition " << Partition << + " Unknown consumer '" << consumer << "'"); + result = false; + break; + } + + bool isAffectedConsumer = AffectedUsers.contains(consumer); + TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); - if (needCheckConsumer) { - if (!result) { - bool isAffectedConsumer = AffectedUsers.contains(consumer); + if (operation.GetOnlyCheckCommitedToFinish()) { + if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { + result = false; + } + } else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { + if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (session already dead) " << + " RequestSessionId '" << operation.GetReadSessionId() << + " CurrentSessionId '" << userInfo.Session << + "'"); + result = false; + } + } else { + if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (invalid range) " << + " Begin " << operation.GetCommitOffsetsBegin() << + " End " << operation.GetCommitOffsetsEnd()); + result = false; + } else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (gap) " << + " Offset " << userInfo.Offset << + " Begin " << operation.GetCommitOffsetsBegin()); + result = false; + } else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (behind the last offset) " << + " EndOffset " << EndOffset << + " End " << operation.GetCommitOffsetsEnd()); + result = false; + } + if (!result) { if (!isAffectedConsumer) { AffectedUsers.erase(consumer); } @@ -2436,9 +2424,6 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr } consumers.insert(consumer); } - if (!result) { - break; - } } if (result) { @@ -2932,18 +2917,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE return EProcessResult::ContinueDrop; } - auto [error, needCheckConsumer] = ValidatePartitionOperation(operation); - if (!error.empty()) { - ScheduleReplyPropose(tx, - NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, - NKikimrPQ::TError::BAD_REQUEST, - error); - return EProcessResult::ContinueDrop; - } - - if (needCheckConsumer) { - consumers.insert(user); - } + consumers.insert(user); } SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); WriteKeysSizeEstimate += consumers.size(); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 53c30c19c5c4..acf82db4d909 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped { void ConsumeBlobQuota(); void UpdateAfterWriteCounters(bool writeComplete); - std::pair ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation); void UpdateUserInfoEndOffset(const TInstant& now); void UpdateWriteBufferIsFullState(const TInstant& now); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 3a98fd2c6870..a7d5385d6578 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -21,6 +21,11 @@ #include "make_config.h" +template<> +void Out(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) { + out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v); +} + namespace NKikimr::NPQ { namespace NHelpers { @@ -1007,7 +1012,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction if (matcher.Status) { UNIT_ASSERT(event->Record.HasStatus()); - UNIT_ASSERT(*matcher.Status == event->Record.GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus()); } } diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp index 854ac14b3736..cd80af0b1ecf 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -97,11 +97,9 @@ Y_UNIT_TEST_SUITE(CommitOffset) { } { - Cerr << ">>>>>> BEGIN" << Endl << Flush; auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); - Cerr << ">>>>>> END" << Endl << Flush; } } From b05903af2294d6163fb3ecbc49e01c77d1d271e1 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Mon, 21 Apr 2025 11:40:19 +0000 Subject: [PATCH 14/14] fix --- ydb/core/persqueue/partition.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index dc5c0e69bdec..cd384de27343 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2986,6 +2986,16 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } + if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) { + if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) { + ScheduleReplyPropose(record, + NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, + NKikimrPQ::TError::BAD_REQUEST, + "session already dead"); + return; + } + } + if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) { continue; // this is stale request, answer ok for it }