Skip to content

Commit aa6bba8

Browse files
committed
one case hase been fixed
1 parent c015f88 commit aa6bba8

File tree

4 files changed

+126
-66
lines changed

4 files changed

+126
-66
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 90 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2343,6 +2343,74 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
23432343
return true;
23442344
}
23452345

2346+
std::pair<bool, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation) {
2347+
const TString& consumer = operation.GetConsumer();
2348+
2349+
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
2350+
PQ_LOG_D("Partition " << Partition <<
2351+
" Consumer '" << consumer << "' has been removed");
2352+
return {false, false};
2353+
}
2354+
2355+
if (!UsersInfoStorage->GetIfExists(consumer)) {
2356+
PQ_LOG_D("Partition " << Partition <<
2357+
" Unknown consumer '" << consumer << "'");
2358+
return {false, false};
2359+
}
2360+
2361+
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
2362+
2363+
PQ_LOG_D("CommitOperation Partition " << Partition <<
2364+
" Consumer '" << consumer << "'" <<
2365+
" RequestSessionId '" << operation.GetReadSessionId() <<
2366+
"' CurrentSessionId '" << userInfo.Session <<
2367+
"' OffsetBegin " << operation.GetCommitOffsetsBegin() <<
2368+
"' OffsetEnd " << operation.GetCommitOffsetsEnd() <<
2369+
" OnlyCheckCommitedToFinish " << operation.GetOnlyCheckCommitedToFinish() <<
2370+
" KillReadSession " << operation.GetKillReadSession()
2371+
);
2372+
2373+
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2374+
PQ_LOG_D("Partition " << Partition <<
2375+
" Consumer '" << consumer << "'" <<
2376+
" Bad request (session already dead) " <<
2377+
" RequestSessionId '" << operation.GetReadSessionId() <<
2378+
"' CurrentSessionId '" << userInfo.Session <<
2379+
"'");
2380+
return {false, false};
2381+
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2382+
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
2383+
return {false, false};
2384+
} else {
2385+
return {true, false};
2386+
}
2387+
} else {
2388+
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
2389+
PQ_LOG_D("Partition " << Partition <<
2390+
" Consumer '" << consumer << "'" <<
2391+
" Bad request (invalid range) " <<
2392+
" Begin " << operation.GetCommitOffsetsBegin() <<
2393+
" End " << operation.GetCommitOffsetsEnd());
2394+
return {false, true};
2395+
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2396+
PQ_LOG_D("Partition " << Partition <<
2397+
" Consumer '" << consumer << "'" <<
2398+
" Bad request (gap) " <<
2399+
" Offset " << userInfo.Offset <<
2400+
" Begin " << operation.GetCommitOffsetsBegin());
2401+
return {false, true};
2402+
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
2403+
PQ_LOG_D("Partition " << Partition <<
2404+
" Consumer '" << consumer << "'" <<
2405+
" Bad request (behind the last offset) " <<
2406+
" EndOffset " << EndOffset <<
2407+
" End " << operation.GetCommitOffsetsEnd());
2408+
return {false, true};
2409+
}
2410+
return {true, true};
2411+
}
2412+
}
2413+
23462414
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicateOut)
23472415
{
23482416
if (tx.ForcePredicateFalse) {
@@ -2361,68 +2429,25 @@ TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPr
23612429
return EProcessResult::Blocked;
23622430
}
23632431

2364-
if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
2365-
PQ_LOG_D("Partition " << Partition <<
2366-
" Consumer '" << consumer << "' has been removed");
2367-
result = false;
2368-
break;
2369-
}
2370-
2371-
if (!UsersInfoStorage->GetIfExists(consumer)) {
2372-
PQ_LOG_D("Partition " << Partition <<
2373-
" Unknown consumer '" << consumer << "'");
2374-
result = false;
2375-
break;
2376-
}
2432+
auto [r, real] = ValidatePartitionOperation(operation);
2433+
result = r;
23772434

