From 627e7a551b1489988466a8dd701f01f54431fb76 Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Thu, 13 Feb 2025 15:19:19 +0000 Subject: [PATCH] external data source / tables description: convert private protobuf to public --- .../rpc_describe_external_data_source.cpp | 81 ++++++++++++++++++- .../rpc_describe_external_table.cpp | 78 +++++++++++++++++- ydb/core/ydb_convert/table_description.cpp | 6 +- ydb/core/ydb_convert/table_description.h | 3 +- 4 files changed, 164 insertions(+), 4 deletions(-) diff --git a/ydb/core/grpc_services/rpc_describe_external_data_source.cpp b/ydb/core/grpc_services/rpc_describe_external_data_source.cpp index caa5814f49cd..c68be2fd9d58 100644 --- a/ydb/core/grpc_services/rpc_describe_external_data_source.cpp +++ b/ydb/core/grpc_services/rpc_describe_external_data_source.cpp @@ -2,13 +2,92 @@ #include "service_table.h" #include +#include #include namespace NKikimr::NGRpcService { using namespace NActors; +using namespace NKikimrSchemeOp; using namespace NYql; +namespace { + +using TProperties = google::protobuf::Map; + +void Convert(const TServiceAccountAuth& in, TProperties& out) { + out["SERVICE_ACCOUNT_ID"] = in.GetId(); + out["SERVICE_ACCOUNT_SECRET_NAME"] = in.GetSecretName(); +} + +void Convert(const TBasic& in, TProperties& out) { + out["LOGIN"] = in.GetLogin(); + out["PASSWORD_SECRET_NAME"] = in.GetPasswordSecretName(); +} + +void Convert(const TMdbBasic& in, TProperties& out) { + out["SERVICE_ACCOUNT_ID"] = in.GetServiceAccountId(); + out["SERVICE_ACCOUNT_SECRET_NAME"] = in.GetServiceAccountSecretName(); + out["LOGIN"] = in.GetLogin(); + out["PASSWORD_SECRET_NAME"] = in.GetPasswordSecretName(); +} + +void Convert(const TAws& in, TProperties& out) { + out["AWS_ACCESS_KEY_ID_SECRET_NAME"] = in.GetAwsAccessKeyIdSecretName(); + out["AWS_SECRET_ACCESS_KEY_SECRET_NAME"] = in.GetAwsSecretAccessKeySecretName(); + out["AWS_REGION"] = in.GetAwsRegion(); +} + +void Convert(const TToken& in, TProperties& out) { + out["TOKEN_SECRET_NAME"] = in.GetTokenSecretName(); +} + +void Convert(const TAuth& in, TProperties& out) { + auto& authMethod = out["AUTH_METHOD"]; + + switch (in.GetIdentityCase()) { + case TAuth::kNone: + authMethod = "NONE"; + return; + case TAuth::kServiceAccount: + authMethod = "SERVICE_ACCOUNT"; + Convert(in.GetServiceAccount(), out); + return; + case TAuth::kBasic: + authMethod = "BASIC"; + Convert(in.GetBasic(), out); + return; + case TAuth::kMdbBasic: + authMethod = "MDB_BASIC"; + Convert(in.GetMdbBasic(), out); + return; + case TAuth::kAws: + authMethod = "AWS"; + Convert(in.GetAws(), out); + return; + case TAuth::kToken: + authMethod = "TOKEN"; + Convert(in.GetToken(), out); + return; + case TAuth::IDENTITY_NOT_SET: + return; + } +} + +Ydb::Table::DescribeExternalDataSourceResult Convert(const TDirEntry& inSelf, const TExternalDataSourceDescription& inDesc) { + Ydb::Table::DescribeExternalDataSourceResult out; + ConvertDirectoryEntry(inSelf, out.mutable_self(), true); + + out.set_source_type(inDesc.GetSourceType()); + out.set_location(inDesc.GetLocation()); + auto& properties = *out.mutable_properties(); + properties = inDesc.GetProperties().GetProperties(); + Convert(inDesc.GetAuth(), properties); + return out; +} + +} + using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall< Ydb::Table::DescribeExternalDataSourceRequest, Ydb::Table::DescribeExternalDataSourceResponse @@ -64,7 +143,7 @@ class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor #include +#include +#include +#include #include +#include +#include + namespace NKikimr::NGRpcService { using namespace NActors; +using namespace NJson; +using namespace NKikimrSchemeOp; using namespace NYql; +namespace { + +bool Convert(const TColumnDescription& in, Ydb::Table::ColumnMeta& out, TIssues& issues) { + try { + FillColumnDescription(out, in); + } catch (const std::exception& ex) { + issues.AddIssue(TStringBuilder() << "Unable to fill the column description, error: " << ex.what()); + return false; + } + return true; +} + +bool ConvertContent( + const TString& sourceType, + const TString& in, + google::protobuf::Map& out, + TIssues& issues +) { + const auto externalSourceFactory = NExternalSource::CreateExternalSourceFactory({}); + try { + const auto source = externalSourceFactory->GetOrCreate(sourceType); + for (const auto& [key, items] : source->GetParameters(in)) { + TJsonValue json(EJsonValueType::JSON_ARRAY); + for (const auto& item : items) { + json.AppendValue(item); + } + out[key] = WriteJson(json, false); + } + } catch (...) { + issues.AddIssue(TStringBuilder() << "Cannot unpack the content of an external table of type: " << sourceType + << ", error: " << CurrentExceptionMessage() + ); + return false; + } + return true; +} + +std::optional Convert( + const TDirEntry& inSelf, + const TExternalTableDescription& inDesc, + TIssues& issues +) { + Ydb::Table::DescribeExternalTableResult out; + ConvertDirectoryEntry(inSelf, out.mutable_self(), true); + + out.set_source_type(inDesc.GetSourceType()); + out.set_data_source_path(inDesc.GetDataSourcePath()); + out.set_location(inDesc.GetLocation()); + for (const auto& column : inDesc.GetColumns()) { + if (!Convert(column, *out.add_columns(), issues)) { + return std::nullopt; + } + } + if (!ConvertContent(inDesc.GetSourceType(), inDesc.GetContent(), *out.mutable_content(), issues)) { + return std::nullopt; + } + return out; +} + +} + using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall< Ydb::Table::DescribeExternalTableRequest, Ydb::Table::DescribeExternalTableResponse @@ -62,9 +132,15 @@ class TDescribeExternalTableRPC : public TRpcSchemeRequestActor(Ydb::Table::ColumnMeta return columnType; } +void FillColumnDescription(Ydb::Table::ColumnMeta& out, const NKikimrSchemeOp::TColumnDescription& in) { + AddColumn(&out, in); +} + template void FillColumnDescriptionImpl(TYdbProto& out, NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in) { @@ -1138,7 +1142,7 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) { } void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out, const NKikimrSchemeOp::TCdcStreamDescription& in) { - + out.set_name(in.GetName()); out.set_virtual_timestamps(in.GetVirtualTimestamps()); out.set_aws_region(in.GetAwsRegion()); diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index e6a38df6b68d..0e897f758fda 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -51,6 +51,7 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK Ydb::StatusIds::StatusCode& status, TString& error); // out +void FillColumnDescription(Ydb::Table::ColumnMeta& out, const NKikimrSchemeOp::TColumnDescription& in); void FillColumnDescription(Ydb::Table::DescribeTableResult& out, NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in); void FillColumnDescription(Ydb::Table::CreateTableRequest& out, @@ -147,7 +148,7 @@ bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS // in bool FillSequenceDescription( - NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in, + NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in, Ydb::StatusIds::StatusCode& status, TString& error); } // namespace NKikimr