Skip to content

External data source: convert the private protobuf to the public one on describe RPC #14546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion ydb/core/grpc_services/rpc_describe_external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,92 @@
#include "service_table.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/public/api/protos/ydb_table.pb.h>

namespace NKikimr::NGRpcService {

using namespace NActors;
using namespace NKikimrSchemeOp;
using namespace NYql;

namespace {

using TProperties = google::protobuf::Map<TProtoStringType, TProtoStringType>;

void Convert(const TServiceAccountAuth& in, TProperties& out) {
out["SERVICE_ACCOUNT_ID"] = in.GetId();
out["SERVICE_ACCOUNT_SECRET_NAME"] = in.GetSecretName();
}

void Convert(const TBasic& in, TProperties& out) {
out["LOGIN"] = in.GetLogin();
out["PASSWORD_SECRET_NAME"] = in.GetPasswordSecretName();
}

void Convert(const TMdbBasic& in, TProperties& out) {
out["SERVICE_ACCOUNT_ID"] = in.GetServiceAccountId();
out["SERVICE_ACCOUNT_SECRET_NAME"] = in.GetServiceAccountSecretName();
out["LOGIN"] = in.GetLogin();
out["PASSWORD_SECRET_NAME"] = in.GetPasswordSecretName();
}

void Convert(const TAws& in, TProperties& out) {
out["AWS_ACCESS_KEY_ID_SECRET_NAME"] = in.GetAwsAccessKeyIdSecretName();
out["AWS_SECRET_ACCESS_KEY_SECRET_NAME"] = in.GetAwsSecretAccessKeySecretName();
out["AWS_REGION"] = in.GetAwsRegion();
}

void Convert(const TToken& in, TProperties& out) {
out["TOKEN_SECRET_NAME"] = in.GetTokenSecretName();
}

void Convert(const TAuth& in, TProperties& out) {
auto& authMethod = out["AUTH_METHOD"];

switch (in.GetIdentityCase()) {
case TAuth::kNone:
authMethod = "NONE";
return;
case TAuth::kServiceAccount:
authMethod = "SERVICE_ACCOUNT";
Convert(in.GetServiceAccount(), out);
return;
case TAuth::kBasic:
authMethod = "BASIC";
Convert(in.GetBasic(), out);
return;
case TAuth::kMdbBasic:
authMethod = "MDB_BASIC";
Convert(in.GetMdbBasic(), out);
return;
case TAuth::kAws:
authMethod = "AWS";
Convert(in.GetAws(), out);
return;
case TAuth::kToken:
authMethod = "TOKEN";
Convert(in.GetToken(), out);
return;
case TAuth::IDENTITY_NOT_SET:
return;
}
}

Ydb::Table::DescribeExternalDataSourceResult Convert(const TDirEntry& inSelf, const TExternalDataSourceDescription& inDesc) {
Ydb::Table::DescribeExternalDataSourceResult out;
ConvertDirectoryEntry(inSelf, out.mutable_self(), true);

out.set_source_type(inDesc.GetSourceType());
out.set_location(inDesc.GetLocation());
auto& properties = *out.mutable_properties();
properties = inDesc.GetProperties().GetProperties();
Convert(inDesc.GetAuth(), properties);
return out;
}

}

using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall<
Ydb::Table::DescribeExternalDataSourceRequest,
Ydb::Table::DescribeExternalDataSourceResponse
Expand Down Expand Up @@ -64,7 +143,7 @@ class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor<TDescribeEx

return ReplyWithResult(
Ydb::StatusIds::SUCCESS,
Ydb::Table::DescribeExternalDataSourceResult(), // to do: convert private protobuf to public
Convert(pathDescription.GetSelf(), pathDescription.GetExternalDataSourceDescription()),
ctx
);
}
Expand Down
78 changes: 77 additions & 1 deletion ydb/core/grpc_services/rpc_describe_external_table.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,84 @@
#include "rpc_scheme_base.h"
#include "service_table.h"

#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/public/api/protos/ydb_table.pb.h>

#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/writer/json_value.h>

