Skip to content

Fixed errors of the distributed commit offset to the partition #17423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 21, 2025
142 changes: 89 additions & 53 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2343,6 +2343,64 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
return true;
}

// Validates a single consumer-related partition operation (offset commit or
// "only check committed to finish" probe) against the current partition state.
//
// Returns a pair {error, real}:
//   * error - empty when the operation is acceptable, otherwise a
//             human-readable reason for the rejection;
//   * real  - true only when the operation reached the commit-range checks
//             (i.e. it is an actual offset commit for a known consumer with a
//             matching session); false for removed/unknown consumers, a dead
//             session and for the "only check" probe.
//
// NOTE: calls GetOrCreatePendingUser(consumer) for a known consumer, so —
// judging by that helper's name — validation may create a pending user entry
// as a side effect.
std::pair<TString, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) {
    const TString& consumer = operation.GetConsumer();

    // The consumer was affected earlier but has no pending state any more.
    if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
        PQ_LOG_D("Partition " << Partition <<
                 " Consumer '" << consumer << "' has been removed");
        return {TStringBuilder() << "Consumer '" << consumer << "' has been removed", false};
    }

    if (!UsersInfoStorage->GetIfExists(consumer)) {
        PQ_LOG_D("Partition " << Partition <<
                 " Unknown consumer '" << consumer << "'");
        return {TStringBuilder() << "Unknown consumer '" << consumer << "'", false};
    }

    TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);

    // A non-empty session id in the request must match the consumer's current session.
    if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
        PQ_LOG_D("Partition " << Partition <<
                 " Consumer '" << consumer << "'" <<
                 " Bad request (session already dead) " <<
                 " RequestSessionId '" << operation.GetReadSessionId() <<
                 "' CurrentSessionId '" << userInfo.Session <<
                 "'");
        return {"Bad request (session already dead)", false};
    }

    // Probe only: succeeds when the partition is inactive and the consumer
    // has committed everything up to EndOffset. Never counted as a real commit.
    if (operation.GetOnlyCheckCommitedToFinish()) {
        if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
            return {TStringBuilder() << "There are uncommitted messages in partition " << Partition.OriginalPartitionId, false};
        }
        return {"", false};
    }

    // A forced commit bypasses every range check below.
    if (operation.GetForceCommit()) {
        return {"", true};
    }

    if (operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
        PQ_LOG_D("Partition " << Partition <<
                 " Consumer '" << consumer << "'" <<
                 " Bad request (invalid range) " <<
                 " Begin " << operation.GetCommitOffsetsBegin() <<
                 " End " << operation.GetCommitOffsetsEnd());
        return {"Bad request (invalid range)", true};
    }

    // The committed range must start exactly at the consumer's current offset.
    if (userInfo.Offset != static_cast<i64>(operation.GetCommitOffsetsBegin())) {
        PQ_LOG_D("Partition " << Partition <<
                 " Consumer '" << consumer << "'" <<
                 " Bad request (gap) " <<
                 " Offset " << userInfo.Offset <<
                 " Begin " << operation.GetCommitOffsetsBegin());
        return {"Bad request (gap)", true};
    }

    // Committing past the partition's end offset is not allowed.
    if (operation.GetCommitOffsetsEnd() > EndOffset) {
        PQ_LOG_D("Partition " << Partition <<
                 " Consumer '" << consumer << "'" <<
                 " Bad request (behind the last offset) " <<
                 " EndOffset " << EndOffset <<
                 " End " << operation.GetCommitOffsetsEnd());
        return {"Bad request (behind the last offset)", true};
    }

    return {"", true};
}

TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
{
if (tx.ForcePredicateFalse) {
Expand All @@ -2361,68 +2419,25 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
return EProcessResult::Blocked;
}

if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "' has been removed");
result = false;
break;
}

if (!UsersInfoStorage->GetIfExists(consumer)) {
PQ_LOG_D("Partition " << Partition <<
" Unknown consumer '" << consumer << "'");
result = false;
break;
}

bool isAffectedConsumer = AffectedUsers.contains(consumer);
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);

if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (session already dead) " <<
" RequestSessionId '" << operation.GetReadSessionId() <<
" CurrentSessionId '" << userInfo.Session <<
"'");
result = false;
} else if (operation.GetOnlyCheckCommitedToFinish()) {
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
result = false;
}
} else {
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (invalid range) " <<
" Begin " << operation.GetCommitOffsetsBegin() <<
" End " << operation.GetCommitOffsetsEnd());
result = false;
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (gap) " <<
" Offset " << userInfo.Offset <<
" Begin " << operation.GetCommitOffsetsBegin());
result = false;
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
PQ_LOG_D("Partition " << Partition <<
" Consumer '" << consumer << "'" <<
" Bad request (behind the last offset) " <<
" EndOffset " << EndOffset <<
" End " << operation.GetCommitOffsetsEnd());
result = false;
}
auto [error, real] = ValidatePartitionOperation(operation);
result = error.empty();

if (real) {
if (!result) {
bool isAffectedConsumer = AffectedUsers.contains(consumer);

if (!isAffectedConsumer) {
AffectedUsers.erase(consumer);
}
break;
}
consumers.insert(consumer);
}
if (!result) {
break;
}
}

if (result) {
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
}
Expand Down Expand Up @@ -2913,7 +2928,19 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
"incorrect offset range (begin > end)");
return EProcessResult::ContinueDrop;
}
consumers.insert(user);

auto [error, real] = ValidatePartitionOperation(operation);
if (!error.empty()) {
ScheduleReplyPropose(tx,
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
NKikimrPQ::TError::BAD_REQUEST,
error);
return EProcessResult::ContinueDrop;
}

if (real) {
consumers.insert(user);
}
}
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
WriteKeysSizeEstimate += consumers.size();
Expand All @@ -2937,6 +2964,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
return;
}
for (const auto& operation : record.GetData().GetOperations()) {
if (operation.GetOnlyCheckCommitedToFinish()) {
continue;
}

if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
continue; //Write operation - handled separately via WriteInfo
}
Expand Down Expand Up @@ -2977,6 +3008,11 @@ void TPartition::ExecImmediateTx(TTransaction& t)
"incorrect offset range (commit to the future)");
return;
}

if ((i64)operation.GetCommitOffsetsEnd() < pendingUserInfo.Offset && !operation.GetReadSessionId().empty()) {
continue; // this is stale request, answer ok for it
}

pendingUserInfo.Offset = operation.GetCommitOffsetsEnd();
}
CommitWriteOperations(t);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void ConsumeBlobQuota();
void UpdateAfterWriteCounters(bool writeComplete);


std::pair<TString, bool> ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation);
void UpdateUserInfoEndOffset(const TInstant& now);
void UpdateWriteBufferIsFullState(const TInstant& now);

Expand Down
Loading
Loading