Skip to content

Commit 293d1e8

Browse files
committed
describe actors for external data sources and tables
1 parent b23b0fb commit 293d1e8

File tree

7 files changed

+186
-0
lines changed

7 files changed

+186
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#include "rpc_scheme_base.h"
2+
#include "service_table.h"
3+
4+
#include <ydb/core/base/path.h>
5+
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/public/api/protos/ydb_table.pb.h>
7+
8+
namespace NKikimr::NGRpcService {
9+
10+
using namespace NActors;
11+
12+
using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall<
13+
Ydb::Table::DescribeExternalDataSourceRequest,
14+
Ydb::Table::DescribeExternalDataSourceResponse
15+
>;
16+
17+
class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest> {
18+
using TBase = TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest>;
19+
20+
public:
21+
TDescribeExternalDataSourceRPC(IRequestOpCtx* msg)
22+
: TBase(msg)
23+
{}
24+
25+
void Bootstrap(const TActorContext &ctx) {
26+
TBase::Bootstrap(ctx);
27+
28+
const auto* request = GetProtoRequest();
29+
const auto& path = request->path();
30+
const auto paths = NKikimr::SplitPath(path);
31+
if (paths.empty()) {
32+
Request_->RaiseIssue(NYql::TIssue("Invalid path"));
33+
return Reply(Ydb::StatusIds::BAD_REQUEST, ctx);
34+
}
35+
36+
auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
37+
navigate->DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
38+
auto& entry = navigate->ResultSet.emplace_back();
39+
entry.Path = std::move(paths);
40+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
41+
entry.SyncVersion = true;
42+
43+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate));
44+
Become(&TDescribeExternalDataSourceRPC::StateWork);
45+
}
46+
47+
private:
48+
void StateWork(TAutoPtr<IEventHandle>& ev) {
49+
switch (ev->GetTypeRewrite()) {
50+
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
51+
default: TBase::StateWork(ev);
52+
}
53+
}
54+
55+
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
56+
const auto* navigate = ev->Get()->Request.Get();
57+
58+
if (navigate->ResultSet.size() != 1) {
59+
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
60+
}
61+
const auto& entry = navigate->ResultSet.front();
62+
63+
if (navigate->ErrorCount > 0) {
64+
switch (entry.Status) {
65+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
66+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
67+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
68+
default:
69+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
70+
}
71+
}
72+
if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
73+
// an invariant is broken: error count is equal to zero, but the status is not ok
74+
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
75+
}
76+
77+
// to do: fill the entire description from the received navigation entry
78+
Ydb::Table::DescribeExternalDataSourceResult description;
79+
const auto& properties = entry.ExternalDataSourceInfo->Description.GetProperties().GetProperties();
80+
description.mutable_properties()->insert(properties.begin(), properties.end());
81+
return ReplyWithResult(Ydb::StatusIds::SUCCESS, description, ctx);
82+
}
83+
};
84+
85+
void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
86+
f.RegisterActor(new TDescribeExternalDataSourceRPC(p.release()));
87+
}
88+
89+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#include "rpc_scheme_base.h"
2+
#include "service_table.h"
3+
4+
#include <ydb/core/base/path.h>
5+
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/public/api/protos/ydb_table.pb.h>
7+
8+
namespace NKikimr::NGRpcService {
9+
10+
using namespace NActors;
11+
12+
using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall<
13+
Ydb::Table::DescribeExternalTableRequest,
14+
Ydb::Table::DescribeExternalTableResponse
15+
>;
16+
17+
class TDescribeExternalTableRPC : public TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest> {
18+
using TBase = TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest>;
19+
20+
public:
21+
TDescribeExternalTableRPC(IRequestOpCtx* msg)
22+
: TBase(msg)
23+
{}
24+
25+
void Bootstrap(const TActorContext &ctx) {
26+
TBase::Bootstrap(ctx);
27+
28+
const auto* request = GetProtoRequest();
29+
const auto& path = request->path();
30+
const auto paths = NKikimr::SplitPath(path);
31+
if (paths.empty()) {
32+
Request_->RaiseIssue(NYql::TIssue("Invalid path"));
33+
return Reply(Ydb::StatusIds::BAD_REQUEST, ctx);
34+
}
35+
36+
auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
37+
navigate->DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
38+
auto& entry = navigate->ResultSet.emplace_back();
39+
entry.Path = std::move(paths);
40+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
41+
entry.SyncVersion = true;
42+
43+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate));
44+
Become(&TDescribeExternalTableRPC::StateWork);
45+
}
46+
47+
private:
48+
void StateWork(TAutoPtr<IEventHandle>& ev) {
49+
switch (ev->GetTypeRewrite()) {
50+
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
51+
default: TBase::StateWork(ev);
52+
}
53+
}
54+
55+
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
56+
const auto* navigate = ev->Get()->Request.Get();
57+
58+
if (navigate->ResultSet.size() != 1) {
59+
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
60+
}
61+
const auto& entry = navigate->ResultSet.front();
62+
63+
if (navigate->ErrorCount > 0) {
64+
switch (entry.Status) {
65+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
66+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
67+
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
68+
default:
69+
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
70+
}
71+
}
72+
if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
73+
// an invariant is broken: error count is equal to zero, but the status is not ok
74+
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
75+
}
76+
77+
// to do: fill the description from the received navigation entry
78+
Ydb::Table::DescribeExternalTableResult description;
79+
return ReplyWithResult(Ydb::StatusIds::SUCCESS, description, ctx);
80+
}
81+
};
82+
83+
void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
84+
f.RegisterActor(new TDescribeExternalTableRPC(p.release()));
85+
}
86+
87+
}