namespace NKikimr::NGRpcService {

using namespace NActors;
using namespace NJson;
using namespace NKikimrSchemeOp;
using namespace NYql;

namespace {

bool Convert(const TColumnDescription& in, Ydb::Table::ColumnMeta& out, TIssues& issues) {
try {
FillColumnDescription(out, in);
} catch (const std::exception& ex) {
issues.AddIssue(TStringBuilder() << "Unable to fill the column description, error: " << ex.what());
return false;
}
return true;
}

bool ConvertContent(
const TString& sourceType,
const TString& in,
google::protobuf::Map<TProtoStringType, TProtoStringType>& out,
TIssues& issues
) {
const auto externalSourceFactory = NExternalSource::CreateExternalSourceFactory({});
try {
const auto source = externalSourceFactory->GetOrCreate(sourceType);
for (const auto& [key, items] : source->GetParameters(in)) {
TJsonValue json(EJsonValueType::JSON_ARRAY);
for (const auto& item : items) {
json.AppendValue(item);
}
out[key] = WriteJson(json, false);
}
} catch (...) {
issues.AddIssue(TStringBuilder() << "Cannot unpack the content of an external table of type: " << sourceType
<< ", error: " << CurrentExceptionMessage()
);
return false;
}
return true;
}

std::optional<Ydb::Table::DescribeExternalTableResult> Convert(
const TDirEntry& inSelf,
const TExternalTableDescription& inDesc,
TIssues& issues
) {
Ydb::Table::DescribeExternalTableResult out;
ConvertDirectoryEntry(inSelf, out.mutable_self(), true);

out.set_source_type(inDesc.GetSourceType());
out.set_data_source_path(inDesc.GetDataSourcePath());
out.set_location(inDesc.GetLocation());
for (const auto& column : inDesc.GetColumns()) {
if (!Convert(column, *out.add_columns(), issues)) {
return std::nullopt;
}
}
if (!ConvertContent(inDesc.GetSourceType(), inDesc.GetContent(), *out.mutable_content(), issues)) {
return std::nullopt;
}
return out;
}

}

using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall<
Ydb::Table::DescribeExternalTableRequest,
Ydb::Table::DescribeExternalTableResponse
Expand Down Expand Up @@ -62,9 +132,15 @@ class TDescribeExternalTableRPC : public TRpcSchemeRequestActor<TDescribeExterna
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

TIssues issues;
const auto description = Convert(pathDescription.GetSelf(), pathDescription.GetExternalTableDescription(), issues);
if (!description) {
Reply(Ydb::StatusIds::INTERNAL_ERROR, issues, ctx);
return;
}
return ReplyWithResult(
Ydb::StatusIds::SUCCESS,
Ydb::Table::DescribeExternalTableResult(), // to do: convert private proto to public
*description,
ctx
);
}
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ Ydb::Type* AddColumn<NKikimrSchemeOp::TColumnDescription>(Ydb::Table::ColumnMeta
return columnType;
}

void FillColumnDescription(Ydb::Table::ColumnMeta& out, const NKikimrSchemeOp::TColumnDescription& in) {
AddColumn(&out, in);
}

template <typename TYdbProto>
void FillColumnDescriptionImpl(TYdbProto& out,
NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in) {
Expand Down Expand Up @@ -1138,7 +1142,7 @@ void FillAttributesImpl(TOutProto& out, const TInProto& in) {
}
void FillChangefeedDescription(Ydb::Table::ChangefeedDescription& out,
const NKikimrSchemeOp::TCdcStreamDescription& in) {

out.set_name(in.GetName());
out.set_virtual_timestamps(in.GetVirtualTimestamps());
out.set_aws_region(in.GetAwsRegion());
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK
Ydb::StatusIds::StatusCode& status, TString& error);

// out
void FillColumnDescription(Ydb::Table::ColumnMeta& out, const NKikimrSchemeOp::TColumnDescription& in);
void FillColumnDescription(Ydb::Table::DescribeTableResult& out,
NKikimrMiniKQL::TType& splitKeyType, const NKikimrSchemeOp::TTableDescription& in);
void FillColumnDescription(Ydb::Table::CreateTableRequest& out,
Expand Down Expand Up @@ -147,7 +148,7 @@ bool FillSequenceDescription(Ydb::Table::CreateTableRequest& out, const NKikimrS

// in
bool FillSequenceDescription(
NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in,
NKikimrSchemeOp::TSequenceDescription& out, const Ydb::Table::SequenceDescription& in,
Ydb::StatusIds::StatusCode& status, TString& error);

} // namespace NKikimr
Loading