From 2304a6cd452b702ecf284ab8882949244abe86ac Mon Sep 17 00:00:00 2001 From: Bulat Date: Tue, 22 Apr 2025 13:39:32 +0000 Subject: [PATCH 1/8] [C++ SDK] Fixed register of GZIP and ZSTD codecs in topic client (#17192) --- src/client/topic/codecs/codecs.cpp | 12 ++++++++++++ src/client/topic/impl/topic.cpp | 9 --------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/client/topic/codecs/codecs.cpp b/src/client/topic/codecs/codecs.cpp index 14c5eddead..5471325289 100644 --- a/src/client/topic/codecs/codecs.cpp +++ b/src/client/topic/codecs/codecs.cpp @@ -63,4 +63,16 @@ std::unique_ptr TUnsupportedCodec::CreateCoder(TBuffer&, int) con throw yexception() << "use of unsupported codec"; } +class TCommonCodecsProvider { +public: + TCommonCodecsProvider() { + TCodecMap::GetTheCodecMap().Set((uint32_t)ECodec::GZIP, std::make_unique()); + TCodecMap::GetTheCodecMap().Set((uint32_t)ECodec::ZSTD, std::make_unique()); + } +}; + +namespace { +TCommonCodecsProvider COMMON_CODECS_PROVIDER; +} + }; // namespace NYdb::NTopic diff --git a/src/client/topic/impl/topic.cpp b/src/client/topic/impl/topic.cpp index 4f49700af7..1adb2cf846 100644 --- a/src/client/topic/impl/topic.cpp +++ b/src/client/topic/impl/topic.cpp @@ -11,15 +11,6 @@ namespace NYdb::inline V3::NTopic { -class TCommonCodecsProvider { -public: - TCommonCodecsProvider() { - TCodecMap::GetTheCodecMap().Set((uint32_t)ECodec::GZIP, std::make_unique()); - TCodecMap::GetTheCodecMap().Set((uint32_t)ECodec::ZSTD, std::make_unique()); - } -}; -TCommonCodecsProvider COMMON_CODECS_PROVIDER; - TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result) : TStatus(std::move(status)) , TopicDescription_(std::move(result)) From 1009e1366e50274c221e88cc3b6588cb94002280 Mon Sep 17 00:00:00 2001 From: Semyon Danilov Date: Tue, 22 Apr 2025 13:41:24 +0000 Subject: [PATCH 2/8] Fix UUID handling in backup/restore in YDB CLI (#17198) --- include/ydb-cpp-sdk/client/value/value.h | 3 +++ src/client/value/value.cpp | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/include/ydb-cpp-sdk/client/value/value.h b/include/ydb-cpp-sdk/client/value/value.h index 3c2787090c..c4260e6015 100644 --- a/include/ydb-cpp-sdk/client/value/value.h +++ b/include/ydb-cpp-sdk/client/value/value.h @@ -539,3 +539,6 @@ class TValueBuilder : public TValueBuilderBase { }; } // namespace NYdb + +template<> +void Out(IOutputStream& o, const NYdb::TUuidValue& value); diff --git a/src/client/value/value.cpp b/src/client/value/value.cpp index 38ad345b26..5faf069151 100644 --- a/src/client/value/value.cpp +++ b/src/client/value/value.cpp @@ -3374,3 +3374,8 @@ TValue TValueBuilder::Build() { } } // namespace NYdb + +template<> +void Out(IOutputStream& o, const NYdb::TUuidValue& value) { + o << value.ToString(); +} From 70d1101cf2bf09e6df38dd100f0691d982b05f99 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 22 Apr 2025 13:42:17 +0000 Subject: [PATCH 3/8] Fixed flapping tests (#17210) --- .../ut/ut_utils/topic_sdk_test_setup.cpp | 20 +++++++++++++++++-- .../topic/ut/ut_utils/topic_sdk_test_setup.h | 3 ++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index b61ec85322..59dcceca60 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -80,10 +80,26 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c return status.GetConsumerDescription(); } -TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset) { +void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId) { TTopicClient client(MakeDriver()); - return client.CommitOffset(path, partitionId, consumerName, offset).GetValueSync(); + TWriteSessionSettings settings; + settings.Path(TEST_TOPIC); + settings.PartitionId(partitionId); + settings.DeduplicationEnabled(false); + auto session = client.CreateSimpleBlockingWriteSession(settings); + + TWriteMessage msg(TStringBuilder() << message); + UNIT_ASSERT(session->Write(std::move(msg))); + + session->Close(TDuration::Seconds(5)); +} + +TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional sessionId) { + TTopicClient client(MakeDriver()); + + TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId}; + return client.CommitOffset(path, partitionId, consumerName, offset, commitSettings).GetValueSync(); } diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 3f8550db2e..dea97097e4 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -24,7 +24,8 @@ class TTopicSdkTestSetup { TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC}); TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER}); - TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset); + void Write(const std::string& message, ui32 partitionId = 0); + TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const; From 8ecef72568699cb8e6b6d725aef663ee44a9251c Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 22 Apr 2025 13:43:08 +0000 Subject: [PATCH 4/8] Fix tablet-topic transactions with sink (#17258) --- src/api/protos/draft/fq.proto | 57 +++++++++++++++++++++++ src/client/topic/ut/topic_to_table_ut.cpp | 2 + 2 files changed, 59 insertions(+) diff --git a/src/api/protos/draft/fq.proto b/src/api/protos/draft/fq.proto index 84177a909b..1864a04d81 100644 --- a/src/api/protos/draft/fq.proto +++ b/src/api/protos/draft/fq.proto @@ -525,6 +525,61 @@ message Logging { IamAuth auth = 2; } +// TIcebergWarehouse represents settings specific to iceberg warehouse +message IcebergWarehouse { + // Iceberg data located in a S3 storage + message S3 { + // Bucket in a storage + // e.g., s3a://iceberg-bucket + optional string bucket = 1 [(Ydb.length).le = 1024]; + + // Path in a bucket + // e.g., /storage + optional string path = 2 [(Ydb.length).le = 1024]; + } + + oneof payload { + S3 s3 = 1; + } +} + +// TIcebergCatalog represents settings specific to iceberg catalog +message IcebergCatalog { + // Hadoop Iceberg Catalog which is built on top of a storage + message Hadoop { + // Directory where iceberg tables are located. In case of a S3 storage the location + // will be "S3.uri + S3.path + Hadoop.path", e.g., if "Hadoop.path" is equal "warehouse" then + // the final location will be "s3a://iceberg-bucket/storage/warehouse" + optional string directory = 1 [(Ydb.length).le = 1024]; + } + + // Hive Iceberg Catalog which is based on a Hive Metastore + message HiveMetastore { + // Location of a hive metastore + // e.g., thrift://host:9083/ + optional string uri = 1 [(Ydb.length).le = 1024]; + + // Hive metastore database which holds iceberg namespace + optional string database_name = 2 [(Ydb.length).le = 1024]; + } + + oneof payload { + Hadoop hadoop = 1; + HiveMetastore hive_metastore = 2; + } +} + +message Iceberg { + // credentials to access a warehouse + IamAuth warehouse_auth = 2; + + // warehouse config + IcebergWarehouse warehouse = 3; + + // catalog config + IcebergCatalog catalog = 4; +} + message ConnectionSetting { enum ConnectionType { CONNECTION_TYPE_UNSPECIFIED = 0; @@ -537,6 +592,7 @@ message ConnectionSetting { GREENPLUM_CLUSTER = 7; MYSQL_CLUSTER = 8; LOGGING = 9; + ICEBERG = 10; } oneof connection { @@ -549,6 +605,7 @@ message ConnectionSetting { GreenplumCluster greenplum_cluster = 7; MySQLCluster mysql_cluster = 8; Logging logging = 9; + Iceberg iceberg = 10; } } diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 17bf94337b..c7705af45f 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -2980,6 +2980,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks) Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks) { + return; // https://github.com/ydb-platform/ydb/issues/17271 CreateTopic("topic_A"); CreateColumnTable("/Root/table_A"); @@ -3002,6 +3003,7 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks) Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks) { + return; // https://github.com/ydb-platform/ydb/issues/17271 CreateTopic("topic_A"); CreateTopic("topic_B"); From 49a25e340539cabaf42adb2205d6cab7375e4916 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 22 Apr 2025 13:48:20 +0000 Subject: [PATCH 5/8] Refactoring tests of commit offset (#17299) --- .../ut/ut_utils/topic_sdk_test_setup.cpp | 77 ++++++++++++++++--- .../topic/ut/ut_utils/topic_sdk_test_setup.h | 20 +++-- 2 files changed, 81 insertions(+), 16 deletions(-) diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 59dcceca60..1110c0f038 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -22,11 +22,11 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim } } -void TTopicSdkTestSetup::CreateTopicWithAutoscale(const TString& path, const TString& consumer, size_t partitionCount, size_t maxPartitionCount) { +void TTopicSdkTestSetup::CreateTopicWithAutoscale(const std::string& path, const std::string& consumer, size_t partitionCount, size_t maxPartitionCount) { CreateTopic(path, consumer, partitionCount, maxPartitionCount); } -void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount, std::optional maxPartitionCount) +void TTopicSdkTestSetup::CreateTopic(const std::string& path, const std::string& consumer, size_t partitionCount, std::optional maxPartitionCount) { TTopicClient client(MakeDriver()); @@ -49,10 +49,10 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume auto status = client.CreateTopic(path, topics).GetValueSync(); UNIT_ASSERT(status.IsSuccess()); - Server.WaitInit(path); + Server.WaitInit(TString{path}); } -TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path) +TTopicDescription TTopicSdkTestSetup::DescribeTopic(const std::string& path) { TTopicClient client(MakeDriver()); @@ -66,7 +66,7 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path) return status.GetTopicDescription(); } -TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer) +TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const std::string& path, const std::string& consumer) { TTopicClient client(MakeDriver()); @@ -80,22 +80,79 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c return status.GetConsumerDescription(); } -void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId) { +void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, const std::optional producer, std::optional seqNo) { TTopicClient client(MakeDriver()); TWriteSessionSettings settings; settings.Path(TEST_TOPIC); settings.PartitionId(partitionId); - settings.DeduplicationEnabled(false); + settings.DeduplicationEnabled(producer.has_value()); + if (producer) { + settings.ProducerId(producer.value()) + .MessageGroupId(producer.value()); + } auto session = client.CreateSimpleBlockingWriteSession(settings); - TWriteMessage msg(TStringBuilder() << message); - UNIT_ASSERT(session->Write(std::move(msg))); + UNIT_ASSERT(session->Write(message, seqNo)); session->Close(TDuration::Seconds(5)); } -TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional sessionId) { +TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout) { + TTopicClient client(MakeDriver()); + + auto reader = client.CreateReadSession( + TReadSessionSettings() + .AutoPartitioningSupport(true) + .AppendTopics(TTopicReadSettings(topic)) + .ConsumerName(consumer)); + + TInstant deadlineTime = TInstant::Now() + timeout; + + ReadResult result; + result.Reader = reader; + + bool continueFlag = true; + while(continueFlag && deadlineTime > TInstant::Now()) { + for (auto event : reader->GetEvents(false)) { + if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + if (!handler(*x)) { + continueFlag = false; + break; + } + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + x->Confirm(); + result.StartPartitionSessionEvents.push_back(*x); + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + x->Confirm(); + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + x->Confirm(); + } else if (auto* x = std::get_if(&event)) { + Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush; + } else { + Cerr << "SESSION EVENT unhandled \n"; + } + } + + Sleep(TDuration::MilliSeconds(250)); + } + + result.Timeout = continueFlag; + + return result; +} + +TStatus TTopicSdkTestSetup::Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional sessionId) { TTopicClient client(MakeDriver()); TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId}; diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index dea97097e4..fe49f84230 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -16,16 +16,24 @@ class TTopicSdkTestSetup { public: TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true); - void CreateTopic(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, + void CreateTopic(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1, std::optional maxPartitionCount = std::nullopt); - void CreateTopicWithAutoscale(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1, + void CreateTopicWithAutoscale(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1, size_t maxPartitionCount = 100); - TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC}); - TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER}); + TTopicDescription DescribeTopic(const std::string& path = TEST_TOPIC); + TConsumerDescription DescribeConsumer(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER); - void Write(const std::string& message, ui32 partitionId = 0); - TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); + void Write(const std::string& message, ui32 partitionId = 0, const std::optional producer = std::nullopt, std::optional seqNo = std::nullopt); + + struct ReadResult { + std::shared_ptr Reader; + bool Timeout; + + std::vector StartPartitionSessionEvents; + }; + ReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout = TDuration::Seconds(5)); + TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); TString GetEndpoint() const; TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const; From 17e62f703b18c1103e4c1dd942a2d4a9daf83640 Mon Sep 17 00:00:00 2001 From: qyryq Date: Tue, 22 Apr 2025 13:50:49 +0000 Subject: [PATCH 6/8] Add NFederatedTopic::TDeferredCommit implementation (#17430) --- .../client/federated_topic/federated_topic.h | 8 +- .../impl/federated_deferred_commit.cpp | 138 ++++++++++++++++++ 2 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 src/client/federated_topic/impl/federated_deferred_commit.cpp diff --git a/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h b/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h index 45e9809b63..ab2ca5bbb6 100644 --- a/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h +++ b/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h @@ -21,6 +21,8 @@ using TAsyncDescribeTopicResult = NTopic::TAsyncDescribeTopicResult; struct TFederatedPartitionSession : public TThrRefBase, public TPrintable { using TPtr = TIntrusivePtr; + friend class TDeferredCommit; + public: TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr db, @@ -223,10 +225,10 @@ class TDeferredCommit { void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent); //! Add offsets range to set. - void Add(const TFederatedPartitionSession& partitionSession, ui64 startOffset, ui64 endOffset); + void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 startOffset, ui64 endOffset); //! Add offset to set. - void Add(const TFederatedPartitionSession& partitionSession, ui64 offset); + void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 offset); //! Commit all added offsets. void Commit(); @@ -399,7 +401,7 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! See description in TFederatedEventHandlers class. FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers); - + //! Read policy settings diff --git a/src/client/federated_topic/impl/federated_deferred_commit.cpp b/src/client/federated_topic/impl/federated_deferred_commit.cpp new file mode 100644 index 0000000000..eb0d711338 --- /dev/null +++ b/src/client/federated_topic/impl/federated_deferred_commit.cpp @@ -0,0 +1,138 @@ +#include +#include + +#include + +namespace NYdb::inline V3::NFederatedTopic { + +std::pair GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { + if (dataReceivedEvent.HasCompressedMessages()) { + const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; + } + const auto& msg = dataReceivedEvent.GetMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; +} + + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NFederatedTopic::TDeferredCommit + +class TDeferredCommit::TImpl { +public: + + void Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset); + void Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset); + + void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message); + void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent); + + void Commit(); + +private: + static void Add(const TFederatedPartitionSession::TPtr& partitionStream, TDisjointIntervalTree& offsetSet, ui64 startOffset, ui64 endOffset); + +private: + // Partition stream -> offsets set. + std::unordered_map, THash> Offsets; +}; + +TDeferredCommit::TDeferredCommit() { +} + +TDeferredCommit::TDeferredCommit(TDeferredCommit&&) = default; + +TDeferredCommit& TDeferredCommit::operator=(TDeferredCommit&&) = default; + +TDeferredCommit::~TDeferredCommit() { +} + +#define GET_IMPL() \ + if (!Impl) { \ + Impl = std::make_unique(); \ + } \ + Impl + +void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) { + GET_IMPL()->Add(partitionStream, startOffset, endOffset); +} + +void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) { + GET_IMPL()->Add(partitionStream, offset); +} + +void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + GET_IMPL()->Add(message); +} + +void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) { + GET_IMPL()->Add(dataReceivedEvent); +} + +#undef GET_IMPL + +void TDeferredCommit::Commit() { + if (Impl) { + Impl->Commit(); + } +} + +void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + Y_ASSERT(message.GetFederatedPartitionSession()); + Add(message.GetFederatedPartitionSession(), message.GetOffset()); +} + +void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, TDisjointIntervalTree& offsetSet, ui64 startOffset, ui64 endOffset) { + if (offsetSet.Intersects(startOffset, endOffset)) { + ThrowFatalError(TStringBuilder() << "Commit set already has some offsets from half-interval [" + << startOffset << "; " << endOffset + << ") for partition stream with id " << partitionStream->GetPartitionSessionId()); + } else { + offsetSet.InsertInterval(startOffset, endOffset); + } +} + +void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) { + Y_ASSERT(partitionStream); + Add(partitionStream, Offsets[partitionStream], startOffset, endOffset); +} + +void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) { + Y_ASSERT(partitionStream); + auto& offsetSet = Offsets[partitionStream]; + if (offsetSet.Has(offset)) { + ThrowFatalError(TStringBuilder() << "Commit set already has offset " << offset + << " for partition stream with id " << partitionStream->GetPartitionSessionId()); + } else { + offsetSet.Insert(offset); + } +} + +void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) { + const TFederatedPartitionSession::TPtr& partitionStream = dataReceivedEvent.GetFederatedPartitionSession(); + Y_ASSERT(partitionStream); + auto& offsetSet = Offsets[partitionStream]; + auto [startOffset, endOffset] = GetMessageOffsetRange(dataReceivedEvent, 0); + for (size_t i = 1; i < dataReceivedEvent.GetMessagesCount(); ++i) { + auto msgOffsetRange = GetMessageOffsetRange(dataReceivedEvent, i); + if (msgOffsetRange.first == endOffset) { + endOffset= msgOffsetRange.second; + } else { + Add(partitionStream, offsetSet, startOffset, endOffset); + startOffset = msgOffsetRange.first; + endOffset = msgOffsetRange.second; + } + } + Add(partitionStream, offsetSet, startOffset, endOffset); +} + +void TDeferredCommit::TImpl::Commit() { + for (auto&& [partitionStream, offsetRanges] : Offsets) { + for (auto&& [startOffset, endOffset] : offsetRanges) { + static_cast*>(partitionStream.Get()->PartitionSession.Get())->Commit(startOffset, endOffset); + } + } + Offsets.clear(); +} + +} From ac206ef3816f632e3b86156e5a4d469134e1a719 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 22 Apr 2025 13:51:56 +0000 Subject: [PATCH 7/8] Fixed errors of the distributed commit offset to the partition (#17423) --- .../ut/ut_utils/topic_sdk_test_setup.cpp | 20 +++++++++++++------ .../topic/ut/ut_utils/topic_sdk_test_setup.h | 4 +++- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 1110c0f038..0055dd8543 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -98,14 +98,22 @@ void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, con session->Close(TDuration::Seconds(5)); } -TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout) { +TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition, const TDuration timeout) { TTopicClient client(MakeDriver()); - auto reader = client.CreateReadSession( - TReadSessionSettings() - .AutoPartitioningSupport(true) - .AppendTopics(TTopicReadSettings(topic)) - .ConsumerName(consumer)); + auto topicSettings = TTopicReadSettings(topic); + if (partition) { + topicSettings.AppendPartitionIds(partition.value()); + } + + auto settins = TReadSessionSettings() + .AutoPartitioningSupport(true) + .AppendTopics(topicSettings) + .ConsumerName(consumer); + + auto reader = client.CreateReadSession(settins); TInstant deadlineTime = TInstant::Now() + timeout; diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index fe49f84230..b63c9561af 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -32,7 +32,9 @@ class TTopicSdkTestSetup { std::vector StartPartitionSessionEvents; }; - ReadResult Read(const std::string& topic, const std::string& consumer, std::function handler, const TDuration timeout = TDuration::Seconds(5)); + ReadResult Read(const std::string& topic, const std::string& consumer, + std::function handler, + std::optional partition = std::nullopt, const TDuration timeout = TDuration::Seconds(5)); TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional sessionId = std::nullopt); TString GetEndpoint() const; From fcdfa437baa2e30ab2aac36b9476e20fda6f786c Mon Sep 17 00:00:00 2001 From: Zarina Tlupova <96023685+zarinatlupova@users.noreply.github.com> Date: Tue, 13 May 2025 16:53:06 +0000 Subject: [PATCH 8/8] [C++ SDK] Add traceparent to TRequestSettings (#17561) --- include/ydb-cpp-sdk/client/types/request_settings.h | 3 +++ src/client/impl/ydb_internal/rpc_request_settings/settings.h | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/include/ydb-cpp-sdk/client/types/request_settings.h b/include/ydb-cpp-sdk/client/types/request_settings.h index 2f2d4dca8a..8d481726e3 100644 --- a/include/ydb-cpp-sdk/client/types/request_settings.h +++ b/include/ydb-cpp-sdk/client/types/request_settings.h @@ -8,6 +8,7 @@ #include #include +#include namespace NYdb::inline V3 { @@ -20,6 +21,7 @@ struct TRequestSettings { FLUENT_SETTING(std::string, RequestType); FLUENT_SETTING(THeader, Header); FLUENT_SETTING(TDuration, ClientTimeout); + FLUENT_SETTING(std::string, TraceParent); TRequestSettings() = default; @@ -29,6 +31,7 @@ struct TRequestSettings { , RequestType_(other.RequestType_) , Header_(other.Header_) , ClientTimeout_(other.ClientTimeout_) + , TraceParent_(other.TraceParent_) {} }; diff --git a/src/client/impl/ydb_internal/rpc_request_settings/settings.h b/src/client/impl/ydb_internal/rpc_request_settings/settings.h index 515d20f642..6f75764d18 100644 --- a/src/client/impl/ydb_internal/rpc_request_settings/settings.h +++ b/src/client/impl/ydb_internal/rpc_request_settings/settings.h @@ -24,6 +24,11 @@ struct TRpcRequestSettings { rpcSettings.TraceId = settings.TraceId_; rpcSettings.RequestType = settings.RequestType_; rpcSettings.Header = settings.Header_; + + if (!settings.TraceParent_.empty()) { + rpcSettings.Header.emplace_back("traceparent", settings.TraceParent_); + } + rpcSettings.PreferredEndpoint = preferredEndpoint; rpcSettings.EndpointPolicy = endpointPolicy; rpcSettings.UseAuth = true;