Skip to content

Commit d38ff4d

Browse files
committed
Fixed errors of the distributed commit offset to the partition (ydb-platform#17423)
1 parent 7519913 commit d38ff4d

File tree

11 files changed

+709
-612
lines changed

11 files changed

+709
-612
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,18 +2343,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23432343
bool isAffectedConsumer = AffectedUsers.contains(consumer);
23442344
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
23452345

2346-
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2347-
PQ_LOG_D("Partition " << Partition <<
2348-
" Consumer '" << consumer << "'" <<
2349-
" Bad request (session already dead) " <<
2350-
" RequestSessionId '" << operation.GetReadSessionId() <<
2351-
" CurrentSessionId '" << userInfo.Session <<
2352-
"'");
2353-
result = false;
2354-
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2346+
if (operation.GetOnlyCheckCommitedToFinish()) {
23552347
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
23562348
result = false;
23572349
}
2350+
} else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2351+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) {
2352+
PQ_LOG_D("Partition " << Partition <<
2353+
" Consumer '" << consumer << "'" <<
2354+
" Bad request (session already dead) " <<
2355+
" RequestSessionId '" << operation.GetReadSessionId() <<
2356+
" CurrentSessionId '" << userInfo.Session <<
2357+
"'");
2358+
result = false;
2359+
}
23582360
} else {
23592361
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
23602362
PQ_LOG_D("Partition " << Partition <<
@@ -2388,6 +2390,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23882390
consumers.insert(consumer);
23892391
}
23902392
}
2393+
23912394
if (result) {
23922395
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
23932396
}
@@ -2876,6 +2879,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
28762879
"incorrect offset range (begin > end)");
28772880
return EProcessResult::ContinueDrop;
28782881
}
2882+
28792883
consumers.insert(user);
28802884
}
28812885
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
@@ -2900,6 +2904,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29002904
return;
29012905
}
29022906
for (const auto& operation : record.GetData().GetOperations()) {
2907+
if (operation.GetOnlyCheckCommitedToFinish()) {
2908+
continue;
2909+
}
2910+
29032911
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
29042912
continue; //Write operation - handled separately via WriteInfo
29052913
}
@@ -2940,6 +2948,21 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29402948
"incorrect offset range (commit to the future)");
29412949
return;
29422950
}
2951+
2952+
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) {
2953+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) {
2954+
ScheduleReplyPropose(record,
2955+
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
2956+
NKikimrPQ::TError::BAD_REQUEST,
2957+
"session already dead");
2958+
return;
2959+
}
2960+
}
2961+
2962+
if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
2963+
continue; // this is stale request, answer ok for it
2964+
}
2965+
29432966
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
29442967
}
29452968
CommitWriteOperations(t);

ydb/core/persqueue/partition.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
277276
void UpdateUserInfoEndOffset(const TInstant& now);
278277
void UpdateWriteBufferIsFullState(const TInstant& now);
279278

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222
#include "make_config.h"
2323

24+
template<>
25+
void Out<NKikimrPQ::TEvProposeTransactionResult_EStatus>(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) {
26+
out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v);
27+
}
28+
2429
namespace NKikimr::NPQ {
2530

2631
namespace NHelpers {
@@ -1009,7 +1014,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction
10091014

10101015
if (matcher.Status) {
10111016
UNIT_ASSERT(event->Record.HasStatus());
1012-
UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
1017+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
10131018
}
10141019
}
10151020

0 commit comments

Comments
 (0)