Skip to content

Commit 7d3ba05

Browse files
committed
fix
1 parent ae4c39b commit 7d3ba05

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,19 +2343,19 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
23432343
return true;
23442344
}
23452345

2346-
std::pair<bool, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) {
2346+
std::pair<TString, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) {
23472347
const TString& consumer = operation.GetConsumer();
23482348

23492349
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
23502350
PQ_LOG_D("Partition " << Partition <<
23512351
" Consumer '" << consumer << "' has been removed");
2352-
return {false, false};
2352+
return {TStringBuilder() << "Consumer '" << consumer << "' has been removed", false};
23532353
}
23542354

23552355
if (!UsersInfoStorage->GetIfExists(consumer)) {
23562356
PQ_LOG_D("Partition " << Partition <<
23572357
" Unknown consumer '" << consumer << "'");
2358-
return {false, false};
2358+
return {TStringBuilder() << "Unknown consumer '" << consumer << "'", false};
23592359
}
23602360

23612361
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
@@ -2367,12 +2367,12 @@ std::pair<bool, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TP
23672367
" RequestSessionId '" << operation.GetReadSessionId() <<
23682368
"' CurrentSessionId '" << userInfo.Session <<
23692369
"'");
2370-
return {false, false};
2370+
return {"Bad request (session already dead)", false};
23712371
} else if (operation.GetOnlyCheckCommitedToFinish()) {
23722372
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
2373-
return {false, false};
2373+
return {TStringBuilder() << "There are uncommitted messages in partition " << Partition.OriginalPartitionId, false};
23742374
} else {
2375-
return {true, false};
2375+
return {"", false};
23762376
}
23772377
} else {
23782378
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
@@ -2381,23 +2381,23 @@ std::pair<bool, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TP
23812381
" Bad request (invalid range) " <<
23822382
" Begin " << operation.GetCommitOffsetsBegin() <<
23832383
" End " << operation.GetCommitOffsetsEnd());
2384-
return {false, true};
2384+
return {"Bad request (invalid range)", true};
23852385
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
23862386
PQ_LOG_D("Partition " << Partition <<
23872387
" Consumer '" << consumer << "'" <<
23882388
" Bad request (gap) " <<
23892389
" Offset " << userInfo.Offset <<
23902390
" Begin " << operation.GetCommitOffsetsBegin());
2391-
return {false, true};
2391+
return {"Bad request (gap)", true};
23922392
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
23932393
PQ_LOG_D("Partition " << Partition <<
23942394
" Consumer '" << consumer << "'" <<
23952395
" Bad request (behind the last offset) " <<
23962396
" EndOffset " << EndOffset <<
23972397
" End " << operation.GetCommitOffsetsEnd());
2398-
return {false, true};
2398+
return {"Bad request (behind the last offset", true};
23992399
}
2400-
return {true, true};
2400+
return {"", true};
24012401
}
24022402
}
24032403

@@ -2419,11 +2419,11 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
24192419
return EProcessResult::Blocked;
24202420
}
24212421

2422-
auto [r, real] = ValidatePartitionOperation(operation);
2423-
result = r;
2422+
auto [error, real] = ValidatePartitionOperation(operation);
2423+
result = error.empty();
24242424

24252425
if (real) {
2426-
if (!r) {
2426+
if (!result) {
24272427
bool isAffectedConsumer = AffectedUsers.contains(consumer);
24282428

24292429
if (!isAffectedConsumer) {
@@ -2433,7 +2433,7 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
24332433
}
24342434
consumers.insert(consumer);
24352435
}
2436-
if (!r) {
2436+
if (!result) {
24372437
break;
24382438
}
24392439
}
@@ -2929,12 +2929,12 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29292929
return EProcessResult::ContinueDrop;
29302930
}
29312931

2932-
auto [r, real] = ValidatePartitionOperation(operation);
2933-
if (!r) {
2932+
auto [error, real] = ValidatePartitionOperation(operation);
2933+
if (!error.empty()) {
29342934
ScheduleReplyPropose(tx,
29352935
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
29362936
NKikimrPQ::TError::BAD_REQUEST,
2937-
"incorrect request");
2937+
error);
29382938
return EProcessResult::ContinueDrop;
29392939
}
29402940

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
std::pair<bool, bool> ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation);
276+
std::pair<TString, bool> ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation);
277277
void UpdateUserInfoEndOffset(const TInstant& now);
278278
void UpdateWriteBufferIsFullState(const TInstant& now);
279279

0 commit comments

Comments
 (0)