Skip to content

Commit 68fb393

Browse files
authored
External Data Sources: RPC implementation for the describe methods + ydb scheme describe support (#14509)
1 parent 9b57d61 commit 68fb393

File tree

11 files changed

+242
-0
lines changed

11 files changed

+242
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#include "rpc_scheme_base.h"
2+
#include "service_table.h"
3+
4+
#include <ydb/core/grpc_services/base/base.h>
5+
#include <ydb/public/api/protos/ydb_table.pb.h>
6+
7+
namespace NKikimr::NGRpcService {
8+
9+
using namespace NActors;
10+
using namespace NYql;
11+
12+
using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall<
13+
Ydb::Table::DescribeExternalDataSourceRequest,
14+
Ydb::Table::DescribeExternalDataSourceResponse
15+
>;
16+
17+
class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest> {
18+
using TBase = TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest>;
19+
20+
public:
21+
22+
using TBase::TBase;
23+
24+
void Bootstrap() {
25+
DescribeScheme();
26+
}
27+
28+
private:
29+
30+
void DescribeScheme() {
31+
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
32+
SetAuthToken(ev, *Request_);
33+
SetDatabase(ev.get(), *Request_);
34+
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());
35+
36+
Send(MakeTxProxyID(), ev.release());
37+
Become(&TDescribeExternalDataSourceRPC::StateDescribeScheme);
38+
}
39+
40+
STATEFN(StateDescribeScheme) {
41+
switch (ev->GetTypeRewrite()) {
42+
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
43+
default:
44+
return TBase::StateWork(ev);
45+
}
46+
}
47+
48+
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
49+
const auto& record = ev->Get()->GetRecord();
50+
const auto& pathDescription = record.GetPathDescription();
51+
52+
if (record.HasReason()) {
53+
Request_->RaiseIssue(TIssue(record.GetReason()));
54+
}
55+
56+
switch (record.GetStatus()) {
57+
case NKikimrScheme::StatusSuccess: {
58+
if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalDataSource) {
59+
Request_->RaiseIssue(TIssue(
60+
TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType()
61+
));
62+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
63+
}
64+
65+
return ReplyWithResult(
66+
Ydb::StatusIds::SUCCESS,
67+
Ydb::Table::DescribeExternalDataSourceResult(), // to do: convert private protobuf to public
68+
ctx
69+
);
70+
}
71+
case NKikimrScheme::StatusPathDoesNotExist:
72+
case NKikimrScheme::StatusSchemeError:
73+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
74+
75+
case NKikimrScheme::StatusAccessDenied:
76+
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
77+
78+
case NKikimrScheme::StatusNotAvailable:
79+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
80+
81+
default:
82+
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
83+
}
84+
}
85+
};
86+
87+
void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
88+
f.RegisterActor(new TDescribeExternalDataSourceRPC(p.release()));
89+
}
90+
91+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#include "rpc_scheme_base.h"
2+
#include "service_table.h"
3+
4+
#include <ydb/core/grpc_services/base/base.h>
5+
#include <ydb/public/api/protos/ydb_table.pb.h>
6+
7+
namespace NKikimr::NGRpcService {
8+
9+
using namespace NActors;
10+
using namespace NYql;
11+
12+
using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall<
13+
Ydb::Table::DescribeExternalTableRequest,
14+
Ydb::Table::DescribeExternalTableResponse
15+
>;
16+
17+
class TDescribeExternalTableRPC : public TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest> {
18+
using TBase = TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest>;
19+
20+
public:
21+
22+
using TBase::TBase;
23+
24+
void Bootstrap() {
25+
DescribeScheme();
26+
}
27+
28+
private:
29+
30+
void DescribeScheme() {
31+
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
32+
SetAuthToken(ev, *Request_);
33+
SetDatabase(ev.get(), *Request_);
34+
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());
35+
36+
Send(MakeTxProxyID(), ev.release());
37+
Become(&TDescribeExternalTableRPC::StateDescribeScheme);
38+
}
39+
40+
STATEFN(StateDescribeScheme) {
41+
switch (ev->GetTypeRewrite()) {
42+
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
43+
default:
44+
return TBase::StateWork(ev);
45+
}
46+
}
47+
48+
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
49+
const auto& record = ev->Get()->GetRecord();
50+
const auto& pathDescription = record.GetPathDescription();
51+
52+
if (record.HasReason()) {
53+
Request_->RaiseIssue(TIssue(record.GetReason()));
54+
}
55+
56+
switch (record.GetStatus()) {
57+
case NKikimrScheme::StatusSuccess: {
58+
if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalTable) {
59+
Request_->RaiseIssue(TIssue(
60+
TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType()
61+
));
62+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
63+
}
64+
65+
return ReplyWithResult(
66+
Ydb::StatusIds::SUCCESS,
67+
Ydb::Table::DescribeExternalTableResult(), // to do: convert private proto to public
68+
ctx
69+
);
70+
}
71+
case NKikimrScheme::StatusPathDoesNotExist:
72+
case NKikimrScheme::StatusSchemeError:
73+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
74+
75+
case NKikimrScheme::StatusAccessDenied:
76+
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);
77+
78+
case NKikimrScheme::StatusNotAvailable:
79+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
80+
81+
default:
82+
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
83+
}
84+
}
85+
};
86+
87+
void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
88+
f.RegisterActor(new TDescribeExternalTableRPC(p.release()));
89+
}
90+
91+
}

