diff --git a/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h b/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h index ada271cf8f..5e370bcff0 100644 --- a/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h +++ b/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h @@ -176,6 +176,8 @@ struct TCreateResourceSettings : public TOperationRequestSettings , public THierarchicalDrrSettings { + using TSelf = TCreateResourceSettings; + TCreateResourceSettings() = default; TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&); @@ -187,6 +189,8 @@ struct TAlterResourceSettings : public TOperationRequestSettings , public THierarchicalDrrSettings { + using TSelf = TAlterResourceSettings; + FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig); }; diff --git a/src/client/topic/CMakeLists.txt b/src/client/topic/CMakeLists.txt index e18f83ff29..51c651d2a2 100644 --- a/src/client/topic/CMakeLists.txt +++ b/src/client/topic/CMakeLists.txt @@ -20,7 +20,6 @@ target_link_libraries(client-ydb_topic PUBLIC ) target_sources(client-ydb_topic PRIVATE - proto_accessor.cpp out.cpp ) diff --git a/src/client/topic/impl/CMakeLists.txt b/src/client/topic/impl/CMakeLists.txt index 415b39d780..63df082c5d 100644 --- a/src/client/topic/impl/CMakeLists.txt +++ b/src/client/topic/impl/CMakeLists.txt @@ -29,6 +29,7 @@ target_sources(client-ydb_topic-impl deferred_commit.cpp event_handlers.cpp offsets_collector.cpp + proto_accessor.cpp read_session_event.cpp read_session.cpp topic_impl.cpp diff --git a/src/client/topic/proto_accessor.cpp b/src/client/topic/impl/proto_accessor.cpp similarity index 100% rename from src/client/topic/proto_accessor.cpp rename to src/client/topic/impl/proto_accessor.cpp diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index bf2b2e7709..82db5e2327 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -73,6 +73,9 @@ void TPartitionStreamImpl::RequestStatus() { template void TPartitionStreamImpl::ConfirmCreate(std::optional readOffset, std::optional commitOffset) { if (auto sessionShared = CbContext->LockShared()) { + if (commitOffset.has_value()) { + SetFirstNotReadOffset(commitOffset.value()); + } sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset); } } diff --git a/src/client/topic/ut/basic_usage_ut.cpp b/src/client/topic/ut/basic_usage_ut.cpp index 919438c0c9..cf8792f8d6 100644 --- a/src/client/topic/ut/basic_usage_ut.cpp +++ b/src/client/topic/ut/basic_usage_ut.cpp @@ -3,7 +3,7 @@ #include #include - + #include #include @@ -661,6 +661,64 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Sleep(TDuration::Seconds(5)); } + Y_UNIT_TEST(ConfirmPartitionSessionWithCommitOffset) { + // TStartPartitionSessionEvent::Confirm(readOffset, commitOffset) should work, + // if commitOffset passed to Confirm is greater than the offset committed previously by the consumer. + // https://st.yandex-team.ru/KIKIMR-23015 + + auto setup = TTopicSdkTestSetup(TEST_CASE_NAME); + + { + // Write 2 messages: + auto settings = NTopic::TWriteSessionSettings() + .Path(setup.GetTopicPath()) + .MessageGroupId(TEST_MESSAGE_GROUP_ID) + .ProducerId(TEST_MESSAGE_GROUP_ID); + auto client = setup.MakeClient(); + auto writer = client.CreateSimpleBlockingWriteSession(settings); + writer->Write("message"); + writer->Write("message"); + writer->Close(); + } + + { + // Read messages: + auto settings = NTopic::TReadSessionSettings() + .ConsumerName(TEST_CONSUMER) + .AppendTopics(std::string(setup.GetTopicPath())); + + auto client = setup.MakeClient(); + auto reader = client.CreateReadSession(settings); + + { + // Start partition session and request to read from offset 1 and commit offset 1: + auto event = reader->GetEvent(true); + UNIT_ASSERT(event.has_value()); + UNIT_ASSERT(std::holds_alternative(*event)); + auto& startPartitionSession = std::get(*event); + startPartitionSession.Confirm(/*readOffset=*/ 1, /*commitOffset=*/ 1); + } + + { + // Receive a message with offset 1 and commit it: + auto event = reader->GetEvent(true); + UNIT_ASSERT(event.has_value()); + UNIT_ASSERT(std::holds_alternative(*event)); + auto& dataReceived = std::get(*event); + + // Here we should commit range [1, 2), not [0, 2): + dataReceived.Commit(); + } + + { + // And then get a TCommitOffsetAcknowledgementEvent: + auto event = reader->GetEvent(true); + UNIT_ASSERT(event.has_value()); + UNIT_ASSERT(std::holds_alternative(*event)); + } + } + } + Y_UNIT_TEST(ConflictingWrites) { TTopicSdkTestSetup setup(TEST_CASE_NAME);