ydb/core/grpc_services/service_table.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvide
1616
void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1717
void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1818
void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
19+
void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
20+
void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1921
void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2022
void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
2123
void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

ydb/core/grpc_services/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ SRCS(
3737
rpc_describe_path.cpp
3838
rpc_describe_table.cpp
3939
rpc_describe_table_options.cpp
40+
rpc_describe_external_data_source.cpp
41+
rpc_describe_external_table.cpp
4042
rpc_drop_coordination_node.cpp
4143
rpc_drop_table.cpp
4244
rpc_discovery.cpp

ydb/core/jaeger_tracing/request_discriminator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ extern const THashMap<TStringBuf, ERequestType> NameToRequestType = {
4242
{"Table.StreamExecuteScanQuery", ERequestType::TABLE_STREAMEXECUTESCANQUERY},
4343
{"Table.StreamReadTable", ERequestType::TABLE_STREAMREADTABLE},
4444
{"Table.ReadRows", ERequestType::TABLE_READROWS},
45+
{"Table.DescribeExternalDataSource", ERequestType::TABLE_DESCRIBEEXTERNALDATASOURCE},
46+
{"Table.DescribeExternalTable", ERequestType::TABLE_DESCRIBEEXTERNALTABLE},
4547

4648
{"Query.ExecuteQuery", ERequestType::QUERY_EXECUTEQUERY},
4749
{"Query.ExecuteScript", ERequestType::QUERY_EXECUTESCRIPT},

ydb/core/jaeger_tracing/request_discriminator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ enum class ERequestType: size_t {
4747
TABLE_STREAMEXECUTESCANQUERY,
4848
TABLE_STREAMREADTABLE,
4949
TABLE_READROWS,
50+
TABLE_DESCRIBEEXTERNALDATASOURCE,
51+
TABLE_DESCRIBEEXTERNALTABLE,
5052

5153
QUERY_EXECUTEQUERY,
5254
QUERY_EXECUTESCRIPT,

ydb/services/ydb/ydb_table.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
8585
ADD_REQUEST_LIMIT(CreateTable, DoCreateTableRequest, Rps, CREATETABLE)
8686
ADD_REQUEST_LIMIT(DropTable, DoDropTableRequest, Rps, DROPTABLE)
8787
ADD_REQUEST_LIMIT(DescribeTable, DoDescribeTableRequest, Rps, DESCRIBETABLE)
88+
ADD_REQUEST_LIMIT(DescribeExternalDataSource, DoDescribeExternalDataSourceRequest, Rps, DESCRIBEEXTERNALDATASOURCE)
89+
ADD_REQUEST_LIMIT(DescribeExternalTable, DoDescribeExternalTableRequest, Rps, DESCRIBEEXTERNALTABLE)
8890
ADD_REQUEST_LIMIT(CopyTable, DoCopyTableRequest, Rps, COPYTABLE)
8991
ADD_REQUEST_LIMIT(CopyTables, DoCopyTablesRequest, Rps, COPYTABLES)
9092
ADD_REQUEST_LIMIT(RenameTables, DoRenameTablesRequest, Rps, RENAMETABLES)

0 commit comments

Comments
 (0)