diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 3c573606a726..97eeb4ef7294 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -72,27 +73,29 @@ std::tuple FillKqpRequest( return {Ydb::StatusIds::SUCCESS, {}}; } -class TExecuteScriptRPC : public TActorBootstrapped { +class TExecuteScriptRPC : public TRpcRequestActor { public: + using TRpcRequestActorBase = TRpcRequestActor; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } - TExecuteScriptRPC(TEvExecuteScriptRequest* request) - : Request_(request) + TExecuteScriptRPC(IRequestNoOpCtx* request) + : TRpcRequestActorBase(request) {} void Bootstrap() { NYql::TIssues issues; - const auto& request = *Request_->GetProtoRequest(); + const auto& request = GetProtoRequest(); - if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) { + if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) { issues.AddIssue("ExecuteScript must be asyncronous operation"); return Reply(Ydb::StatusIds::BAD_REQUEST, issues); } - AuditContextAppend(Request_.get(), request); - NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext()); + AuditContextAppend(Request.Get(), request); + NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *request, TlsActivationContext->AsActorContext()); Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; if (auto scriptRequest = MakeScriptRequest(issues, status)) { @@ -113,7 +116,7 @@ class TExecuteScriptRPC : public TActorBootstrapped { ) void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) { - NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx); + NDataIntegrity::LogIntegrityTrails(Request->GetTraceId(), *GetProtoRequest(), ev, ctx); Ydb::Operations::Operation operation; operation.set_id(ev->Get()->OperationId); @@ -126,14 +129,14 @@ class TExecuteScriptRPC : public TActorBootstrapped { } THolder MakeScriptRequest(NYql::TIssues& issues, Ydb::StatusIds::StatusCode& status) const { - const auto* req = Request_->GetProtoRequest(); - const auto traceId = Request_->GetTraceId(); + const auto* req = GetProtoRequest(); + const auto traceId = Request->GetTraceId(); auto ev = MakeHolder(); - SetAuthToken(ev, *Request_); - SetDatabase(ev, *Request_); - SetRlPath(ev, *Request_); + SetAuthToken(ev, *Request); + SetDatabase(ev, *Request); + SetRlPath(ev, *Request); if (traceId) { ev->Record.SetTraceId(traceId.GetRef()); @@ -166,12 +169,9 @@ class TExecuteScriptRPC : public TActorBootstrapped { result.set_status(status); - AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result); - - TString serializedResult; - Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult); + AuditContextAppend(Request.Get(), GetProtoRequest(), result); - Request_->SendSerializedResult(std::move(serializedResult), status); + TProtoResponseHelper::SendProtoResponse(result, status, Request); PassAway(); } @@ -181,9 +181,6 @@ class TExecuteScriptRPC : public TActorBootstrapped { result.set_ready(true); Reply(status, std::move(result), issues); } - -private: - std::unique_ptr Request_; }; } // namespace @@ -197,6 +194,11 @@ void DoExecuteScript(std::unique_ptr p, const IFacilityProvider f.RegisterActor(new TExecuteScriptRPC(req)); } +} // namespace NQuery + +template<> +IActor* TEvExecuteScriptRequest::CreateRpcActor(IRequestNoOpCtx* msg) { + return new TExecuteScriptRPC(msg); } } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 5f729d990731..0ccc0094e520 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -35,7 +35,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActorSendSerializedResult(std::move(serializedResult), status); + TProtoResponseHelper::SendProtoResponse(result, status, Request); PassAway(); } @@ -154,4 +151,9 @@ void DoFetchScriptResults(std::unique_ptr p, const IFacilityPro } +template<> +IActor* TEvFetchScriptResultsRequest::CreateRpcActor(IRequestNoOpCtx* msg) { + return new TFetchScriptResultsRPC(msg); +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/viewer/json_handlers_query.cpp b/ydb/core/viewer/json_handlers_query.cpp new file mode 100644 index 000000000000..a79c05b54c1f --- /dev/null +++ b/ydb/core/viewer/json_handlers_query.cpp @@ -0,0 +1,20 @@ +#include "json_handlers.h" +#include "query_execute_script.h" +#include "query_fetch_script.h" + +namespace NKikimr::NViewer { + +void InitQueryExecuteScriptJsonHandler(TJsonHandlers& handlers) { + handlers.AddHandler("/query/script/execute", new TJsonHandler(TQueryExecuteScript::GetSwagger())); +} + +void InitQueryFetchScriptJsonHandler(TJsonHandlers& handlers) { + handlers.AddHandler("/query/script/fetch", new TJsonHandler(TQueryFetchScript::GetSwagger())); +} + +void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers) { + InitQueryExecuteScriptJsonHandler(jsonHandlers); + InitQueryFetchScriptJsonHandler(jsonHandlers); +} + +} // namespace NKikimr::NViewer diff --git a/ydb/core/viewer/query_execute_script.h b/ydb/core/viewer/query_execute_script.h new file mode 100644 index 000000000000..96f0fb09907a --- /dev/null +++ b/ydb/core/viewer/query_execute_script.h @@ -0,0 +1,96 @@ +#pragma once +#include "json_local_rpc.h" +#include +#include +#include +#include + +namespace NKikimr { + +namespace NRpcService { + +template<> +void SetRequestSyncOperationMode(Ydb::Query::ExecuteScriptRequest& request) { + request.mutable_operation_params()->set_operation_mode(Ydb::Operations::OperationParams::ASYNC); +} + +} + +namespace NViewer { + +using TQueryExecuteScriptRpc = TJsonLocalRpc>; + +class TQueryExecuteScript : public TQueryExecuteScriptRpc { +public: + using TBase = TQueryExecuteScriptRpc; + + TQueryExecuteScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : TBase(viewer, ev) + { + AllowedMethods = {HTTP_METHOD_POST}; + } + + static YAML::Node GetSwagger() { + YAML::Node node = YAML::Load(R"___( + post: + tags: + - script query + summary: Execute script + description: Execute script + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + database: + type: string + required: true + script_content: + type: object + properties: + text: + type: string + description: query text + required: true + syntax: + type: string + description: | + syntax: + * `SYNTAX_YQL_V1` + * `SYNTAX_PG` + required: false + exec_mode: + type: string + description: | + exec_mode: + * `EXEC_MODE_PARSE` + * `EXEC_MODE_VALIDATE` + * `EXEC_MODE_EXPLAIN` + * `EXEC_MODE_EXECUTE` + required: true + responses: + 200: + description: OK + content: + application/json: + schema: {} + 400: + description: Bad Request + 403: + description: Forbidden + 504: + description: Gateway Timeout + )___"); + node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema(); + return node; + } +}; + +} +} diff --git a/ydb/core/viewer/query_fetch_script.h b/ydb/core/viewer/query_fetch_script.h new file mode 100644 index 000000000000..6a64d230b00f --- /dev/null +++ b/ydb/core/viewer/query_fetch_script.h @@ -0,0 +1,76 @@ +#pragma once +#include "json_local_rpc.h" +#include +#include +#include + +namespace NKikimr::NViewer { + +using TQueryFetchScriptRpc = TJsonLocalRpc>; + +class TQueryFetchScript : public TQueryFetchScriptRpc { +public: + using TBase = TQueryFetchScriptRpc; + + TQueryFetchScript(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev) + : TBase(viewer, ev) + { + AllowedMethods = {HTTP_METHOD_GET}; + } + + static YAML::Node GetSwagger() { + YAML::Node node = YAML::Load(R"___( + get: + tags: + - script query + summary: Get operation + description: Check status for a given operation + parameters: + - name: database + in: query + description: database name + required: true + type: string + - name: operation_id + in: query + description: operation id + required: true + type: string + - name: result_set_index + in: query + description: result set index + required: false + type: string + - name: fetch_token + in: query + description: fetch token + required: false + type: string + - name: rows_limit + in: query + description: rows limit (less than 1000 allowed) + required: false + type: string + responses: + 200: + description: OK + content: + application/json: + schema: {} + 400: + description: Bad Request + 403: + description: Forbidden + 504: + description: Gateway Timeout + )___"); + node["get"]["responses"]["200"]["content"]["application/json"]["schema"] = TProtoToYaml::ProtoToYamlSchema(); + return node; + } +}; + +} diff --git a/ydb/core/viewer/viewer.cpp b/ydb/core/viewer/viewer.cpp index 51f382b2c430..b32cc5c0eaad 100644 --- a/ydb/core/viewer/viewer.cpp +++ b/ydb/core/viewer/viewer.cpp @@ -32,6 +32,7 @@ extern void InitViewerBrowseJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitPDiskJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitVDiskJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitOperationJsonHandlers(TJsonHandlers& jsonHandlers); +extern void InitQueryJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitSchemeJsonHandlers(TJsonHandlers& jsonHandlers); extern void InitStorageJsonHandlers(TJsonHandlers& jsonHandlers); @@ -119,6 +120,13 @@ class TViewer : public TActorBootstrapped, public IViewer { .UseAuth = true, .AllowedSIDs = monitoringAllowedSIDs, }); + mon->RegisterActorPage({ + .RelPath = "query", + .ActorSystem = ctx.ExecutorThread.ActorSystem, + .ActorId = ctx.SelfID, + .UseAuth = true, + .AllowedSIDs = monitoringAllowedSIDs, + }); mon->RegisterActorPage({ .RelPath = "scheme", .ActorSystem = ctx.ExecutorThread.ActorSystem, @@ -144,6 +152,7 @@ class TViewer : public TActorBootstrapped, public IViewer { InitVDiskJsonHandlers(JsonHandlers); InitStorageJsonHandlers(JsonHandlers); InitOperationJsonHandlers(JsonHandlers); + InitQueryJsonHandlers(JsonHandlers); InitSchemeJsonHandlers(JsonHandlers); InitViewerBrowseJsonHandlers(JsonHandlers); diff --git a/ydb/core/viewer/viewer_ut.cpp b/ydb/core/viewer/viewer_ut.cpp index 696ad9918465..20d7c731b81b 100644 --- a/ydb/core/viewer/viewer_ut.cpp +++ b/ydb/core/viewer/viewer_ut.cpp @@ -26,6 +26,7 @@ #include #include +#include using namespace NKikimr; using namespace NViewer; @@ -1831,4 +1832,107 @@ Y_UNIT_TEST_SUITE(Viewer) { auto resultSets = json["Databases"].GetArray(); UNIT_ASSERT_EQUAL_C(1, resultSets.size(), response); } + + static const ui32 ROWS_N = 15; + static const ui32 ROWS_LIMIT = 5; + + TString PostExecuteScript(TKeepAliveHttpClient& httpClient, TString query) { + TStringStream requestBody; + requestBody + << "{ \"database\": \"/Root\"," + << " \"script_content\": {" + << " \"text\": \"" << query << "\"}," + << " \"exec_mode\": \"EXEC_MODE_EXECUTE\" }"; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoPost(TStringBuilder() + << "/query/script/execute?timeout=600000" + << "&database=%2FRoot", requestBody.Str(), &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + TString GetOperation(TKeepAliveHttpClient& httpClient, TString id) { + TStringStream requestBody; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + id = std::regex_replace(id.c_str(), std::regex("/"), "%2F"); + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoGet(TStringBuilder() + << "/operation/get?timeout=600000&id=" << id + << "&database=%2FRoot", &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + TString GetFetchScript(TKeepAliveHttpClient& httpClient, TString id) { + TStringStream requestBody; + TStringStream responseStream; + TKeepAliveHttpClient::THeaders headers; + headers["Content-Type"] = "application/json"; + headers["Authorization"] = "test_ydb_token"; + id = std::regex_replace(id.c_str(), std::regex("/"), "%2F"); + const TKeepAliveHttpClient::THttpCode statusCode = httpClient.DoGet(TStringBuilder() + << "/query/script/fetch?timeout=600000&operation_id=" << id + << "&database=%2FRoot" + << "&rows_limit=" << ROWS_LIMIT, &responseStream, headers); + const TString response = responseStream.ReadAll(); + UNIT_ASSERT_EQUAL_C(statusCode, HTTP_OK, statusCode << ": " << response); + return response; + } + + Y_UNIT_TEST(QueryExecuteScript) { + TPortManager tp; + ui16 port = tp.GetPort(2134); + ui16 grpcPort = tp.GetPort(2135); + ui16 monPort = tp.GetPort(8765); + auto settings = TServerSettings(port); + settings.InitKikimrRunConfig() + .SetNodeCount(1) + .SetUseRealThreads(true) + .SetDomainName("Root") + .SetUseSectorMap(true) + .SetMonitoringPortOffset(monPort, true); + + TServer server(settings); + server.EnableGRpc(grpcPort); + TClient client(settings); + client.InitRootScheme(); + + TTestActorRuntime& runtime = *server.GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE); + + TKeepAliveHttpClient httpClient("localhost", monPort); + + PostQuery(httpClient, "CREATE TABLE `/Root/Test` (Key Uint64, Value String, PRIMARY KEY (Key));", "execute-query"); + for (ui32 i = 1; i <= ROWS_N; ++i) { + PostQuery(httpClient, TStringBuilder() << "INSERT INTO `/Root/Test` (Key, Value) VALUES (" << i << ", 'testvalue');", "execute-query"); + } + + NJson::TJsonReaderConfig jsonCfg; + NJson::TJsonValue json; + + TString response = PostExecuteScript(httpClient, "SELECT * FROM `/Root/Test`;"); + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["status"].GetString(), "SUCCESS", response); + TString id = json["id"].GetString(); + + Sleep(TDuration::MilliSeconds(1000)); + + response = GetOperation(httpClient, id); + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["issues"].GetArray().size(), 0, response); + + response = GetFetchScript(httpClient, id); + NJson::ReadJsonTree(response, &jsonCfg, &json, /* throwOnError = */ true); + UNIT_ASSERT_EQUAL_C(json["status"].GetString(), "SUCCESS", response); + auto rows = json["result_set"].GetMap().at("rows").GetArray(); + UNIT_ASSERT_EQUAL_C(rows.size(), ROWS_LIMIT, response); + } + } diff --git a/ydb/core/viewer/ya.make b/ydb/core/viewer/ya.make index 69056b3bfbd7..14fe60c1e148 100644 --- a/ydb/core/viewer/ya.make +++ b/ydb/core/viewer/ya.make @@ -21,6 +21,7 @@ SRCS( json_handlers.h json_handlers_browse.cpp json_handlers_operation.cpp + json_handlers_query.cpp json_handlers_pdisk.cpp json_handlers_scheme.cpp json_handlers_storage.cpp