Skip to content

YDB C++ SDK Import 9 #504

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions include/ydb-cpp-sdk/client/federated_topic/federated_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ using TAsyncDescribeTopicResult = NTopic::TAsyncDescribeTopicResult;
struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> {
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;

friend class TDeferredCommit;

public:
TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession,
std::shared_ptr<TDbInfo> db,
Expand Down Expand Up @@ -223,10 +225,10 @@ class TDeferredCommit {
void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent);

//! Add offsets range to set.
void Add(const TFederatedPartitionSession& partitionSession, ui64 startOffset, ui64 endOffset);
void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 startOffset, ui64 endOffset);

//! Add offset to set.
void Add(const TFederatedPartitionSession& partitionSession, ui64 offset);
void Add(const TFederatedPartitionSession::TPtr& partitionSession, ui64 offset);

//! Commit all added offsets.
void Commit();
Expand Down Expand Up @@ -399,7 +401,7 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
//! See description in TFederatedEventHandlers class.
FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers);



//! Read policy settings

Expand Down
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/types/request_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <vector>
#include <utility>
#include <string>

namespace NYdb::inline V3 {

Expand All @@ -20,6 +21,7 @@ struct TRequestSettings {
FLUENT_SETTING(std::string, RequestType);
FLUENT_SETTING(THeader, Header);
FLUENT_SETTING(TDuration, ClientTimeout);
FLUENT_SETTING(std::string, TraceParent);

TRequestSettings() = default;

Expand All @@ -29,6 +31,7 @@ struct TRequestSettings {
, RequestType_(other.RequestType_)
, Header_(other.Header_)
, ClientTimeout_(other.ClientTimeout_)
, TraceParent_(other.TraceParent_)
{}
};

Expand Down
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/value/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,6 @@ class TValueBuilder : public TValueBuilderBase<TValueBuilder> {
};

} // namespace NYdb

// Declaration of the explicit specialization of Out<> for NYdb::TUuidValue:
// writes `value` to the output stream `o`. Only the declaration is given here;
// the definition is not part of this header excerpt.
template<>
void Out<NYdb::TUuidValue>(IOutputStream& o, const NYdb::TUuidValue& value);
57 changes: 57 additions & 0 deletions src/api/protos/draft/fq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,61 @@ message Logging {
IamAuth auth = 2;
}

// IcebergWarehouse represents settings specific to an Iceberg warehouse.
// Exactly one storage kind is selected through the `payload` oneof;
// S3 is the only kind declared here.
message IcebergWarehouse {
// Iceberg data located in an S3 storage
message S3 {
// Bucket in the storage,
// e.g., s3a://iceberg-bucket
optional string bucket = 1 [(Ydb.length).le = 1024];

// Path inside the bucket,
// e.g., /storage
optional string path = 2 [(Ydb.length).le = 1024];
}

oneof payload {
S3 s3 = 1;
}
}

// IcebergCatalog represents settings specific to an Iceberg catalog.
// Exactly one catalog kind (Hadoop or HiveMetastore) is selected through the `payload` oneof.
message IcebergCatalog {
// Hadoop Iceberg Catalog which is built on top of a storage
message Hadoop {
// Directory where iceberg tables are located. In case of an S3 storage the location
// is composed from "S3.bucket", "S3.path" and "Hadoop.directory", e.g., if "Hadoop.directory"
// is equal to "warehouse" then the final location will be "s3a://iceberg-bucket/storage/warehouse"
optional string directory = 1 [(Ydb.length).le = 1024];
}

// Hive Iceberg Catalog which is based on a Hive Metastore
message HiveMetastore {
// Location of the hive metastore,
// e.g., thrift://host:9083/
optional string uri = 1 [(Ydb.length).le = 1024];

// Hive metastore database which holds the iceberg namespace
optional string database_name = 2 [(Ydb.length).le = 1024];
}

oneof payload {
Hadoop hadoop = 1;
HiveMetastore hive_metastore = 2;
}
}

// Iceberg groups the settings of an Iceberg connection: the credentials used to
// reach the warehouse, the warehouse itself and the catalog.
// NOTE(review): field number 1 is not used by this message - confirm whether it
// is intentionally left free or should be declared `reserved`.
message Iceberg {
// credentials to access a warehouse
IamAuth warehouse_auth = 2;

// warehouse config
IcebergWarehouse warehouse = 3;

// catalog config
IcebergCatalog catalog = 4;
}

message ConnectionSetting {
enum ConnectionType {
CONNECTION_TYPE_UNSPECIFIED = 0;
Expand All @@ -537,6 +592,7 @@ message ConnectionSetting {
GREENPLUM_CLUSTER = 7;
MYSQL_CLUSTER = 8;
LOGGING = 9;
ICEBERG = 10;
}

oneof connection {
Expand All @@ -549,6 +605,7 @@ message ConnectionSetting {
GreenplumCluster greenplum_cluster = 7;
MySQLCluster mysql_cluster = 8;
Logging logging = 9;
Iceberg iceberg = 10;
}
}

Expand Down
138 changes: 138 additions & 0 deletions src/client/federated_topic/impl/federated_deferred_commit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#include <ydb-cpp-sdk/client/federated_topic/federated_topic.h>
#include <src/client/topic/impl/read_session_impl.ipp>

#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>

namespace NYdb::inline V3::NFederatedTopic {

//! Returns the half-interval [offset, offset + 1) covered by the message at position
//! `index` of the event. The message is taken from the compressed messages when the
//! event reports having them, and from the regular messages otherwise.
std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) {
    const auto singleOffsetRange = [](const auto& message) {
        const ui64 first = message.GetOffset();
        return std::pair<ui64, ui64>{first, first + 1};
    };

    return dataReceivedEvent.HasCompressedMessages()
        ? singleOffsetRange(dataReceivedEvent.GetCompressedMessages()[index])
        : singleOffsetRange(dataReceivedEvent.GetMessages()[index]);
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NFederatedTopic::TDeferredCommit

//! Implementation object behind TDeferredCommit: keeps, per federated partition
//! session, the set of offsets that were added and passes them on in Commit().
class TDeferredCommit::TImpl {
    using TSessionPtr = TFederatedPartitionSession::TPtr;
    using TOffsetSet = TDisjointIntervalTree<ui64>;

public:
    //! Add the offset of a single message.
    void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message);

    //! Add the offsets of all messages of an event.
    void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent);

    //! Add the half-interval [startOffset, endOffset) for the given session.
    void Add(const TSessionPtr& partitionStream, ui64 startOffset, ui64 endOffset);

    //! Add a single offset for the given session.
    void Add(const TSessionPtr& partitionStream, ui64 offset);

    //! Commit everything that was added and forget it.
    void Commit();

private:
    //! Inserts [startOffset, endOffset) into `offsetSet`; reports a fatal error if the
    //! set already intersects that half-interval.
    static void Add(const TSessionPtr& partitionStream, TOffsetSet& offsetSet, ui64 startOffset, ui64 endOffset);

    // Partition stream -> offsets set.
    std::unordered_map<TSessionPtr, TOffsetSet, THash<TSessionPtr>> Offsets;
};

// Special members of TDeferredCommit are defined out of line in this file, after
// the definition of TImpl (presumably because the Impl member needs TImpl to be a
// complete type - confirm against the header).

TDeferredCommit::TDeferredCommit() = default;

TDeferredCommit::~TDeferredCommit() = default;

TDeferredCommit::TDeferredCommit(TDeferredCommit&&) = default;

TDeferredCommit& TDeferredCommit::operator=(TDeferredCommit&&) = default;

// The implementation object is created lazily: each Add() overload makes sure it
// exists before forwarding the call to it.

//! Add the half-interval [startOffset, endOffset) for the given partition session.
void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) {
    if (!Impl) {
        Impl = std::make_unique<TImpl>();
    }
    Impl->Add(partitionStream, startOffset, endOffset);
}

//! Add a single offset for the given partition session.
void TDeferredCommit::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) {
    if (!Impl) {
        Impl = std::make_unique<TImpl>();
    }
    Impl->Add(partitionStream, offset);
}

