Skip to content

Commit ae4c39b

Browse files
committed
fix
1 parent cdd697c commit ae4c39b

File tree

2 files changed

+37
-18
lines changed

2 files changed

+37
-18
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2360,16 +2360,6 @@ std::pair<bool, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ::TP
23602360

23612361
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
23622362

2363-
PQ_LOG_D("CommitOperation Partition " << Partition <<
2364-
" Consumer '" << consumer << "'" <<
2365-
" RequestSessionId '" << operation.GetReadSessionId() <<
2366-
"' CurrentSessionId '" << userInfo.Session <<
2367-
"' OffsetBegin " << operation.GetCommitOffsetsBegin() <<
2368-
"' OffsetEnd " << operation.GetCommitOffsetsEnd() <<
2369-
" OnlyCheckCommitedToFinish " << operation.GetOnlyCheckCommitedToFinish() <<
2370-
" KillReadSession " << operation.GetKillReadSession()
2371-
);
2372-
23732363
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
23742364
PQ_LOG_D("Partition " << Partition <<
23752365
" Consumer '" << consumer << "'" <<
@@ -2611,9 +2601,6 @@ void TPartition::CommitTransaction(TSimpleSharedPtr<TTransaction>& t)
26112601
Y_ABORT_UNLESS(t->Predicate.Defined() && *t->Predicate);
26122602

26132603
for (auto& operation : t->Tx->Operations) {
2614-
2615-
Cerr << ">>>>> CommitTransaction " << Endl << Flush;
2616-
26172604
if (operation.GetOnlyCheckCommitedToFinish()) {
26182605
continue;
26192606
}
@@ -2942,7 +2929,7 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29422929
return EProcessResult::ContinueDrop;
29432930
}
29442931

2945-
auto [r, _] = ValidatePartitionOperation(operation);
2932+
auto [r, real] = ValidatePartitionOperation(operation);
29462933
if (!r) {
29472934
ScheduleReplyPropose(tx,
29482935
NKikimrPQ::TEvProposeTransactionResult::BAD_REQUEST,
@@ -2951,7 +2938,9 @@ TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TE
29512938
return EProcessResult::ContinueDrop;
29522939
}
29532940

2954-
consumers.insert(user);
2941+
if (real) {
2942+
consumers.insert(user);
2943+
}
29552944
}
29562945
SetOffsetAffectedConsumers.insert(consumers.begin(), consumers.end());
29572946
WriteKeysSizeEstimate += consumers.size();
@@ -2975,6 +2964,10 @@ void TPartition::ExecImmediateTx(TTransaction& t)
29752964
return;
29762965
}
29772966
for (const auto& operation : record.GetData().GetOperations()) {
2967+
if (operation.GetOnlyCheckCommitedToFinish()) {
2968+
continue;
2969+
}
2970+
29782971
if (!operation.HasCommitOffsetsBegin() || !operation.HasCommitOffsetsEnd() || !operation.HasConsumer()) {
29792972
continue; //Write operation - handled separately via WriteInfo
29802973
}

ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
207207
}
208208
}
209209

210-
Y_UNIT_TEST(Commit_WithSession_ParentNotFinished) {
210+
Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_SameSession) {
211211
TTopicSdkTestSetup setup = CreateSetup();
212212
PrepareAutopartitionedTopic(setup);
213213

@@ -224,8 +224,8 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
224224
}
225225

226226
{
227-
auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto&) {
228-
return false;
227+
auto r = setup.Read(TEST_TOPIC, TEST_CONSUMER, [&](auto& x) {
228+
return x.GetMessages().back().GetData() != "message-3-3";
229229
});
230230

231231
// Commit parent to middle
@@ -240,6 +240,32 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
240240
}
241241
}
242242

243+
Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession) {
244+
TTopicSdkTestSetup setup = CreateSetup();
245+
PrepareAutopartitionedTopic(setup);
246+
TTopicClient client(setup.MakeDriver());
247+
248+
auto createReadSession = [&](size_t partitionId) {
249+
return client.CreateReadSession(
250+
TReadSessionSettings()
251+
.AutoPartitioningSupport(true)
252+
.AppendTopics(TTopicReadSettings(TEST_TOPIC).AppendPartitionIds(partitionId))
253+
.ConsumerName(TEST_CONSUMER));
254+
};
255+
256+
auto session0 = createReadSession(0);
257+
auto session1 = createReadSession(1);
258+
259+
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, session1->GetSessionId());
260+
UNIT_ASSERT(!result.IsSuccess());
261+
262+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
263+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 1));
264+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
265+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
266+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
267+
}
268+
243269
Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) {
244270
TTopicSdkTestSetup setup = CreateSetup();
245271
PrepareAutopartitionedTopic(setup);

0 commit comments

Comments
 (0)