2378-
bool isAffectedConsumer = AffectedUsers.contains(consumer);
2379-
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
2380-
2381-
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2382-
PQ_LOG_D("Partition " << Partition <<
2383-
" Consumer '" << consumer << "'" <<
2384-
" Bad request (session already dead) " <<
2385-
" RequestSessionId '" << operation.GetReadSessionId() <<
2386-
" CurrentSessionId '" << userInfo.Session <<
2387-
"'");
2388-
result = false;
2389-
} else if (operation.GetOnlyCheckCommitedToFinish()) {
2390-
if (IsActive() || static_cast<ui64>(userInfo.Offset) != EndOffset) {
2391-
result = false;
2392-
}
2393-
} else {
2394-
if (!operation.GetForceCommit() && operation.GetCommitOffsetsBegin() > operation.GetCommitOffsetsEnd()) {
2395-
PQ_LOG_D("Partition " << Partition <<
2396-
" Consumer '" << consumer << "'" <<
2397-
" Bad request (invalid range) " <<
2398-
" Begin " << operation.GetCommitOffsetsBegin() <<
2399-
" End " << operation.GetCommitOffsetsEnd());
2400-
result = false;
2401-
} else if (!operation.GetForceCommit() && userInfo.Offset != (i64)operation.GetCommitOffsetsBegin()) {
2402-
PQ_LOG_D("Partition " << Partition <<
2403-
" Consumer '" << consumer << "'" <<
2404-
" Bad request (gap) " <<
2405-
" Offset " << userInfo.Offset <<
2406-
" Begin " << operation.GetCommitOffsetsBegin());
2407-
result = false;
2408-
} else if (!operation.GetForceCommit() && operation.GetCommitOffsetsEnd() > EndOffset) {
2409-
PQ_LOG_D("Partition " << Partition <<
2410-
" Consumer '" << consumer << "'" <<
2411-
" Bad request (behind the last offset) " <<
2412-
" EndOffset " << EndOffset <<
2413-
" End " << operation.GetCommitOffsetsEnd());
2414-
result = false;
2415-
}
2435+
if (real) {
2436+
if (!r) {
2437+
bool isAffectedConsumer = AffectedUsers.contains(consumer);
24162438

2417-
if (!result) {
24182439
if (!isAffectedConsumer) {
24192440
AffectedUsers.erase(consumer);
24202441
}
24212442
break;
24222443
}
24232444
consumers.insert(consumer);
24242445
}
2446+
if (!r) {
2447+
break;
2448+
}
24252449
}
2450+
24262451
if (result) {
24272452
TxAffectedConsumers.insert(consumers.begin(), consumers.end());
24282453
}
@@ -2586,6 +2611,9 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
25862611
Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate);
25872612

25882613
for (auto& operation : t->Tx->Operations) {
2614+
2615+
Cerr << ">>>>> CommitTransaction " << Endl << Flush;
2616+
25892617
if (operation.GetOnlyCheckCommitedToFinish()) {
25902618
continue;
25912619
}
@@ -2913,6 +2941,16 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29132941
"incorrect offset range (begin > end)");
29142942
return EProcessResult::ContinueDrop;
29152943
}
2944+
2945+
auto [r, _] = ValidatePartitionOperation(operation);
2946+
if (!r) {
2947+
ScheduleReplyPropose(tx,
2948+
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
2949+
NKikimrPQ::TError::BAD_REQUEST,
2950+
"incorrect request");
2951+
return EProcessResult::ContinueDrop;
2952+
}
2953+
29162954
consumers.insert(user);
29172955
}
29182956
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
273273
void ConsumeBlobQuota();
274274
void UpdateAfterWriteCounters(bool writeComplete);
275275

276-
276+
std::pair<bool, bool> ValidatePartitionOperation(const NKikimrPQ::TPartitionOperation& operation);
277277
void UpdateUserInfoEndOffset(const TInstant& now);
278278
void UpdateWriteBufferIsFullState(const TInstant& now);
279279

ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,11 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
9797
}
9898

9999
{
100+
Cerr << ">>>>>> BEGIN" << Endl << Flush;
100101
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id");
101102
UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
102103
UNIT_ASSERT_VALUES_EQUAL(2, GetCommittedOffset(setup, 0));
104+
Cerr << ">>>>>> END" << Endl << Flush;
103105
}
104106
}
105107