//! Add the offset of a single message.
void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
    if (!Impl) {
        Impl = std::make_unique<TImpl>();
    }
    Impl->Add(message);
}

//! Add the offsets of all messages of an event.
void TDeferredCommit::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) {
    if (!Impl) {
        Impl = std::make_unique<TImpl>();
    }
    Impl->Add(dataReceivedEvent);
}

//! Commits all added offsets. Does nothing when no implementation object exists,
//! i.e. when nothing has been added through Add().
void TDeferredCommit::Commit() {
    if (!Impl) {
        return;
    }
    Impl->Commit();
}

//! Adds the offset of `message` to the set kept for the message's federated
//! partition session. The session is asserted to be non-null.
void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
    const auto& session = message.GetFederatedPartitionSession();
    Y_ASSERT(session);
    const auto messageOffset = message.GetOffset();
    Add(session, messageOffset);
}

//! Inserts the half-interval [startOffset, endOffset) into `offsetSet`.
//! If the set already intersects that half-interval, nothing is inserted and a fatal
//! error mentioning the partition session id is reported instead.
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, TDisjointIntervalTree<ui64>& offsetSet, ui64 startOffset, ui64 endOffset) {
    const bool alreadyPresent = offsetSet.Intersects(startOffset, endOffset);
    if (!alreadyPresent) {
        offsetSet.InsertInterval(startOffset, endOffset);
        return;
    }

    ThrowFatalError(TStringBuilder() << "Commit set already has some offsets from half-interval ["
                                     << startOffset << "; " << endOffset
                                     << ") for partition stream with id " << partitionStream->GetPartitionSessionId());
}

