Skip to content

Commit 230ac03

Browse files
committed
fix
1 parent 7586cde commit 230ac03

File tree

4 files changed

+45
-18
lines changed

4 files changed

+45
-18
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2361,6 +2361,9 @@ std::pair<TString, bool> TPartition::ValidatePartitionOperation(const NKikimrPQ:
23612361
TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
23622362

23632363
if (!operation.GetReadSessionId().empty() && operation.GetReadSessionId() != userInfo.Session) {
2364+
if (!IsActive() && operation.GetCommitOffsetsEnd() >= EndOffset && userInfo.Offset == i64(EndOffset)) {
2365+
return {"", false};
2366+
}
23642367
PQ_LOG_D("Partition " << Partition <<
23652368
" Consumer '" << consumer << "'" <<
23662369
" Bad request (session already dead) " <<

ydb/core/persqueue/ut/ut_with_sdk/commitoffset_ut.cpp

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -245,18 +245,11 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
245245
PrepareAutopartitionedTopic(setup);
246246
TTopicClient client(setup.MakeDriver());
247247

248-
auto createReadSession = [&](size_t partitionId) {
249-
return client.CreateReadSession(
250-
TReadSessionSettings()
251-
.AutoPartitioningSupport(true)
252-
.AppendTopics(TTopicReadSettings(TEST_TOPIC).AppendPartitionIds(partitionId))
253-
.ConsumerName(TEST_CONSUMER));
254-
};
255-
256-
auto session0 = createReadSession(0);
257-
auto session1 = createReadSession(1);
248+
auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0);
249+
auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1);
258250

259-
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1, session1->GetSessionId());
251+
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1,
252+
r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId());
260253
UNIT_ASSERT(!result.IsSuccess());
261254

262255
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 0));
@@ -266,6 +259,27 @@ Y_UNIT_TEST_SUITE(CommitOffset) {
266259
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
267260
}
268261

262+
Y_UNIT_TEST(Commit_WithSession_ParentNotFinished_OtherSession_ParentCommittedToEnd) {
263+
TTopicSdkTestSetup setup = CreateSetup();
264+
PrepareAutopartitionedTopic(setup);
265+
TTopicClient client(setup.MakeDriver());
266+
267+
setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 3);
268+
269+
auto r0 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 0);
270+
auto r1 = setup.Read(TEST_TOPIC, TEST_CONSUMER, [](auto&) { return false; }, 1);
271+
272+
auto result = setup.Commit(TEST_TOPIC, TEST_CONSUMER, 1, 1,
273+
r1.StartPartitionSessionEvents.back().GetPartitionSession()->GetReadSessionId());
274+
UNIT_ASSERT(result.IsSuccess());
275+
276+
UNIT_ASSERT_VALUES_EQUAL(3, GetCommittedOffset(setup, 0));
277+
UNIT_ASSERT_VALUES_EQUAL(1, GetCommittedOffset(setup, 1));
278+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 2));
279+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 3));
280+
UNIT_ASSERT_VALUES_EQUAL(0, GetCommittedOffset(setup, 4));
281+
}
282+
269283
Y_UNIT_TEST(Commit_WithSession_ToPastParentPartition) {
270284
TTopicSdkTestSetup setup = CreateSetup();
271285
PrepareAutopartitionedTopic(setup);

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con
9898
session->Close(TDuration::Seconds(5));
9999
}
100100

101-
TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout) {
101+
TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer,
102+
std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
103+
std::optional<size_t> partition, const TDuration timeout) {
102104
TTopicClient client(MakeDriver());
103105

104-
auto reader = client.CreateReadSession(
105-
TReadSessionSettings()
106-
.AutoPartitioningSupport(true)
107-
.AppendTopics(TTopicReadSettings(topic))
108-
.ConsumerName(consumer));
106+
auto topicSettings = TTopicReadSettings(topic);
107+
if (partition) {
108+
topicSettings.AppendPartitionIds(partition.value());
109+
}
110+
111+
auto settins = TReadSessionSettings()
112+
.AutoPartitioningSupport(true)
113+
.AppendTopics(topicSettings)
114+
.ConsumerName(consumer);
115+
116+
auto reader = client.CreateReadSession(settins);
109117

110118
TInstant deadlineTime = TInstant::Now() + timeout;
111119

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ class TTopicSdkTestSetup {
3232

3333
std::vector<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent> StartPartitionSessionEvents;
3434
};
35-
ReadResult Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout = TDuration::Seconds(5));
35+
ReadResult Read(const std::string& topic, const std::string& consumer,
36+
std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler,
37+
std::optional<size_t> partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5));
3638
TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);
3739

3840
TString GetEndpoint() const;

0 commit comments

Comments
 (0)