@@ -137,13 +139,15 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
137139
{
138140
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1);
139141
UNIT_ASSERT_C(result.IsSuccess(), "Commited without session id. It is reset mode");
142+
UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
140143
UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
141144
}
142145

143146
{
144147
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 0, "wrong-read-session-id");
145148
UNIT_ASSERT_C(!result.IsSuccess(), "Commit doesn`t work with wrong session id");
146-
UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 0), "Offset doesn`t changed");
149+
UNIT_ASSERT_VALUES_EQUAL_C(3, GetCommittedOffset(setup, 0), "Offset doesn`t changed");
150+
UNIT_ASSERT_VALUES_EQUAL_C(1, GetCommittedOffset(setup, 1), "Offset doesn`t changed");
147151
}
148152
}
149153

@@ -441,8 +445,8 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
441445
});
442446

443447
UNIT_ASSERT_VALUES_EQUAL_C(1, counters["message-0-1"], "Message must be read 1 times because reset commit to offset 3, but 0 message has been read " << counters["message-0-1"] << " times") ;
444-
UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ;
445-
UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], TStringBuilder() << "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ;
448+
UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-2"], "Message must be read 2 times because reset commit to offset 1, but 1 message has been read " << counters["message-0-2"] << " times") ;
449+
UNIT_ASSERT_VALUES_EQUAL_C(2, counters["message-0-3"], "Message must be read 2 times because reset commit to offset 1, but 2 message has been read " << counters["message-0-3"] << " times") ;
446450

447451
{
448452
auto s = result.StartPartitionSessionEvents[0];

ydb/services/persqueue_v1/actors/commit_offset_actor.cpp

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,24 +106,42 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
106106
if (partitionNode->AllParents.size() == 0 && partitionNode->DirectChildren.size() == 0) {
107107
SendCommit(topicInitInfo, commitRequest, ctx);
108108
} else {
109-
auto killReadSession = commitRequest->read_session_id().empty();
109+
auto hasReadSession = !commitRequest->read_session_id().empty();
110+
auto killReadSession = !hasReadSession;
111+
const TString& readSessionId = commitRequest->read_session_id();
112+
110113
std::vector<TDistributedCommitHelper::TCommitInfo> commits;
111114

112115
for (auto& parent: partitionNode->AllParents) {
113-
TDistributedCommitHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max<i64>(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
116+
TDistributedCommitHelper::TCommitInfo commit {
117+
.PartitionId = parent->Id,
118+
.Offset = Max<i64>(),
119+
.KillReadSession = killReadSession,
120+
.OnlyCheckCommitedToFinish = false,
121+
.ReadSessionId = readSessionId
122+
};
114123
commits.push_back(commit);
115124
}
116125

117-
for (auto& child: partitionNode->AllChildren) {
118-
TDistributedCommitHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0, .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
119-
commits.push_back(commit);
126+
if (!hasReadSession) {
127+
for (auto& child: partitionNode->AllChildren) {
128+
TDistributedCommitHelper::TCommitInfo commit {
129+
.PartitionId = child->Id,
130+
.Offset = 0,
131+
.KillReadSession = true,
132+
.OnlyCheckCommitedToFinish = false
133+
};
134+
commits.push_back(commit);
135+
}
120136
}
121137

122-
TDistributedCommitHelper::TCommitInfo commit {.PartitionId = partitionNode->Id, .Offset = commitRequest->offset(), .KillReadSession = killReadSession, .OnlyCheckCommitedToFinish = false};
123-
124-
if (!commitRequest->read_session_id().empty()) {
125-
commit.ReadSessionId = commitRequest->read_session_id();
126-
}
138+
TDistributedCommitHelper::TCommitInfo commit {
139+
.PartitionId = partitionNode->Id,
140+
.Offset = commitRequest->offset(),
141+
.KillReadSession = killReadSession,
142+
.OnlyCheckCommitedToFinish = false,
143+
.ReadSessionId = readSessionId
144+
};
127145
commits.push_back(commit);
128146

129147
Kqp = std::make_unique<TDistributedCommitHelper>(Request().GetDatabaseName().GetOrElse(TString()), ClientId, topic, commits);

0 commit comments

Comments
 (0)