Skip to content

Commit 3b3c1b6

Browse files
added query script
1 parent af0844f commit 3b3c1b6

File tree

11 files changed

+422
-38
lines changed

11 files changed

+422
-38
lines changed

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,23 +293,23 @@ concept TRequestWithOperationParams = requires(TRequest& request) {
293293
};
294294

295295
template<TRequestWithOperationParams TRequest>
296-
void SetRequestSyncOperationMode(TRequest& request) {
297-
request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC);
296+
void SetRequestSyncOperationMode(TRequest& request, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC) {
297+
request.mutable_operation_params()->set_operation_mode(operationMode);
298298
}
299299

300300
template<class TRequest>
301-
void SetRequestSyncOperationMode(TRequest&) {
301+
void SetRequestSyncOperationMode(TRequest&, Ydb::Operations::OperationParams::OperationMode) {
302302
// nothing
303303
}
304304

305305
template<typename TRpc>
306306
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database,
307307
const TMaybe<TString>& token, const TMaybe<TString>& requestType,
308-
TActorSystem* actorSystem, bool internalCall = false)
308+
TActorSystem* actorSystem, bool internalCall = false, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC)
309309
{
310310
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
311311

312-
SetRequestSyncOperationMode(proto);
312+
SetRequestSyncOperationMode(proto, operationMode);
313313

314314
using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>;
315315
auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, requestType, internalCall);
@@ -320,8 +320,8 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest
320320
}
321321

322322
template<typename TRpc>
323-
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false) {
324-
return DoLocalRpc<TRpc>(std::move(proto), database, token, Nothing(), actorSystem, internalCall);
323+
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC) {
324+
return DoLocalRpc<TRpc>(std::move(proto), database, token, Nothing(), actorSystem, internalCall, operationMode);
325325
}
326326

327327
template<typename TRpc>

ydb/core/grpc_services/query/rpc_execute_script.cpp

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/base/appdata.h>
44
#include <ydb/library/ydb_issue/issue_helpers.h>
55
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/grpc_services/rpc_request_base.h>
67
#include <ydb/core/grpc_services/rpc_kqp_base.h>
78
#include <ydb/core/grpc_services/audit_dml_operations.h>
89
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
@@ -72,27 +73,29 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
7273
return {Ydb::StatusIds::SUCCESS, {}};
7374
}
7475

75-
class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
76+
class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false> {
7677
public:
78+
using TRpcRequestActorBase = TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false>;
79+
7780
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
7881
return NKikimrServices::TActivity::GRPC_REQ;
7982
}
8083

81-
TExecuteScriptRPC(TEvExecuteScriptRequest* request)
82-
: Request_(request)
84+
TExecuteScriptRPC(IRequestNoOpCtx* request)
85+
: TRpcRequestActorBase(request)
8386
{}
8487

8588
void Bootstrap() {
8689
NYql::TIssues issues;
87-
const auto& request = *Request_->GetProtoRequest();
90+
const auto& request = GetProtoRequest();
8891

89-
if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
92+
if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
9093
issues.AddIssue("ExecuteScript must be asyncronous operation");
9194
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
9295
}
9396

94-
AuditContextAppend(Request_.get(), request);
95-
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
97+
AuditContextAppend(Request.Get(), request);
98+
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext());
9699

97100
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
98101
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
@@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
113116
)
114117

115118
void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
116-
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
119+
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx);
117120

118121
Ydb::Operations::Operation operation;
119122
operation.set_id(ev->Get()->OperationId);
@@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
126129
}
127130

128131
THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
129-
const auto* req = Request_->GetProtoRequest();
130-
const auto traceId = Request_->GetTraceId();
132+
const auto* req = GetProtoRequest();
133+
const auto traceId = Request->GetTraceId();
131134

132135
auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();
133136

134-
SetAuthToken(ev, *Request_);
135-
SetDatabase(ev, *Request_);
136-
SetRlPath(ev, *Request_);
137+
SetAuthToken(ev, *Request);
138+
SetDatabase(ev, *Request);
139+
SetRlPath(ev, *Request);
137140

138141
if (traceId) {
139142
ev->Record.SetTraceId(traceId.GetRef());
@@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
166169

167170
result.set_status(status);
168171

169-
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result);
170-
171-
TString serializedResult;
172-
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
172+
AuditContextAppend(Request.Get(), GetProtoRequest(), result);
173173

