diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9a4ea983ba22..cd384de27343 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2378,18 +2378,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr bool isAffectedConsumer = AffectedUsers.contains(consumer); TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); - if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { - PQ_LOG_D("Partition " << Partition << - " Consumer '" << consumer << "'" << - " Bad request (session already dead) " << - " RequestSessionId '" << operation.GetReadSessionId() << - " CurrentSessionId '" << userInfo.Session << - "'"); - result = false; - } else if (operation.GetOnlyCheckCommitedToFinish()) { + if (operation.GetOnlyCheckCommitedToFinish()) { if (IsActive() || static_cast(userInfo.Offset) != EndOffset) { result = false; } + } else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) { + if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) { + PQ_LOG_D("Partition " << Partition << + " Consumer '" << consumer << "'" << + " Bad request (session already dead) " << + " RequestSessionId '" << operation.GetReadSessionId() << + " CurrentSessionId '" << userInfo.Session << + "'"); + result = false; + } } else { if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) { PQ_LOG_D("Partition " << Partition << @@ -2423,6 +2425,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr consumers.insert(consumer); } } + if (result) { TxAffectedConsumers.insert(consumers.begin(), consumers.end()); } @@ -2913,6 +2916,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE "incorrect offset range (begin > end)"); return EProcessResult::ContinueDrop; } + consumers.insert(user); } SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end()); @@ -2937,6 +2941,10 @@ void TPartition::ExecImmediateTx(TTransaction& t) return; } for (const auto& operation : record.GetData().GetOperations()) { + if (operation.GetOnlyCheckCommitedToFinish()) { + continue; + } + if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) { continue; //Write operation - handled separately via WriteInfo } @@ -2977,6 +2985,21 @@ void TPartition::ExecImmediateTx(TTransaction& t) "incorrect offset range (commit to the future)"); return; } + + if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) { + if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) { + ScheduleReplyPropose(record, + NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST, + NKikimrPQ::TError::BAD_REQUEST, + "session already dead"); + return; + } + } + + if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) { + continue; // this is stale request, answer ok for it + } + pendingUserInfo.Offset = operation.GetCommitOffsetsEnd(); } CommitWriteOperations(t); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 358260cc2459..acf82db4d909 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped { void ConsumeBlobQuota(); void UpdateAfterWriteCounters(bool writeComplete); - void UpdateUserInfoEndOffset(const TInstant& now); void UpdateWriteBufferIsFullState(const TInstant& now); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 3a98fd2c6870..a7d5385d6578 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -21,6 +21,11 @@ #include "make_config.h" +template<> +void Out(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) { + out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v); +} + namespace NKikimr::NPQ { namespace NHelpers { @@ -1007,7 +1012,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction if (matcher.Status) { UNIT_ASSERT(event->Record.HasStatus()); - UNIT_ASSERT(*matcher.Status == event->Record.GetStatus()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus()); } } diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index d560b8181d03..39dac6d578cf 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -622,98 +622,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { readSession2->Close(); } - Y_UNIT_TEST(PartitionSplit_OffsetCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0); - setup.Write("message-2", 0); - setup.Write("message-3", 0); - setup.Write("message-4", 0); - setup.Write("message-5", 0); - setup.Write("message-6", 0); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1); - setup.Write("message-8", 1); - setup.Write("message-9", 1); - setup.Write("message-10", 1); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - setup.Write("message-11", 3); - setup.Write("message-12", 3); - - auto assertCommittedOffset = [&](size_t partition, size_t expectedOffset, const std::string& msg = "") { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - UNIT_ASSERT_VALUES_EQUAL_C(expectedOffset, stats->GetCommittedOffset(), "Partition " << partition << ": " << msg); - }; - - { - static constexpr size_t commited = 2; - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited); - UNIT_ASSERT(status.IsSuccess()); - - assertCommittedOffset(0, 6, "Must be commited to the partition end because it is the parent"); - assertCommittedOffset(1, commited); - assertCommittedOffset(3, 0); - } - - { - static constexpr size_t commited = 3; - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited); - UNIT_ASSERT(status.IsSuccess()); - - assertCommittedOffset(0, commited); - assertCommittedOffset(1, 0, "Must be commited to the partition begin because it is the child"); - assertCommittedOffset(3, 0); - } - } - - Y_UNIT_TEST(CommitTopPast) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - setup.Write("message_1", 0); - setup.Write("message_2", 0); - - ui64 txId = 1023; - SplitPartition(setup, ++txId, 0, "a"); - - auto status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit"); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 1).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 2).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition."); - - status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions."); - } - Y_UNIT_TEST(ControlPlane_CreateAlterDescribe) { auto autoscalingTestTopic = "autoscalit-topic"; TTopicSdkTestSetup setup = CreateSetup(); @@ -932,409 +840,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST); } - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-2", 6); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-1", 7); - setup.Write("message-8", 1, "producer-1", 8); - setup.Write("message-9", 1, "producer-1", 9); - setup.Write("message-10", 1, "producer-2", 10); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - auto count = 0; - const auto expected = 10; - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - ++count; - auto& message = messages[i]; - Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush; - message.Commit(); - } - - return true; - }); - - UNIT_ASSERT(result.Timeout); - UNIT_ASSERT_VALUES_EQUAL(count, expected); - - auto description = setup.DescribeConsumer(); - UNIT_ASSERT(description.GetPartitions().size() == 5); - - auto stats1 = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats1); - UNIT_ASSERT(stats1->GetCommittedOffset() == 4); - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_ChildFirst) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-2", 6); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-1", 7); - setup.Write("message-8", 1, "producer-1", 8); - setup.Write("message-9", 1, "producer-1", 9); - setup.Write("message-10", 1, "producer-2", 10); - - { - ui64 txId = 1007; - SplitPartition(setup, ++txId, 1, "0"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 5); - } - - - auto count = 0; - const auto expected = 10; - - std::vector partition0Messages; - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - count++; - int partitionId = message.GetPartitionSession()->GetPartitionId(); - Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush; - if (partitionId == 1) { - // Commit messages from partition 1 immediately - message.Commit(); - } else if (partitionId == 0) { - // Store messages from partition 0 for later - partition0Messages.push_back(message); - } - } - - return true; - }); - - UNIT_ASSERT(result.Timeout); - UNIT_ASSERT_VALUES_EQUAL(count, expected); - - Sleep(TDuration::Seconds(5)); - - { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - - // Messages in the parent partition hasn't been committed - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 0); - } - - for (auto& message : partition0Messages) { - message.Commit(); - } - - Sleep(TDuration::Seconds(5)); - - { - auto description = setup.DescribeConsumer(); - auto stats = description.GetPartitions().at(1).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 4); - } - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionResetAfterCommit) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - TTopicClient client = setup.MakeClient(); - - auto seqNo = 1; - - setup.Write("message-1", 0, "producer-1", seqNo++); - setup.Write("message-2", 0, "producer-1", seqNo++); - setup.Write("message-3", 0, "producer-1", seqNo++); - setup.Write("message-4", 0, "producer-1", seqNo++); - setup.Write("message-5", 0, "producer-1", seqNo++); - setup.Write("message-6", 0, "producer-2", seqNo++); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = setup.DescribeTopic(); - UNIT_ASSERT_EQUAL(describe.GetPartitions().size(), 3); - } - - setup.Write("message-7", 1, "producer-2", seqNo++); - setup.Write("message-8", 1, "producer-2", seqNo++); - - std::vector counters; - counters.resize(seqNo - 1); - - auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - message.Commit(); - Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush; - auto count = ++counters[message.GetSeqNo() - 1]; - - // check we get this SeqNo two times - if (message.GetSeqNo() == 6 && count == 1) { - Sleep(TDuration::MilliSeconds(300)); - auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); - UNIT_ASSERT(status.IsSuccess()); - } - } - - return true; - }); - - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[0], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters[0] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[1], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 1 message has been read " << counters[1] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(1, counters[2], TStringBuilder() << "Message must be read 1 times because reset commit to offset 3, but 2 message has been read " << counters[2] << " times") ; - - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[3], TStringBuilder() << "Message 1 must be read two times, but 3 message has been read " << counters[3] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[4], TStringBuilder() << "Message 1 must be read two times, but 4 message has been read " << counters[4] << " times") ; - UNIT_ASSERT_VALUES_EQUAL_C(2, counters[5], TStringBuilder() << "Message 1 must be read two times, but 5 message has been read " << counters[5] << " times") ; - - { - auto s = result.StartPartitionSessionEvents[0]; - UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); - UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset()); - UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset()); - } - { - auto s = result.StartPartitionSessionEvents[3]; - UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); - UNIT_ASSERT_VALUES_EQUAL(3, s.GetCommittedOffset()); - UNIT_ASSERT_VALUES_EQUAL(6, s.GetEndOffset()); - } - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_SplitedTopic) { - TTopicSdkTestSetup setup = CreateSetup(); - TTopicClient client = setup.MakeClient(); - - setup.CreateTopicWithAutoscale(); - - auto commit = [&](const std::string& sessionId, ui64 offset) { - return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); - }; - - auto getConsumerState = [&](ui32 partition) { - auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER); - - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - return stats; - }; - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-1", 6); - setup.Write("message-7", 0, "producer-1", 7); - setup.Write("message-8", 0, "producer-2", 8); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - - auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); - UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); - } - - setup.Write("message-9", 1, "producer-2", 9); - setup.Write("message-10", 1, "producer-2", 10); - - auto commitSent = false; - TString readSessionId = ""; - - setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - Cerr << "SESSION EVENT READ SeqNo: " << message.GetSeqNo() << Endl << Flush; - - if (commitSent) { - // read session not changed - UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); - } - - // check we NOT get this SeqNo two times - if (message.GetSeqNo() == 6) { - if (!commitSent) { - commitSent = true; - - readSessionId = message.GetPartitionSession()->GetReadSessionId(); - - { - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because commit to past - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - /* TODO uncomment this - { - // must be ignored, because wrong sessionid - auto status = commit("random session", 0); - UNIT_ASSERT(!status.IsSuccess()); - - Sleep(TDuration::MilliSeconds(500)); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - */ - } else { - UNIT_ASSERT(false); - } - } else { - message.Commit(); - } - } - - return true; - }); - } - - Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckOffsetCommitForDifferentCases_NotSplitedTopic) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - TTopicClient client = setup.MakeClient(); - - auto commit = [&](const std::string& sessionId, ui64 offset) { - return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); - }; - - auto getConsumerState = [&](ui32 partition) { - auto description = setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER); - - auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); - UNIT_ASSERT(stats); - return stats; - }; - - setup.Write("message-1", 0, "producer-1", 1); - setup.Write("message-2", 0, "producer-1", 2); - setup.Write("message-3", 0, "producer-1", 3); - setup.Write("message-4", 0, "producer-1", 4); - setup.Write("message-5", 0, "producer-1", 5); - setup.Write("message-6", 0, "producer-1", 6); - setup.Write("message-7", 0, "producer-1", 7); - setup.Write("message-8", 0, "producer-2", 8); - - auto commitSent = false; - TString readSessionId = ""; - - setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { - auto& messages = x.GetMessages(); - for (size_t i = 0u; i < messages.size(); ++i) { - auto& message = messages[i]; - - if (commitSent) { - // read session not changed - UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); - } - - // check we NOT get this SeqNo two times - if (message.GetSeqNo() == 6) { - if (!commitSent) { - commitSent = true; - readSessionId = message.GetPartitionSession()->GetReadSessionId(); - - Sleep(TDuration::MilliSeconds(300)); - - { - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 8); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because commit to past - auto status = commit(message.GetPartitionSession()->GetReadSessionId(), 0); - UNIT_ASSERT(status.IsSuccess()); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - - { - // must be ignored, because wrong sessionid - auto status = commit("random session", 0); - UNIT_ASSERT(!status.IsSuccess()); - - Sleep(TDuration::MilliSeconds(500)); - - auto stats = getConsumerState(0); - UNIT_ASSERT_VALUES_EQUAL(stats->GetCommittedOffset(), 8); - } - } else { - UNIT_ASSERT(false); - } - } else { - message.Commit(); - } - } - - return true; - }); - } - Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) { TTopicSdkTestSetup setup = CreateSetup(); TTopicClient client = setup.MakeClient(); diff --git a/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp new file mode 100644 index 000000000000..cd80af0b1ecf --- /dev/null +++ b/ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp @@ -0,0 +1,623 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include + +namespace NKikimr { + +using namespace NYdb::NTopic; +using namespace NYdb::NTopic::NTests; +using namespace NSchemeShardUT_Private; +using namespace NKikimr::NPQ::NTest; + +Y_UNIT_TEST_SUITE(CommitOffset) { + + void PrepareFlatTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopic(); + + setup.Write("message-1"); + setup.Write("message-2"); + setup.Write("message-3"); + } + + void PrepareAutopartitionedTopic(TTopicSdkTestSetup& setup) { + setup.CreateTopicWithAutoscale(); + + // Creating partition hierarchy + // 0 ──┬──> 1 ──┬──> 3 + // │ └──> 4 + // └──> 2 + // + // Each partition has 3 messages + + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + setup.Write("message-0-3", 0); + + { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + } + + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + setup.Write("message-1-3", 1); + + setup.Write("message-2-1", 2); + setup.Write("message-2-2", 2); + setup.Write("message-2-3", 2); + + { + ui64 txId = 1007; + SplitPartition(setup, ++txId, 1, "0"); + } + + setup.Write("message-3-1", 3); + setup.Write("message-3-2", 3); + setup.Write("message-3-3", 3); + + setup.Write("message-4-1", 4); + setup.Write("message-4-2", 4); + setup.Write("message-4-3", 4); + } + + ui64 GetCommittedOffset(TTopicSdkTestSetup& setup, size_t partition) { + auto description = setup.DescribeConsumer(); + auto stats = description.GetPartitions().at(partition).GetPartitionConsumerStats(); + UNIT_ASSERT(stats); + return stats->GetCommittedOffset(); + } + + Y_UNIT_TEST(Commit_Flat_WithWrongSession) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + } + } + + Y_UNIT_TEST(Commit_Flat_WithWrongSession_ToPast) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 2); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0)); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_TopPast) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer has just started reading the inactive partition and he can commit"); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages forward."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "A consumer who has not read to the end can commit messages back."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit at the end of the inactive partition."); + + status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0); + UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::SUCCESS, status.GetStatus(), "The consumer can commit an offset for inactive, read-to-the-end partitions."); + } + + Y_UNIT_TEST(Commit_WithWrongSession_ToParent) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + setup.CreateTopicWithAutoscale(); + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id"); + UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); + UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Offset doesn`t changed"); + UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 1), "Offset doesn`t changed"); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_ParentNotFinished) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithoutSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit child partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_SameSession) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + return x.GetMessages().back().GetData() != "message-3-3"; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + TTopicClient client(setup.MakeDriver()); + + auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0); + auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1); + + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, + r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(!result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession_ParentCommittedToEnd) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + TTopicClient client(setup.MakeDriver()); + + setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3); + + auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0); + auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1); + + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, + r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + // Commit parent partition to non end + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 3, 1); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + { + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) { + return false; + }); + + // Commit parent to middle + auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, r.StartPartitionSessionEvents.front().GetPartitionSession()->GetReadSessionId()); + UNIT_ASSERT(result.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + } + + Y_UNIT_TEST(Commit_FromSession_ToNewChild_WithoutCommitToParent) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopicWithAutoscale(); + + setup.Write("message-0-0", 0); + setup.Write("message-0-1", 0); + setup.Write("message-0-2", 0); + + { + bool committed = false; + + auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + for (auto & m: x.GetMessages()) { + if (x.GetPartitionSession()->GetPartitionId() == 0 && m.GetOffset() == 1) { + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + + setup.Write("message-1-0", 1); + setup.Write("message-1-1", 1); + setup.Write("message-1-2", 1); + } else if (x.GetPartitionSession()->GetPartitionId() == 1 && m.GetOffset() == 0) { + m.Commit(); + committed = true; + return false; + } + } + + return true; + }); + + UNIT_ASSERT(committed); + + Sleep(TDuration::Seconds(3)); + + // Commit hasn`t applyed because messages from the parent partitions has`t been committed + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + } + } + + Y_UNIT_TEST(PartitionSplit_OffsetCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + { + static constexpr size_t commited = 2; + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, commited); + UNIT_ASSERT(status.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Must be commited to the partition end because it is the parent"); + UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + } + + { + static constexpr size_t commited = 3; + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, commited); + UNIT_ASSERT(status.IsSuccess()); + + UNIT_ASSERT_VALUES_EQUAL(commited, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL_C(0, GetCommittedOffset(setup, 1), "Must be commited to the partition begin because it is the child"); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + + } + } + + Y_UNIT_TEST(DistributedTxCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto count = 0; + const auto expected = 15; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + ++count; + auto& message = messages[i]; + Cerr << "SESSION EVENT read message: " << count << " from partition: " << message.GetPartitionSession()->GetPartitionId() << Endl << Flush; + message.Commit(); + } + + return true; + }); + + UNIT_ASSERT(result.Timeout); + UNIT_ASSERT_VALUES_EQUAL(count, expected); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(DistributedTxCommit_ChildFirst) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto count = 0; + const auto expected = 15; + + std::vector partition0Messages; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + count++; + int partitionId = message.GetPartitionSession()->GetPartitionId(); + Cerr << "SESSION EVENT read message: " << count << " from partition: " << partitionId << Endl << Flush; + if (partitionId == 1) { + // Commit messages from partition 1 immediately + message.Commit(); + } else if (partitionId == 0) { + // Store messages from partition 0 for later + partition0Messages.push_back(message); + } + } + + return true; + }); + + UNIT_ASSERT(result.Timeout); + UNIT_ASSERT_VALUES_EQUAL(count, expected); + + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + + for (auto& message : partition0Messages) { + message.Commit(); + } + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0)); + UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 1)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3)); + UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4)); + } + + Y_UNIT_TEST(DistributedTxCommit_CheckSessionResetAfterCommit) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + std::unordered_map counters; + + auto result = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + message.Commit(); + + auto count = ++counters[message.GetData()]; + + // check we get this SeqNo two times + if (message.GetData() == "message-0-3" && count == 1) { + Sleep(TDuration::MilliSeconds(300)); + auto status = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1); + UNIT_ASSERT(status.IsSuccess()); + } + } + + return true; + }); + + UNIT_ASSERT_VALUES_EQUAL_C(1, counters["message-0-1"], "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters["message-0-1"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ; + UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ; + + { + auto s = result.StartPartitionSessionEvents[0]; + UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(0, s.GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset()); + } + { + auto s = result.StartPartitionSessionEvents[3]; + UNIT_ASSERT_VALUES_EQUAL(0, s.GetPartitionSession()->GetPartitionId()); + UNIT_ASSERT_VALUES_EQUAL(1, s.GetCommittedOffset()); + UNIT_ASSERT_VALUES_EQUAL(3, s.GetEndOffset()); + } + } + + Y_UNIT_TEST(DistributedTxCommit_CheckOffsetCommitForDifferentCases) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareAutopartitionedTopic(setup); + + auto commit = [&](const std::string& sessionId, ui64 offset) { + return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); + }; + + auto commitSent = false; + TString readSessionId = ""; + + setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + if (commitSent) { + // read session not changed + UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); + } + + // check we NOT get this SeqNo two times + if (message.GetData() == "message-0-2") { + if (!commitSent) { + commitSent = true; + + readSessionId = message.GetPartitionSession()->GetReadSessionId(); + + { + auto status = commit(readSessionId, 3); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because commit to past + auto status = commit(readSessionId, 0); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because wrong sessionid + auto status = commit("random session", 0); + UNIT_ASSERT(!status.IsSuccess()); + Sleep(TDuration::MilliSeconds(500)); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + } else { + UNIT_ASSERT(false); + } + } else { + message.Commit(); + } + } + + return true; + }); + } + + Y_UNIT_TEST(DistributedTxCommit_Flat_CheckOffsetCommitForDifferentCases) { + TTopicSdkTestSetup setup = CreateSetup(); + PrepareFlatTopic(setup); + + auto commit = [&](const std::string& sessionId, ui64 offset) { + return setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, offset, sessionId); + }; + + auto commitSent = false; + TString readSessionId = ""; + + setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) { + auto& messages = x.GetMessages(); + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + + if (commitSent) { + // read session not changed + UNIT_ASSERT_EQUAL(readSessionId, message.GetPartitionSession()->GetReadSessionId()); + } + + // check we NOT get this message two times + if (message.GetData() == "message-0-2") { + if (!commitSent) { + commitSent = true; + + readSessionId = message.GetPartitionSession()->GetReadSessionId(); + + { + auto status = commit(readSessionId, 3); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because commit to past + auto status = commit(readSessionId, 0); + UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + + { + // must be ignored, because wrong sessionid + auto status = commit("random session", 0); + UNIT_ASSERT(!status.IsSuccess()); + Sleep(TDuration::MilliSeconds(500)); + UNIT_ASSERT_VALUES_EQUAL(GetCommittedOffset(setup, 0), 3); + } + } else { + UNIT_ASSERT(false); + } + } else { + message.Commit(); + } + } + + return true; + }); + } +} + +} // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp index 5cc8256aaa09..aadf1ad16276 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp @@ -162,93 +162,6 @@ Y_UNIT_TEST_SUITE(WithSDK) { UNIT_ASSERT_VALUES_EQUAL(2, c->GetLastReadOffset()); } } - - Y_UNIT_TEST(CommitWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1); - - setup.Write("message-1"); - setup.Write("message-2"); - setup.Write("message-3"); - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(0, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - } - - Y_UNIT_TEST(CommitToPastWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopic(std::string{TEST_TOPIC}, std::string{TEST_CONSUMER}, 1); - - setup.Write("message-1"); - setup.Write("message-2"); - setup.Write("message-3"); - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 2); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(2, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - } - - /* TODO Uncomment this test - Y_UNIT_TEST(CommitToParentPartitionWithWrongSessionId) { - TTopicSdkTestSetup setup = CreateSetup(); - setup.CreateTopicWithAutoscale(); - - setup.Write("message-1", 0); - - { - ui64 txId = 1006; - SplitPartition(setup, ++txId, 0, "a"); - } - - setup.Write("message-2", 1); - - Cerr << ">>>>> BEGIN 0" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 1); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - Cerr << ">>>>> BEGIN 1" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 1, 1); - UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL(1, desc.GetPartitions().at(1).GetPartitionConsumerStats()->GetCommittedOffset()); - } - - Cerr << ">>>>> BEGIN 2" << Endl << Flush; - { - auto result = setup.Commit(TString(TEST_TOPIC), TEST_CONSUMER, 0, 0, "wrong-read-session-id"); - UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id"); - - auto desc = setup.DescribeConsumer(TString(TEST_TOPIC), TEST_CONSUMER); - UNIT_ASSERT_VALUES_EQUAL_C(1, desc.GetPartitions().at(0).GetPartitionConsumerStats()->GetCommittedOffset(), "Offset doesn`t changed"); - } - Cerr << ">>>>> END" << Endl << Flush; - - } - */ } } // namespace NKikimr diff --git a/ydb/core/persqueue/ut/ut_with_sdk/ya.make b/ydb/core/persqueue/ut/ut_with_sdk/ya.make index eb370dff0244..c837ecdfa649 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/ya.make +++ b/ydb/core/persqueue/ut/ut_with_sdk/ya.make @@ -31,6 +31,7 @@ YQL_LAST_ABI_VERSION() SRCS( autoscaling_ut.cpp balancing_ut.cpp + commitoffset_ut.cpp mirrorer_ut.cpp topic_ut.cpp ) diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 1110c0f038b1..0055dd8543cf 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con session->Close(TDuration::Seconds(5)); } -TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout) { +TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition, const TDuration timeout) { TTopicClient client(MakeDriver()); - auto reader = client.CreateReadSession( - TReadSessionSettings() - .AutoPartitioningSupport(true) - .AppendTopics(TTopicReadSettings(topic)) - .ConsumerName(consumer)); + auto topicSettings = TTopicReadSettings(topic); + if (partition) { + topicSettings.AppendPartitionIds(partition.value()); + } + + auto settins = TReadSessionSettings() + .AutoPartitioningSupport(true) + .AppendTopics(topicSettings) + .ConsumerName(consumer); + + auto reader = client.CreateReadSession(settins); TInstant deadlineTime = TInstant::Now() + timeout; diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 9502b9fd4ff7..190fb3bd6241 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -32,7 +32,9 @@ class TTopicSdkTestSetup { std::vector StartPartitionSessionEvents; }; - ReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout = TDuration::Seconds(5)); + ReadResult Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5)); TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); TString GetEndpoint() const; diff --git a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp index 471162c5f9ae..415e99d676fe 100644 --- a/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp +++ b/ydb/services/persqueue_v1/actors/commit_offset_actor.cpp @@ -106,24 +106,42 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc if (partitionNode->AllParents.size() == 0 && partitionNode->DirectChildren.size() == 0) { SendCommit(topicInitInfo, commitRequest, ctx); } else { - auto killReadSession = commitRequest->read_session_id().empty(); + auto hasReadSession = !commitRequest->read_session_id().empty(); + auto killReadSession = !hasReadSession; + const TString& readSessionId = commitRequest->read_session_id(); + std::vector commits; for (auto& parent: partitionNode->AllParents) { - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = parent->Id, + .Offset = Max(), + .KillReadSession = killReadSession, + .OnlyCheckCommitedToFinish = false, + .ReadSessionId = readSessionId + }; commits.push_back(commit); } - for (auto& child: partitionNode->AllChildren) { - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0, .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; - commits.push_back(commit); + if (!hasReadSession) { + for (auto& child: partitionNode->AllChildren) { + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = child->Id, + .Offset = 0, + .KillReadSession = true, + .OnlyCheckCommitedToFinish = false + }; + commits.push_back(commit); + } } - TDistributedCommitHelper::TCommitInfo commit {.PartitionId = partitionNode->Id, .Offset = commitRequest->offset(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false}; - - if (!commitRequest->read_session_id().empty()) { - commit.ReadSessionId = commitRequest->read_session_id(); - } + TDistributedCommitHelper::TCommitInfo commit { + .PartitionId = partitionNode->Id, + .Offset = commitRequest->offset(), + .KillReadSession = killReadSession, + .OnlyCheckCommitedToFinish = false, + .ReadSessionId = readSessionId + }; commits.push_back(commit); Kqp = std::make_unique(Request().GetDatabaseName().GetOrElse(TString()), ClientId, topic, commits); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 611befb9196a..16352cc0407f 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1278,7 +1278,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartit } std::unordered_set notCommitedToFinishParents; - for (auto& parent: topic.PartitionGraph->GetPartition(record.GetPartition())->DirectParents) { + for (auto& parent: partitionNode->DirectParents) { for (auto& [_, actorInfo]: Partitions) { // TODO: map if (actorInfo.Partition.Partition == parent->Id && !actorInfo.IsLastOffsetCommitted()) { notCommitedToFinishParents.emplace(actorInfo.Partition.Partition); @@ -1286,7 +1286,7 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartit } } - const auto& parentPartitions = topic.PartitionGraph->GetPartition(partitionId.Partition)->AllParents; + const auto& parentPartitions = partitionNode->AllParents; const auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase()); const TActorId actorId = ctx.Register(new TPartitionActor( ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),