//! Adds the half-interval [startOffset, endOffset) to the offset set of
//! `partitionStream`; the set is created on first use. The session is asserted to be non-null.
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 startOffset, ui64 endOffset) {
    Y_ASSERT(partitionStream);
    auto& sessionOffsets = Offsets[partitionStream];
    Add(partitionStream, sessionOffsets, startOffset, endOffset);
}

//! Adds a single offset to the offset set of `partitionStream`; the set is created on
//! first use. If the offset is already in the set, a fatal error is reported instead.
void TDeferredCommit::TImpl::Add(const TFederatedPartitionSession::TPtr& partitionStream, ui64 offset) {
    Y_ASSERT(partitionStream);

    auto& sessionOffsets = Offsets[partitionStream];
    if (!sessionOffsets.Has(offset)) {
        sessionOffsets.Insert(offset);
        return;
    }

    ThrowFatalError(TStringBuilder() << "Commit set already has offset " << offset
                                     << " for partition stream with id " << partitionStream->GetPartitionSessionId());
}

//! Adds the offsets of every message of `dataReceivedEvent` to the offset set of the
//! event's federated partition session.
//!
//! Messages whose offset ranges are adjacent (the next range starts where the current
//! one ends) are merged, so each contiguous run is inserted as a single half-interval.
//! An event without messages adds nothing.
void TDeferredCommit::TImpl::Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent) {
    const TFederatedPartitionSession::TPtr& partitionStream = dataReceivedEvent.GetFederatedPartitionSession();
    Y_ASSERT(partitionStream);

    // Guard: message 0 is read unconditionally below, which would be an out-of-range
    // access for an event that carries no messages.
    const auto messagesCount = dataReceivedEvent.GetMessagesCount();
    if (messagesCount == 0) {
        return;
    }

    auto& offsetSet = Offsets[partitionStream];
    auto [startOffset, endOffset] = GetMessageOffsetRange(dataReceivedEvent, 0);
    for (size_t i = 1; i < messagesCount; ++i) {
        const auto msgOffsetRange = GetMessageOffsetRange(dataReceivedEvent, i);
        if (msgOffsetRange.first == endOffset) {
            // Adjacent to the current run: extend it.
            endOffset = msgOffsetRange.second;
        } else {
            // Gap: flush the finished run and start a new one.
            Add(partitionStream, offsetSet, startOffset, endOffset);
            startOffset = msgOffsetRange.first;
            endOffset = msgOffsetRange.second;
        }
    }
    // Flush the last run.
    Add(partitionStream, offsetSet, startOffset, endOffset);
}

