Skip to content

add viewer query script handlers #9423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_request_base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
Expand Down Expand Up @@ -72,27 +73,29 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
return {Ydb::StatusIds::SUCCESS, {}};
}

class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dcherednik have you seen this?

public:
using TRpcRequestActorBase = TRpcRequestActor<TExecuteScriptRPC, TEvExecuteScriptRequest, false>;

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_REQ;
}

TExecuteScriptRPC(TEvExecuteScriptRequest* request)
: Request_(request)
TExecuteScriptRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

void Bootstrap() {
NYql::TIssues issues;
const auto& request = *Request_->GetProtoRequest();
const auto& request = GetProtoRequest();

if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
issues.AddIssue("ExecuteScript must be asyncronous operation");
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
}

AuditContextAppend(Request_.get(), request);
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
AuditContextAppend(Request.Get(), request);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext());

Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
Expand All @@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
)

void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx);

Ydb::Operations::Operation operation;
operation.set_id(ev->Get()->OperationId);
Expand All @@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
}

THolder<NKqp::TEvKqp::TEvScriptRequest> MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const {
const auto* req = Request_->GetProtoRequest();
const auto traceId = Request_->GetTraceId();
const auto* req = GetProtoRequest();
const auto traceId = Request->GetTraceId();

auto ev = MakeHolder<NKqp::TEvKqp::TEvScriptRequest>();

SetAuthToken(ev, *Request_);
SetDatabase(ev, *Request_);
SetRlPath(ev, *Request_);
SetAuthToken(ev, *Request);
SetDatabase(ev, *Request);
SetRlPath(ev, *Request);

if (traceId) {
ev->Record.SetTraceId(traceId.GetRef());
Expand Down Expand Up @@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {

result.set_status(status);

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
AuditContextAppend(Request.Get(), GetProtoRequest(), result);

Request_->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand All @@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
result.set_ready(true);
Reply(status, std::move(result), issues);
}

private:
std::unique_ptr<TEvExecuteScriptRequest> Request_;
};

} // namespace
Expand All @@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider
f.RegisterActor(new TExecuteScriptRPC(req));
}

} // namespace NQuery

template<>
IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TExecuteScriptRPC(msg);
}

} // namespace NKikimr::NGRpcService
12 changes: 7 additions & 5 deletions ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
return NKikimrServices::TActivity::GRPC_REQ;
}

TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
TFetchScriptResultsRPC(IRequestNoOpCtx* request)
: TRpcRequestActorBase(request)
{}

Expand Down Expand Up @@ -108,10 +108,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T

result.set_status(status);

TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);

Request->SendSerializedResult(std::move(serializedResult), status);
TProtoResponseHelper::SendProtoResponse(result, status, Request);

PassAway();
}
Expand Down Expand Up @@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityPro

}

template<>
IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) {
return new TFetchScriptResultsRPC(msg);
}

} // namespace NKikimr::NGRpcService
20 changes: 20 additions & 0 deletions ydb/core/viewer/json_handlers_query.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "json_handlers.h"
#include "query_execute_script.h"
#include "query_fetch_script.h"

namespace NKikimr::NViewer {

void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/execute", new TJsonHandler<TQueryExecuteScript>(TQueryExecuteScript::GetSwagger()));
}

void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) {
handlers.AddHandler("/query/script/fetch", new TJsonHandler<TQueryFetchScript>(TQueryFetchScript::GetSwagger()));
}

void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) {
InitQueryExecuteScriptJsonHandler(jsonHandlers);
InitQueryFetchScriptJsonHandler(jsonHandlers);
}

} // namespace NKikimr::NViewer
96 changes: 96 additions & 0 deletions ydb/core/viewer/query_execute_script.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once
#include "json_local_rpc.h"
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/viewer/yaml/yaml.h>
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h>

namespace NKikimr {

namespace NRpcService {

template<>
void SetRequestSyncOperationMode<Ydb::Query::ExecuteScriptRequest>(Ydb::Query::ExecuteScriptRequest& request) {
request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::ASYNC);
}

}