ydb/core/grpc_services/service_table.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvide
1616
void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1717
void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1818
void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
19+
void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
20+
void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1921
void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2022
void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2123
void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

ydb/core/grpc_services/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ SRCS(
3737
rpc_describe_path.cpp
3838
rpc_describe_table.cpp
3939
rpc_describe_table_options.cpp
40+
rpc_describe_external_data_source.cpp
41+
rpc_describe_external_table.cpp
4042
rpc_drop_coordination_node.cpp
4143
rpc_drop_table.cpp
4244
rpc_discovery.cpp

ydb/core/jaeger_tracing/request_discriminator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ extern const THashMap<TStringBuf, ERequestType> NameToRequestType = {
4242
{"Table.StreamExecuteScanQuery", ERequestType::TABLE_STREAMEXECUTESCANQUERY},
4343
{"Table.StreamReadTable", ERequestType::TABLE_STREAMREADTABLE},
4444
{"Table.ReadRows", ERequestType::TABLE_READROWS},
45+
{"Table.DescribeExternalDataSource", ERequestType::TABLE_DESCRIBEEXTERNALDATASOURCE},
46+
{"Table.DescribeExternalTable", ERequestType::TABLE_DESCRIBEEXTERNALTABLE},
4547

4648
{"Query.ExecuteQuery", ERequestType::QUERY_EXECUTEQUERY},
4749
{"Query.ExecuteScript", ERequestType::QUERY_EXECUTESCRIPT},

ydb/core/jaeger_tracing/request_discriminator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ enum class ERequestType: size_t {
4747
TABLE_STREAMEXECUTESCANQUERY,
4848
TABLE_STREAMREADTABLE,
4949
TABLE_READROWS,
50+
TABLE_DESCRIBEEXTERNALDATASOURCE,
51+
TABLE_DESCRIBEEXTERNALTABLE,
5052

5153
QUERY_EXECUTEQUERY,
5254
QUERY_EXECUTESCRIPT,

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib
283283
return DescribeReplication(driver);
284284
case NScheme::ESchemeEntryType::View:
285285
return DescribeView(driver);
286+
case NScheme::ESchemeEntryType::ExternalDataSource:
287+
return DescribeExternalDataSource(driver);
288+
case NScheme::ESchemeEntryType::ExternalTable:
289+
return DescribeExternalTable(driver);
286290
default:
287291
return DescribeEntryDefault(entry);
288292
}
@@ -615,6 +619,36 @@ int TCommandDescribe::DescribeView(const TDriver& driver) {
615619
return PrintDescription(this, OutputFormat, result, &TCommandDescribe::PrintViewResponsePretty);
616620
}
617621

622+
int TCommandDescribe::PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& description) const {
623+
// to do
624+
return EXIT_SUCCESS;
625+
}
626+
627+
int TCommandDescribe::DescribeExternalDataSource(const TDriver& driver) {
628+
NTable::TTableClient client(driver);
629+
const auto sessionResult = client.CreateSession().ExtractValueSync();
630+
NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult);
631+
const auto description = sessionResult.GetSession().DescribeExternalDataSource(Path).ExtractValueSync();
632+
NStatusHelpers::ThrowOnErrorOrPrintIssues(description);
633+
634+
return PrintDescription(this, OutputFormat, description.GetExternalDataSourceDescription(), &TCommandDescribe::PrintExternalDataSourceResponsePretty);
635+
}
636+
637+
int TCommandDescribe::PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& description) const {
638+
// to do
639+
return EXIT_SUCCESS;
640+
}
641+
642+
int TCommandDescribe::DescribeExternalTable(const TDriver& driver) {
643+
NTable::TTableClient client(driver);
644+
const auto sessionResult = client.CreateSession().ExtractValueSync();
645+
NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult);
646+
const auto result = sessionResult.GetSession().DescribeExternalTable(Path).ExtractValueSync();
647+
NStatusHelpers::ThrowOnErrorOrPrintIssues(result);
648+
649+
return PrintDescription(this, OutputFormat, result.GetExternalTableDescription(), &TCommandDescribe::PrintExternalTableResponsePretty);
650+
}
651+
618652
namespace {
619653
void PrintColumns(const NTable::TTableDescription& tableDescription) {
620654
if (!tableDescription.GetTableColumns().size()) {

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ class TCommandDescribe : public TYdbOperationCommand, public TCommandWithPath, p
105105
int DescribeView(const TDriver& driver);
106106
int PrintViewResponsePretty(const NYdb::NView::TDescribeViewResult& result) const;
107107

108+
int DescribeExternalDataSource(const TDriver& driver);
109+
int PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& result) const;
110+
111+
int DescribeExternalTable(const TDriver& driver);
112+
int PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& result) const;
113+
108114
int TryTopicConsumerDescribeOrFail(NYdb::TDriver& driver, const NScheme::TDescribePathResult& result);
109115
std::pair<TString, TString> ParseTopicConsumer() const;
110116
int PrintConsumerResponsePretty(const NYdb::NTopic::TConsumerDescription& description) const;

ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class TProtoAccessor {
4343
static ::google::protobuf::Map<TStringType, Ydb::TypedValue>* GetProtoMapPtr(TParams& params);
4444
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
4545
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
46+
static const Ydb::Table::DescribeExternalDataSourceResult& GetProto(const NTable::TExternalDataSourceDescription&);
47+
static const Ydb::Table::DescribeExternalTableResult& GetProto(const NTable::TExternalTableDescription&);
4648
static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription);
4749
static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription);
4850
static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult);

ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ const Ydb::Table::DescribeTableResult& TProtoAccessor::GetProto(const NTable::TT
1212
return tableDescription.GetProto();
1313
}
1414

15+
const Ydb::Table::DescribeExternalDataSourceResult& TProtoAccessor::GetProto(const NTable::TExternalDataSourceDescription& description) {
16+
return description.GetProto();
17+
}
18+
19+
const Ydb::Table::DescribeExternalTableResult& TProtoAccessor::GetProto(const NTable::TExternalTableDescription& description) {
20+
return description.GetProto();
21+
}
22+
1523
NTable::TQueryStats TProtoAccessor::FromProto(const Ydb::TableStats::QueryStats& queryStats) {
1624
return NTable::TQueryStats(queryStats);
1725
}

0 commit comments

Comments
 (0)