//! Calls Commit(start, end) on the underlying partition session for every stored
//! offset range of every federated partition session, then clears the stored offsets.
void TDeferredCommit::TImpl::Commit() {
    for (auto& [session, ranges] : Offsets) {
        for (const auto& [from, to] : ranges) {
            auto* rawSession = session.Get()->PartitionSession.Get();
            static_cast<NTopic::TPartitionStreamImpl<false>*>(rawSession)->Commit(from, to);
        }
    }
    Offsets.clear();
}

}
5 changes: 5 additions & 0 deletions src/client/impl/ydb_internal/rpc_request_settings/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ struct TRpcRequestSettings {
rpcSettings.TraceId = settings.TraceId_;
rpcSettings.RequestType = settings.RequestType_;
rpcSettings.Header = settings.Header_;

if (!settings.TraceParent_.empty()) {
rpcSettings.Header.emplace_back("traceparent", settings.TraceParent_);
}

rpcSettings.PreferredEndpoint = preferredEndpoint;
rpcSettings.EndpointPolicy = endpointPolicy;
rpcSettings.UseAuth = true;
Expand Down
12 changes: 12 additions & 0 deletions src/client/topic/codecs/codecs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,16 @@ std::unique_ptr<IOutputStream> TUnsupportedCodec::CreateCoder(TBuffer&, int) con
throw yexception() << "use of unsupported codec";
}

//! Helper whose constructor registers the GZIP and ZSTD codecs in the codec map
//! returned by TCodecMap::GetTheCodecMap().
class TCommonCodecsProvider {
public:
    TCommonCodecsProvider() {
        // Named casts instead of C-style casts: the enum-to-integer conversion is explicit and greppable.
        TCodecMap::GetTheCodecMap().Set(static_cast<uint32_t>(ECodec::GZIP), std::make_unique<TGzipCodec>());
        TCodecMap::GetTheCodecMap().Set(static_cast<uint32_t>(ECodec::ZSTD), std::make_unique<TZstdCodec>());
    }
};

namespace {
// File-local instance: initializing this namespace-scope object runs the
// TCommonCodecsProvider constructor, which registers the GZIP and ZSTD codecs.
TCommonCodecsProvider COMMON_CODECS_PROVIDER;
}

}; // namespace NYdb::NTopic
9 changes: 0 additions & 9 deletions src/client/topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,6 @@

namespace NYdb::inline V3::NTopic {

//! Helper whose constructor registers the GZIP and ZSTD codecs in the codec map
//! returned by TCodecMap::GetTheCodecMap().
class TCommonCodecsProvider {
public:
    TCommonCodecsProvider() {
        // Named casts instead of C-style casts: the enum-to-integer conversion is explicit and greppable.
        TCodecMap::GetTheCodecMap().Set(static_cast<uint32_t>(ECodec::GZIP), std::make_unique<TGzipCodec>());
        TCodecMap::GetTheCodecMap().Set(static_cast<uint32_t>(ECodec::ZSTD), std::make_unique<TZstdCodec>());
    }
};
// Initializing this namespace-scope object runs the constructor above and thereby registers the codecs.
TCommonCodecsProvider COMMON_CODECS_PROVIDER;

TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result)
: TStatus(std::move(status))
, TopicDescription_(std::move(result))
Expand Down
2 changes: 2 additions & 0 deletions src/client/topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2980,6 +2980,7 @@ Y_UNIT_TEST_F(Sinks_Oltp_WriteToTopicAndTable_5, TFixtureSinks)

Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks)
{
return; // https://github.com/ydb-platform/ydb/issues/17271
CreateTopic("topic_A");
CreateColumnTable("/Root/table_A");

Expand All @@ -3002,6 +3003,7 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_1, TFixtureSinks)

Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_2, TFixtureSinks)
{
return; // https://github.com/ydb-platform/ydb/issues/17271
CreateTopic("topic_A");
CreateTopic("topic_B");

Expand Down
Loading
Loading