Skip to content

Commit edb7ba0

Browse files
added query script
1 parent b7f353a commit edb7ba0

12 files changed

+427
-36
lines changed

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,23 +293,23 @@ concept TRequestWithOperationParams = requires(TRequest& request) {
293293
};
294294

295295
template<TRequestWithOperationParams TRequest>
296-
void SetRequestSyncOperationMode(TRequest& request) {
297-
request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::SYNC);
296+
void SetRequestSyncOperationMode(TRequest& request, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC) {
297+
request.mutable_operation_params()->set_operation_mode(operationMode);
298298
}
299299

300300
template<class TRequest>
301-
void SetRequestSyncOperationMode(TRequest&) {
301+
void SetRequestSyncOperationMode(TRequest&, Ydb::Operations::OperationParams::OperationMode) {
302302
// nothing
303303
}
304304

305305
template<typename TRpc>
306306
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database,
307307
const TMaybe<TString>& token, const TMaybe<TString>& requestType,
308-
TActorSystem* actorSystem, bool internalCall = false)
308+
TActorSystem* actorSystem, bool internalCall = false, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC)
309309
{
310310
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
311311

312-
SetRequestSyncOperationMode(proto);
312+
SetRequestSyncOperationMode(proto, operationMode);
313313

314314
using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>;
315315
auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, requestType, internalCall);
@@ -320,8 +320,8 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest
320320
}
321321

322322
template<typename TRpc>
323-
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false) {
324-
return DoLocalRpc<TRpc>(std::move(proto), database, token, Nothing(), actorSystem, internalCall);
323+
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database, const TMaybe<TString>& token, TActorSystem* actorSystem, bool internalCall = false, Ydb::Operations::OperationParams::OperationMode operationMode = Ydb::Operations::OperationParams::SYNC) {
324+
return DoLocalRpc<TRpc>(std::move(proto), database, token, Nothing(), actorSystem, internalCall, operationMode);
325325
}
326326

327327
template<typename TRpc>

ydb/core/grpc_services/query/rpc_execute_script.cpp

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/base/appdata.h>
44
#include <ydb/library/ydb_issue/issue_helpers.h>
55
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/grpc_services/rpc_request_base.h>
67
#include <ydb/core/grpc_services/rpc_kqp_base.h>
78
#include <ydb/core/grpc_services/audit_dml_operations.h>
89
#include <ydb/core/kqp/common/kqp.h>
@@ -71,26 +72,28 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
7172
return {Ydb::StatusIds::SUCCESS, {}};
7273
}
7374

74-
class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
75+
class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false> {
7576
public:
77+
using TRpcRequestActorBase = TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false>;
78+
7679
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
7780
return NKikimrServices::TActivity::GRPC_REQ;
7881
}
7982

80-
TExecuteScriptRPC(TEvExecuteScriptRequest* request)
81-
: Request_(request)
83+
TExecuteScriptRPC(IRequestNoOpCtx* request)
84+
: TRpcRequestActorBase(request)
8285
{}
8386

8487
void Bootstrap() {
8588
NYql::TIssues issues;
86-
const auto& request = *Request_->GetProtoRequest();
89+
const auto& request = GetProtoRequest();
8790

88-
if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
91+
if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
8992
issues.AddIssue("ExecuteScript must be asyncronous operation");
9093
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
9194
}
9295

93-
AuditContextAppend(Request_.get(), request);
96+
AuditContextAppend(Request.Get(), request);
9497

9598
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
9699
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
@@ -122,14 +125,14 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
122125
}
123126

124127
THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
125-
const auto* req = Request_->GetProtoRequest();
126-
const auto traceId = Request_->GetTraceId();
128+
const auto* req = GetProtoRequest();
129+
const auto traceId = Request->GetTraceId();
127130

128131
auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();
129132

130-
SetAuthToken(ev, *Request_);
131-
SetDatabase(ev, *Request_);
132-
SetRlPath(ev, *Request_);
133+
SetAuthToken(ev, *Request);
134+
SetDatabase(ev, *Request);
135+
SetRlPath(ev, *Request);
133136

134137
if (traceId) {
135138
ev->Record.SetTraceId(traceId.GetRef());
@@ -162,12 +165,9 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
162165

163166
result.set_status(status);
164167

165-
AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result);
166-
167-
TString serializedResult;
168-
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
168+
AuditContextAppend(Request.Get(), GetProtoRequest(), result);
169169

170-
Request_->SendSerializedResult(std::move(serializedResult), status);
170+
TProtoResponseHelper::SendProtoResponse(result, status, Request);
171171

172172
PassAway();
173173
}
@@ -177,9 +177,6 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
177177
result.set_ready(true);
178178
Reply(status, std::move(result), issues);
179179
}
180-
181-
private:
182-
std::unique_ptr<TEvExecuteScriptRequest> Request_;
183180
};
184181

