Skip to content

Commit 4497a80

Browse files
committed
Fixed errors of the distributed commit offset to the partition (ydb-platform#17423)
1 parent b0e0199 commit 4497a80

File tree

11 files changed

+709
-612
lines changed

11 files changed

+709
-612
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2335,18 +2335,20 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23352335
bool isAffectedConsumer = AffectedUsers.contains(consumer);
23362336
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
23372337

2338-
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2339-
PQ_LOG_D("Partition " << Partition <<
2340-
" Consumer '" << consumer << "'" <<
2341-
" Bad request (session already dead) " <<
2342-
" RequestSessionId '" << operation.GetReadSessionId() <<
2343-
" CurrentSessionId '" << userInfo.Session <<
2344-
"'");
2345-
result = false;
2346-
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2338+
if (operation.GetOnlyCheckCommitedToFinish()) {
23472339
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
23482340
result = false;
23492341
}
2342+
} else if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2343+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || userInfo.Offset != i64(EndOffset)) {
2344+
PQ_LOG_D("Partition " << Partition <<
2345+
" Consumer '" << consumer << "'" <<
2346+
" Bad request (session already dead) " <<
2347+
" RequestSessionId '" << operation.GetReadSessionId() <<
2348+
" CurrentSessionId '" << userInfo.Session <<
2349+
"'");
2350+
result = false;
2351+
}
23502352
} else {
23512353
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
23522354
PQ_LOG_D("Partition " << Partition <<
@@ -2380,6 +2382,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23802382
consumers.insert(consumer);
23812383
}
23822384
}
2385+
23832386
if (result) {
23842387
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
23852388
}
@@ -2868,6 +2871,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
28682871
"incorrect offset range (begin > end)");
28692872
return EProcessResult::ContinueDrop;
28702873
}
2874+
28712875
consumers.insert(user);
28722876
}
28732877
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
@@ -2892,6 +2896,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
28922896
return;
28932897
}
28942898
for (const auto& operation : record.GetData().GetOperations()) {
2899+
if (operation.GetOnlyCheckCommitedToFinish()) {
2900+
continue;
2901+
}
2902+
28952903
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
28962904
continue; //Write operation - handled separately via WriteInfo
28972905
}
@@ -2932,6 +2940,21 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29322940
"incorrect offset range (commit to the future)");
29332941
return;
29342942
}
2943+
2944+
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != pendingUserInfo.Session) {
2945+
if (IsActive() || operation.GetCommitOffsetsEnd() < EndOffset || pendingUserInfo.Offset != i64(EndOffset)) {
2946+
ScheduleReplyPropose(record,
2947+
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
2948+
NKikimrPQ::TError::BAD_REQUEST,
2949+
"session already dead");
2950+
return;
2951+
}
2952+
}
2953+
2954+
if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
2955+
continue; // this is stale request, answer ok for it
2956+
}
2957+
29352958
pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
29362959
}
29372960
CommitWriteOperations(t);

ydb/core/persqueue/partition.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
277276
void UpdateUserInfoEndOffset(const TInstant& now);
278277
void UpdateWriteBufferIsFullState(const TInstant& now);
279278

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121

2222
#include "make_config.h"
2323

24+
template<>
25+
void Out<NKikimrPQ::TEvProposeTransactionResult_EStatus>(IOutputStream& out, NKikimrPQ::TEvProposeTransactionResult_EStatus v) {
26+
out << NKikimrPQ::TEvProposeTransactionResult::EStatus_Name(v);
27+
}
28+
2429
namespace NKikimr::NPQ {
2530

2631
namespace NHelpers {
@@ -992,7 +997,7 @@ void TPartitionFixture::WaitProposeTransactionResponse(const TProposeTransaction
992997

993998
if (matcher.Status) {
994999
UNIT_ASSERT(event->Record.HasStatus());
995-
UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
1000+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
9961001
}
9971002
}
9981003

0 commit comments

Comments
 (0)