174-
Request_->SendSerializedResult(std::move(serializedResult), status);
174+
TProtoResponseHelper::SendProtoResponse(result, status, Request);
175175

176176
PassAway();
177177
}
@@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
181181
result.set_ready(true);
182182
Reply(status, std::move(result), issues);
183183
}
184-
185-
private:
186-
std::unique_ptr<TEvExecuteScriptRequest> Request_;
187184
};
188185

189186
} // namespace
@@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
197194
f.RegisterActor(new TExecuteScriptRPC(req));
198195
}
199196

197+
} // namespace NQuery
198+
199+
template<>
200+
IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
201+
return new TExecuteScriptRPC(msg);
200202
}
201203

202204
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
3535
return NKikimrServices::TActivity::GRPC_REQ;
3636
}
3737

38-
TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
38+
TFetchScriptResultsRPC(IRequestNoOpCtx* request)
3939
: TRpcRequestActorBase(request)
4040
{}
4141

@@ -108,10 +108,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
108108

109109
result.set_status(status);
110110

111-
TString serializedResult;
112-
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
113-
114-
Request->SendSerializedResult(std::move(serializedResult), status);
111+
TProtoResponseHelper::SendProtoResponse(result, status, Request);
115112

116113
PassAway();
117114
}
@@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro
154151

155152
}
156153

154+
template<>
155+
IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
156+
return new TFetchScriptResultsRPC(msg);
157+
}
158+
157159
} // namespace NKikimr::NGRpcService
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#include "json_handlers.h"
2+
#include "query_execute_script.h"
3+
#include "query_fetch_script.h"
4+
5+
namespace NKikimr::NViewer {
6+
7+
void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) {
8+
handlers.AddHandler("/query/script/execute", new TJsonHandler<TQueryExecuteScript>(TQueryExecuteScript::GetSwagger()));
9+
}
10+
11+
void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) {
12+
handlers.AddHandler("/query/script/fetch", new TJsonHandler<TQueryFetchScript>(TQueryFetchScript::GetSwagger()));
13+
}
14+
15+
void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) {
16+
InitQueryExecuteScriptJsonHandler(jsonHandlers);
17+
InitQueryFetchScriptJsonHandler(jsonHandlers);
18+
}
19+
20+
} // namespace NKikimr::NViewer

ydb/core/viewer/json_local_rpc.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
4848
ui32 Timeout = 0;
4949
TString Database;
5050
NThreading::TFuture<TProtoResponse> RpcFuture;
51+
Ydb::Operations::OperationParams::OperationMode OperationMode = Ydb::Operations::OperationParams::SYNC;
5152

5253
public:
5354
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -147,7 +148,7 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
147148
}
148149