185182
} // namespace
@@ -193,6 +190,11 @@ void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
193190
f.RegisterActor(new TExecuteScriptRPC(req));
194191
}
195192

193+
} // namespace NQuery
194+
195+
template<>
196+
IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
197+
return new TExecuteScriptRPC(msg);
196198
}
197199

198200
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
3535
return NKikimrServices::TActivity::GRPC_REQ;
3636
}
3737

38-
TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
38+
TFetchScriptResultsRPC(IRequestNoOpCtx* request)
3939
: TRpcRequestActorBase(request)
4040
{}
4141

@@ -108,10 +108,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
108108

109109
result.set_status(status);
110110

111-
TString serializedResult;
112-
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
113-
114-
Request->SendSerializedResult(std::move(serializedResult), status);
111+
TProtoResponseHelper::SendProtoResponse(result, status, Request);
115112

116113
PassAway();
117114
}
@@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro
154151

155152
}
156153

154+
template<>
155+
IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
156+
return new TFetchScriptResultsRPC(msg);
157+
}
158+
157159
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/rpc_request_base.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ class TRpcRequestActor: public TActorBootstrapped<TDerived> {
130130
: Request(ev)
131131
, DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())))
132132
{
133+
Cerr << "Actual type of msg ev: " << typeid(*ev).name() << Endl;
133134
if (const auto& userToken = Request->GetSerializedToken()) {
134135
UserToken = MakeHolder<NACLib::TUserToken>(userToken);
135136
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#include "json_handlers.h"
2+
#include "query_execute_script.h"
3+
#include "query_fetch_script.h"
4+
5+
namespace NKikimr::NViewer {
6+
7+
void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) {
8+
handlers.AddHandler("/query/script/execute", new TJsonHandler<TQueryExecuteScript>(TQueryExecuteScript::GetSwagger()));
9+
}
10+
11+
void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) {
12+
handlers.AddHandler("/query/script/fetch", new TJsonHandler<TQueryFetchScript>(TQueryFetchScript::GetSwagger()));
13+
}
14+
15+
void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) {
16+
InitQueryExecuteScriptJsonHandler(jsonHandlers);
17+
InitQueryFetchScriptJsonHandler(jsonHandlers);
18+
}
19+
20+
} // namespace NKikimr::NViewer

ydb/core/viewer/json_local_rpc.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
4848
ui32 Timeout = 0;
4949
TString Database;
5050
NThreading::TFuture<TProtoResponse> RpcFuture;
51+
Ydb::Operations::OperationParams::OperationMode OperationMode = Ydb::Operations::OperationParams::SYNC;
5152

5253
public:
5354
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -147,21 +148,27 @@ class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TPr
147148
}
148149

149150
void SendGrpcRequest() {
150-
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
151+
RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem(), false, OperationMode);
151152
RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
152153
(const NThreading::TFuture<TProtoResponse>& future) {
153154
auto& response = future.GetValueSync();
154155
auto result = MakeHolder<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>>();
155156
if constexpr (TRpcEv::IsOp) {
157+
Cerr << "aaaa IsOp" << Endl;
158+
Cerr << "aaaa response.operation().ready() " << response.operation().ready() << Endl;
159+
Cerr << "aaaa response.operation().status() " << (response.operation().status() == Ydb::StatusIds::SUCCESS) << Endl;
160+
Cerr << "aaaa response.ShortDebugString() " << response.ShortDebugString() << Endl;
156161
if (response.operation().ready() && response.operation().status() == Ydb::StatusIds::SUCCESS) {
157162
TProtoResult rs;
158163
response.operation().result().UnpackTo(&rs);
164+
Cerr << "aaaa rs " << rs.ShortDebugString() << Endl;
159165
result->Message = std::move(rs);
160166
}
161167
NYql::TIssues issues;
162168
NYql::IssuesFromMessage(response.operation().issues(), issues);
163169
result->Status = NYdb::TStatus(NYdb::EStatus(response.operation().status()), std::move(issues));
164170
} else {
171+
Cerr << "aaaa noOp" << Endl;
165172
result->Message = response;
166173
NYql::TIssues issues;
167174
NYql::IssuesFromMessage(response.issues(), issues);

ydb/core/viewer/operation_list.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,17 @@ class TOperationList : public TOperationListRpc {
3030
} else {
3131
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'database' is required"));
3232
}
33-
3433
if (params.Has("kind")) {
3534
Request.set_kind(params.Get("kind"));
3635
} else {
3736
return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", "field 'kind' is required"));
3837
}
39-
4038
if (params.Has("page_size")) {
4139
Request.set_page_size(FromStringWithDefault<ui32>(params.Get("page_size"), 0));
4240
}
43-
4441
if (params.Has("page_token")) {
4542
Request.set_page_token(params.Get("page_token"));
4643
}
47-
4844
TBase::Bootstrap();
4945
}
5046

0 commit comments

Comments
 (0)