namespace NViewer {

using TQueryExecuteScriptRpc = TJsonLocalRpc<Ydb::Query::ExecuteScriptRequest,
Ydb::Operations::Operation,
Ydb::Operations::Operation,
Ydb::Query::V1::QueryService,
NKikimr::NGRpcService::TGrpcRequestNoOperationCall<Ydb::Query::ExecuteScriptRequest, Ydb::Operations::Operation>>;

class TQueryExecuteScript : public TQueryExecuteScriptRpc {
public:
using TBase = TQueryExecuteScriptRpc;

TQueryExecuteScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
: TBase(viewer, ev)
{
AllowedMethods = {HTTP_METHOD_POST};
}

static YAML::Node GetSwagger() {
YAML::Node node = YAML::Load(R"___(
post:
tags:
- script query
summary: Execute script
description: Execute script
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
database:
type: string
required: true
script_content:
type: object
properties:
text:
type: string
description: query text
required: true
syntax:
type: string
description: |
syntax:
* `SYNTAX_YQL_V1`
* `SYNTAX_PG`
required: false
exec_mode:
type: string
description: |
exec_mode:
* `EXEC_MODE_PARSE`
* `EXEC_MODE_VALIDATE`
* `EXEC_MODE_EXPLAIN`
* `EXEC_MODE_EXECUTE`
required: true
responses:
200:
description: OK
content:
application/json:
schema: {}
400:
description: Bad Request
403:
description: Forbidden
504:
description: Gateway Timeout
)___");
node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema<Ydb::Operations::Operation>();
return node;
}
};

}
}
76 changes: 76 additions & 0 deletions ydb/core/viewer/query_fetch_script.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once
#include "json_local_rpc.h"
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/viewer/yaml/yaml.h>
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>

namespace NKikimr::NViewer {

using TQueryFetchScriptRpc = TJsonLocalRpc<Ydb::Query::FetchScriptResultsRequest,
Ydb::Query::FetchScriptResultsResponse,
Ydb::Query::FetchScriptResultsResponse,
Ydb::Query::V1::QueryService,
NKikimr::NGRpcService::TGrpcRequestNoOperationCall<Ydb::Query::FetchScriptResultsRequest, Ydb::Query::FetchScriptResultsResponse>>;

class TQueryFetchScript : public TQueryFetchScriptRpc {
public:
using TBase = TQueryFetchScriptRpc;

TQueryFetchScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
: TBase(viewer, ev)
{
AllowedMethods = {HTTP_METHOD_GET};
}

static YAML::Node GetSwagger() {
YAML::Node node = YAML::Load(R"___(
get:
tags:
- script query
summary: Get operation
description: Check status for a given operation
parameters:
- name: database
in: query
description: database name
required: true
type: string
- name: operation_id
in: query
description: operation id
required: true
type: string
- name: result_set_index
in: query
description: result set index
required: false
type: string
- name: fetch_token
in: query
description: fetch token
required: false
type: string
- name: rows_limit
in: query
description: rows limit (less than 1000 allowed)
required: false
type: string
responses:
200:
description: OK
content:
application/json:
schema: {}
400:
description: Bad Request
403:
description: Forbidden
504:
description: Gateway Timeout
)___");
node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema<Ydb::Query::FetchScriptResultsResponse>();
return node;
}
};

}
9 changes: 9 additions & 0 deletions ydb/core/viewer/viewer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern void InitViewerBrowseJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitPDiskJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitVDiskJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitOperationJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitSchemeJsonHandlers(TJsonHandlers& jsonHandlers);
extern void InitStorageJsonHandlers(TJsonHandlers& jsonHandlers);

Expand Down Expand Up @@ -119,6 +120,13 @@ class TViewer : public TActorBootstrapped<TViewer>, public IViewer {
.UseAuth = true,
.AllowedSIDs = monitoringAllowedSIDs,
});
mon->RegisterActorPage({
.RelPath = "query",
.ActorSystem = ctx.ExecutorThread.ActorSystem,
.ActorId = ctx.SelfID,
.UseAuth = true,
.AllowedSIDs = monitoringAllowedSIDs,
});
mon->RegisterActorPage({
.RelPath = "scheme",
.ActorSystem = ctx.ExecutorThread.ActorSystem,
Expand All @@ -144,6 +152,7 @@ class TViewer : public TActorBootstrapped<TViewer>, public IViewer {
InitVDiskJsonHandlers(JsonHandlers);
InitStorageJsonHandlers(JsonHandlers);
InitOperationJsonHandlers(JsonHandlers);
InitQueryJsonHandlers(JsonHandlers);
InitSchemeJsonHandlers(JsonHandlers);
InitViewerBrowseJsonHandlers(JsonHandlers);

Expand Down
Loading
Loading