149150
void SendGrpcRequest() {
150-
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
151+
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem(), false, OperationMode);
151152
RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
152153
(const NThreading::TFuture<TProtoResponse>& future) {
153154
auto& response = future.GetValueSync();

ydb/core/viewer/operation_list.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,17 @@ class TOperationList : public TOperationListRpc {
3030
} else {
3131
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'database' is required"));
3232
}
33-
3433
if (params.Has("kind")) {
3534
Request.set_kind(params.Get("kind"));
3635
} else {
3736
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'kind' is required"));
3837
}
39-
4038
if (params.Has("page_size")) {
4139
Request.set_page_size(FromStringWithDefault<ui32>(params.Get("page_size"), 0));
4240
}
43-
4441
if (params.Has("page_token")) {
4542
Request.set_page_token(params.Get("page_token"));
4643
}
47-
4844
TBase::Bootstrap();
4945
}
5046

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#pragma once
2+
#include "json_local_rpc.h"
3+
#include <ydb/core/grpc_services/rpc_calls.h>
4+
#include <ydb/core/viewer/yaml/yaml.h>
5+
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
6+
#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h>
7+
8+
namespace NKikimr::NViewer {
9+
10+
using TQueryExecuteScriptRpc = TJsonLocalRpc<Ydb::Query::ExecuteScriptRequest,
11+
Ydb::Operations::Operation,
12+
Ydb::Operations::Operation,
13+
Ydb::Query::V1::QueryService,
14+
NKikimr::NGRpcService::TGrpcRequestNoOperationCall<Ydb::Query::ExecuteScriptRequest, Ydb::Operations::Operation>>;
15+
16+
class TQueryExecuteScript : public TQueryExecuteScriptRpc {
17+
public:
18+
using TBase = TQueryExecuteScriptRpc;
19+
20+
TQueryExecuteScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
21+
: TBase(viewer, ev)
22+
{
23+
OperationMode = Ydb::Operations::OperationParams::ASYNC;
24+
}
25+
26+
void Bootstrap() override {
27+
if (Event->Get()->Request.GetMethod() != HTTP_METHOD_POST) {
28+
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Only POST method is allowed"));
29+
}
30+
31+
TStringBuf content = Event->Get()->Request.GetPostContent();
32+
static NJson::TJsonReaderConfig JsonConfig;
33+
NJson::TJsonValue params;
34+
bool success = NJson::ReadJsonTree(content, &JsonConfig, &params);
35+
if (!success) {
36+
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "Empty content received"));
37+
}
38+
if (params.Has("database")) {
39+
Database = params["database"].GetStringRobust();
40+
} else {
41+
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'database' is required"));
42+
}
43+
Request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::ASYNC);
44+
if (params.Has("query")) {
45+
auto query = params["query"].GetStringRobust();
46+
Request.mutable_script_content()->set_text(query);
47+
auto syntax = params["syntax"].GetStringRobust();
48+
if (syntax == "yql_v1") {
49+
Request.mutable_script_content()->set_syntax(Ydb::Query::SYNTAX_YQL_V1);
50+
} else if (syntax == "pg") {
51+
Request.mutable_script_content()->set_syntax(Ydb::Query::SYNTAX_PG);
52+
} else {
53+
Request.mutable_script_content()->set_syntax(Ydb::Query::SYNTAX_UNSPECIFIED);
54+
}
55+
} else {
56+
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'query' is required"));
57+
}
58+
59+
if (params.Has("stats")) {
60+
auto Stats = params["stats"].GetStringRobust();
61+
if (Stats == "none"){
62+
Request.set_stats_mode(Ydb::Query::STATS_MODE_NONE);
63+
} else if (Stats == "basic") {
64+
Request.set_stats_mode(Ydb::Query::STATS_MODE_BASIC);
65+
} else if (Stats == "profile") {
66+
Request.set_stats_mode(Ydb::Query::STATS_MODE_FULL);
67+
} else if (Stats == "full") {
68+
Request.set_stats_mode(Ydb::Query::STATS_MODE_PROFILE);
69+
}
70+
} else {
71+
Request.set_stats_mode(Ydb::Query::STATS_MODE_NONE);
72+
}
73+
74+
if (params.Has("exec_mode")) {
75+
auto execMode = params["exec_mode"].GetStringRobust();
76+
if (execMode == "parse") {
77+
Request.set_exec_mode(Ydb::Query::EXEC_MODE_PARSE);
78+
} else if (execMode == "validate") {
79+
Request.set_exec_mode(Ydb::Query::EXEC_MODE_VALIDATE);
80+
} else if (execMode == "explain") {
81+
Request.set_exec_mode(Ydb::Query::EXEC_MODE_EXPLAIN);
82+
} else if (execMode== "execute") {
83+
Request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
84+
}
85+
} else {
86+
Request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
87+
}
88+
89+
TBase::Bootstrap();
90+
}
91+
92+
static YAML::Node GetSwagger() {
93+
YAML::Node node = YAML::Load(R"___(
94+
get:
95+
tags:
96+
- operation
97+
summary: Execute script
98+
description: Execute script
99+
parameters:
100+
- name: database
101+
in: query
102+
description: database name
103+
required: true
104+
type: string
105+
- name: query
106+
in: query
107+
description: query
108+
required: true
109+
type: string
110+
- name: syntax
111+
in: query
112+
description: syntax
113+
required: false
114+
type: string
115+
- name: stats
116+
in: query
117+
description: stats
118+
required: false
119+
type: string
120+
- name: exec_mode
121+
in: query
122+
description: exec_mode
123+
required: false
124+
type: string
125+
responses:
126+
200:
127+
description: OK
128+
content:
129+
application/json:
130+
schema: {}
131+
400:
132+
description: Bad Request
133+
403:
134+
description: Forbidden
135+
504:
136+
description: Gateway Timeout
137+
)___");
138+
node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema<Ydb::Operations::Operation>();
139+
return node;
140+
}
141+
};
142+
143+
}

0 commit comments

Comments
 (0)