Skip to content

Commit 95f0b21

Browse files
authored
realign mvp core library with arcadia (#11401)
1 parent bad5730 commit 95f0b21

File tree

4 files changed

+234
-8
lines changed

4 files changed

+234
-8
lines changed

ydb/mvp/core/core_ydb.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ NJson::TJsonWriterConfig GetDefaultJsonWriterConfig() {
1212

1313
NJson::TJsonReaderConfig NMVP::THandlerActorYdb::JsonReaderConfig;
1414
NJson::TJsonWriterConfig NMVP::THandlerActorYdb::JsonWriterConfig = GetDefaultJsonWriterConfig();
15-
TMap<std::pair<TStringBuf, TStringBuf>, TYdbUnitResources> DefaultUnitResources =
15+
TMap<std::pair<TString, TString>, TYdbUnitResources> DefaultUnitResources =
1616
{
1717
{
1818
{
@@ -105,6 +105,24 @@ std::unique_ptr<NYdb::NTopic::TTopicClient> TYdbLocation::GetTopicClientPtr(TStr
105105
return std::make_unique<NYdb::NTopic::TTopicClient>(GetDriver(endpoint, scheme), settings);
106106
}
107107

108+
std::unique_ptr<NYdb::NReplication::TReplicationClient>
109+
TYdbLocation::GetReplicationClientPtr(TStringBuf endpoint, TStringBuf scheme,
110+
const NYdb::TCommonClientSettings& settings) const {
111+
return std::make_unique<NYdb::NReplication::TReplicationClient>(GetDriver(endpoint, scheme), settings);
112+
}
113+
114+
NYdb::NTable::TTableClient TYdbLocation::GetTableClient(const TRequest& request, const NYdb::NTable::TClientSettings& defaultClientSettings) const {
115+
NYdb::NTable::TClientSettings clientSettings(defaultClientSettings);
116+
TString authToken = request.GetAuthToken();
117+
if (authToken) {
118+
clientSettings.AuthToken(authToken);
119+
}
120+
if (TString database = TYdbLocation::GetDatabaseName(request)) {
121+
clientSettings.Database(database);
122+
}
123+
return GetTableClient(clientSettings);
124+
}
125+
108126
NYdb::NTable::TTableClient TYdbLocation::GetTableClient(const NYdb::NTable::TClientSettings& clientSettings) const {
109127
return NYdb::NTable::TTableClient(GetDriver(), clientSettings);
110128
}
@@ -137,6 +155,10 @@ std::unique_ptr<NYdb::NScripting::TScriptingClient> TYdbLocation::GetScriptingCl
137155
return std::make_unique<NYdb::NScripting::TScriptingClient>(GetDriver(endpoint, scheme), settings);
138156
}
139157

158+
std::unique_ptr<NYdb::NQuery::TQueryClient> TYdbLocation::GetQueryClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::NQuery::TClientSettings& settings) const {
159+
return std::make_unique<NYdb::NQuery::TQueryClient>(GetDriver(endpoint, scheme), settings);
160+
}
161+
140162
NHttp::THttpOutgoingRequestPtr TYdbLocation::CreateHttpMonRequestGet(TStringBuf uri, const TRequest& request) const {
141163
NHttp::THttpOutgoingRequestPtr out;
142164
TString endpoint = GetEndpoint("http-mon", "http");
@@ -410,6 +432,44 @@ TString GetAuthHeaderValue(const TString& tokenName) {
410432
return authHeaderValue;
411433
}
412434

435+
void TryGetLocationFromConfig(TYdbLocation& location, const YAML::Node& config) {
436+
TVector<std::pair<TString, TString>> endpoints;
437+
TVector<TString> dataCenters;
438+
TMap<std::pair<TString, TString>, TYdbUnitResources> unitResources;
439+
440+
if (config["endpoints"]) {
441+
for (const auto& e: config["endpoints"]) {
442+
endpoints.emplace_back(std::make_pair(e.first.as<std::string>(), e.second.as<std::string>()));
443+
}
444+
}
445+
if (config["data_centers"]) {
446+
for (const auto& dc: config["data_centers"]) {
447+
dataCenters.emplace_back(dc.as<std::string>());
448+
}
449+
}
450+
if (config["unit_resources"]) {
451+
for (const auto& yaml_resources: config["unit_resources"]) {
452+
auto resource = TYdbUnitResources(
453+
yaml_resources["cpu"].as<double>(0.0),
454+
yaml_resources["memory"].as<ui64>(0),
455+
yaml_resources["storage"].as<ui64>(0)
456+
);
457+
unitResources[std::make_pair(
458+
yaml_resources["type"].as<std::string>(""),
459+
yaml_resources["kind"].as<std::string>(""))] = resource;
460+
}
461+
}
462+
463+
location.Name = config["name"].as<std::string>("");
464+
location.Environment = config["environment"].as<std::string>("");
465+
location.Endpoints = endpoints;
466+
location.RootDomain = config["root_domain"].as<std::string>("");
467+
location.DataCenters = dataCenters;
468+
location.UnitResources = unitResources;
469+
location.NotificationsEnvironmentId = config["notifications_environment_id"].as<ui32>(0);
470+
location.ServerlessDocumentProxyEndpoint = config["serverless_document_proxy_endpoint"].as<std::string>("");
471+
}
472+
413473
void SetGrpcKeepAlive(NYdbGrpc::TGRpcClientConfig& config) {
414474
config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIME_MS] = 20000;
415475
config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIMEOUT_MS] = 10000;

ydb/mvp/core/core_ydb.h

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
#pragma once
22

33
#include <ydb/mvp/security/simple/security.h>
4+
#include <contrib/libs/yaml-cpp/include/yaml-cpp/yaml.h>
45
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
56
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
67
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
78
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
89
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
10+
#include <ydb/public/sdk/cpp/client/draft/ydb_replication.h>
911
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
12+
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
1013
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
1114
#include <ydb/core/viewer/json/json.h>
1215
#include <ydb/library/actors/http/http.h>
@@ -139,7 +142,7 @@ struct TYdbUnitResources {
139142
}
140143
};
141144

142-
extern TMap<std::pair<TStringBuf, TStringBuf>, TYdbUnitResources> DefaultUnitResources;
145+
extern TMap<std::pair<TString, TString>, TYdbUnitResources> DefaultUnitResources;
143146

144147
TString GetAuthHeaderValue(const TString& tokenName);
145148
void SetGrpcKeepAlive(NYdbGrpc::TGRpcClientConfig& config);
@@ -149,8 +152,8 @@ struct TYdbLocation {
149152
TString Environment;
150153
TVector<std::pair<TString, TString>> Endpoints;
151154
TString RootDomain;
152-
TVector<TStringBuf> DataCenters;
153-
const TMap<std::pair<TStringBuf, TStringBuf>, TYdbUnitResources>& UnitResources;
155+
TVector<TString> DataCenters;
156+
TMap<std::pair<TString, TString>, TYdbUnitResources> UnitResources;
154157
ui32 NotificationsEnvironmentId;
155158
bool Disabled = false;
156159
TAtomicSingleton<NYdb::TDriver> Driver;
@@ -176,8 +179,8 @@ struct TYdbLocation {
176179
const TString& environment,
177180
const TVector<std::pair<TString, TString>>& endpoints,
178181
const TString& rootDomain,
179-
const TVector<TStringBuf>& dataCenters,
180-
const TMap<std::pair<TStringBuf, TStringBuf>, TYdbUnitResources>& unitResources,
182+
const TVector<TString>& dataCenters,
183+
const TMap<std::pair<TString, TString>, TYdbUnitResources>& unitResources,
181184
ui32 notificationsEnvironmentId = 0)
182185
: Name(name)
183186
, Environment(environment)
@@ -225,7 +228,7 @@ struct TYdbLocation {
225228
}
226229
}
227230

228-
const TYdbUnitResources& GetUnitResources(TStringBuf type, TStringBuf kind) const {
231+
const TYdbUnitResources& GetUnitResources(const TString& type, const TString& kind) const {
229232
auto it = UnitResources.find({type, kind});
230233
if (it != UnitResources.end()) {
231234
return it->second;
@@ -306,13 +309,16 @@ struct TYdbLocation {
306309
std::unique_ptr<NYdb::NTable::TTableClient> GetTableClientPtr(TStringBuf endpoint, const NYdb::NTable::TClientSettings& settings = NYdb::NTable::TClientSettings()) const;
307310
std::unique_ptr<NYdb::NTable::TTableClient> GetTableClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::NTable::TClientSettings& settings = NYdb::NTable::TClientSettings()) const;
308311
std::unique_ptr<NYdb::NTopic::TTopicClient> GetTopicClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::NTopic::TTopicClientSettings& settings = NYdb::NTopic::TTopicClientSettings()) const;
312+
std::unique_ptr<NYdb::NReplication::TReplicationClient> GetReplicationClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings()) const;
309313

310314
std::unique_ptr<NYdb::NDataStreams::V1::TDataStreamsClient> GetDataStreamsClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings()) const;
311315

316+
NYdb::NTable::TTableClient GetTableClient(const TRequest& request, const NYdb::NTable::TClientSettings& defaultClientSettings = {}) const;
312317
NYdb::NTable::TTableClient GetTableClient(const NYdb::NTable::TClientSettings& clientSettings = {}) const;
313318

314319
NYdb::NScripting::TScriptingClient GetScriptingClient(const TRequest& request) const;
315320
std::unique_ptr<NYdb::NScripting::TScriptingClient> GetScriptingClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings()) const;
321+
std::unique_ptr<NYdb::NQuery::TQueryClient> GetQueryClientPtr(TStringBuf endpoint, TStringBuf scheme, const NYdb::NQuery::TClientSettings& settings = NYdb::NQuery::TClientSettings()) const;
316322

317323
TString GetPath(const TRequest& request) const {
318324
TString path = request.Parameters["path"];
@@ -354,3 +360,6 @@ struct TYdbLocation {
354360
return GRpcClientLow.GetRef();
355361
}
356362
};
363+
364+
TString GetAuthHeaderValue(const TString& tokenName);
365+
void TryGetLocationFromConfig(TYdbLocation& location, const YAML::Node& config);

ydb/mvp/core/core_ydb_impl.h

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,22 @@
77
#include <ydb/library/actors/core/event_local.h>
88
#include <ydb/library/actors/core/actor_bootstrapped.h>
99
#include <library/cpp/string_utils/base64/base64.h>
10+
#include <ydb/public/sdk/cpp/client/draft/ydb_replication.h>
1011
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
1112
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
1213
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
1314
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
15+
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
1416
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
17+
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
1518
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
19+
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
1620
#include <ydb/public/api/protos/ydb_operation.pb.h>
1721
#include <ydb/public/api/protos/ydb_cms.pb.h>
1822
#include <ydb/public/lib/deprecated/client/grpc_client.h>
1923
#include <ydb/library/yql/public/issue/yql_issue_message.h>
2024
#include <ydb/library/yql/public/issue/yql_issue_manager.h>
25+
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>
2126
#include "appdata.h"
2227
#include "merger.h"
2328
#include "core_ydb.h"
@@ -79,6 +84,9 @@ struct THandlerActorYdb {
7984
EvExplainYqlResponse,
8085
EvDescribeTopicResult,
8186
EvDescribeConsumerResult,
87+
EvDescribeReplicationResult,
88+
EvExecuteQueryIterator,
89+
EvExecuteQueryPart,
8290
EvEnd
8391
};
8492

@@ -134,6 +142,14 @@ struct THandlerActorYdb {
134142
{}
135143
};
136144

145+
struct TEvDescribeReplicationResult : NActors::TEventLocal<TEvDescribeReplicationResult, EvDescribeReplicationResult> {
146+
NYdb::NReplication::TDescribeReplicationResult Result;
147+
148+
TEvDescribeReplicationResult(NYdb::NReplication::TDescribeReplicationResult&& result)
149+
: Result(std::move(result)) {
150+
}
151+
};
152+
137153
struct TEvDataStreamsListResponse : NActors::TEventLocal<TEvDataStreamsListResponse, EvDataStreamsListResponse> {
138154
NYdb::NDataStreams::V1::TListStreamsResult Result;
139155

@@ -447,6 +463,24 @@ struct THandlerActorYdb {
447463
struct TEvRetryRequest : NActors::TEventLocal<TEvRetryRequest, EvRetryRequest> {
448464
};
449465

466+
struct TEvExecuteQueryIterator : NActors::TEventLocal<TEvExecuteQueryIterator, EvExecuteQueryIterator> {
467+
NYdb::NQuery::TExecuteQueryIterator Iterator;
468+
469+
TEvExecuteQueryIterator() = delete;
470+
TEvExecuteQueryIterator(NYdb::NQuery::TExecuteQueryIterator&& iterator)
471+
: Iterator(std::move(iterator))
472+
{}
473+
};
474+
475+
struct TEvExecuteQueryPart : NActors::TEventLocal<TEvExecuteQueryPart, EvExecuteQueryPart> {
476+
NYdb::NQuery::TExecuteQueryPart Part;
477+
478+
TEvExecuteQueryPart() = delete;
479+
TEvExecuteQueryPart(NYdb::NQuery::TExecuteQueryPart&& part)
480+
: Part(std::move(part))
481+
{}
482+
};
483+
450484
struct TEvErrorResponse : NActors::TEventLocal<TEvErrorResponse, EvErrorResponse> {
451485
TString Status;
452486
TString Message;
@@ -611,6 +645,48 @@ struct THandlerActorYdb {
611645
}
612646
}
613647

648+
static NJson::TJsonValue TypedValueToJsonValue(const Ydb::TypedValue& typedValue) {
649+
NYdb::TType type(typedValue.type());
650+
NYdb::TValue value(type, typedValue.value());
651+
NYdb::TValueParser parser(value);
652+
return ColumnValueToJsonValue(parser);
653+
}
654+
655+
static void WriteColumns(NJson::TJsonValue& columns,
656+
const NYdb::NTable::TTableDescription& tableDescription,
657+
const std::function<void(NJson::TJsonValue&, NYdb::TType)>& columnTypeFormatter = ColumnTypeToString) {
658+
auto columnsMeta = tableDescription.GetColumns();
659+
auto columnsKeysMeta = tableDescription.GetPrimaryKeyColumns();
660+
auto proto = NYdb::TProtoAccessor::GetProto(tableDescription);
661+
int columnsSize = std::min<int>(proto.columns_size(), columnsMeta.size());
662+
for (int columnNum = 0; columnNum < columnsSize; ++columnNum) {
663+
const NYdb::TColumn& columnMeta = columnsMeta[columnNum];
664+
const Ydb::Table::ColumnMeta& columnProto = proto.columns(columnNum);
665+
NJson::TJsonValue& column = columns.AppendValue(NJson::TJsonValue());
666+
column["name"] = columnMeta.Name;
667+
columnTypeFormatter(column["type"], columnMeta.Type);
668+
auto itKey = Find(columnsKeysMeta, columnMeta.Name);
669+
if (itKey != columnsKeysMeta.end()) {
670+
column["key"] = true;
671+
column["keyOrder"] = itKey - columnsKeysMeta.begin();
672+
}
673+
if (columnProto.not_null()) {
674+
column["not_null"] = true;
675+
}
676+
switch (columnProto.default_value_case()) {
677+
case Ydb::Table::ColumnMeta::DEFAULT_VALUE_NOT_SET:
678+
case Ydb::Table::ColumnMeta::kEmptyDefault:
679+
break;
680+
case Ydb::Table::ColumnMeta::kFromLiteral:
681+
column["defaultValue"] = TypedValueToJsonValue(columnProto.from_literal());
682+
break;
683+
case Ydb::Table::ColumnMeta::kFromSequence:
684+
column["defaultSequence"] = columnProto.from_sequence().name();
685+
break;
686+
}
687+
}
688+
}
689+
614690
static void WriteIndexes(NJson::TJsonValue& indexes,
615691
const TVector<NYdb::NTable::TIndexDescription>& indexesMeta) {
616692
indexes.SetType(NJson::JSON_ARRAY);
@@ -707,6 +783,27 @@ struct THandlerActorYdb {
707783
}
708784
}
709785

786+
static TString BlackBoxTokenFromSessionId(TStringBuf sessionId, TStringBuf userIp = NKikimr::NSecurity::DefaultUserIp()) {
787+
return NKikimr::NSecurity::BlackBoxTokenFromSessionId(sessionId, userIp);
788+
}
789+
790+
static TString GetAuthToken(NHttp::THttpIncomingRequestPtr request) {
791+
NHttp::THeaders headers(request->Headers);
792+
NHttp::TCookies cookies(headers["Cookie"]);
793+
TStringBuf sessionId = cookies["Session_id"];
794+
if (!sessionId.empty()) {
795+
return BlackBoxTokenFromSessionId(sessionId);
796+
}
797+
TStringBuf authorization = headers["Authorization"];
798+
if (!authorization.empty()) {
799+
TStringBuf scheme = authorization.NextTok(' ');
800+
if (scheme == "OAuth" || scheme == "Bearer") {
801+
return TString(authorization);
802+
}
803+
}
804+
return TString();
805+
}
806+
710807
static void CopyHeader(const NHttp::THeaders& request, NHttp::THeadersBuilder& headers, TStringBuf header) {
711808
if (request.Has(header)) {
712809
headers.Set(header, request[header]);
@@ -763,6 +860,65 @@ struct THandlerActorYdb {
763860
return false;
764861
}
765862

863+
static NHttp::THttpOutgoingResponsePtr CreateStatusResponseForQuery(NHttp::THttpIncomingRequestPtr request, const NYdb::TStatus& status, const TJsonSettings& jsonSettings = TJsonSettings()) {
864+
Ydb::Operations::Operation operation;
865+
operation.set_status(static_cast<Ydb::StatusIds_StatusCode>(status.GetStatus()));
866+
IssuesToMessage(status.GetIssues(), operation.mutable_issues());
867+
return CreateStatusResponseForQuery(request, operation, jsonSettings);
868+
}
869+
870+
static NHttp::THttpOutgoingResponsePtr CreateStatusResponseForQuery(NHttp::THttpIncomingRequestPtr request, const Ydb::Operations::Operation& operation, const TJsonSettings& jsonSettings = TJsonSettings()) {
871+
TStringBuf status = "503";
872+
TStringBuf message = "Service Unavailable";
873+
switch ((int)operation.status()) {
874+
case Ydb::StatusIds::SUCCESS:
875+
case Ydb::StatusIds::SCHEME_ERROR:
876+
case Ydb::StatusIds::GENERIC_ERROR:
877+
case Ydb::StatusIds::TIMEOUT:
878+
case (int)NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED:
879+
status = "200";
880+
message = "OK";
881+
break;
882+
case Ydb::StatusIds::UNAUTHORIZED:
883+
case (int)NYdb::EStatus::CLIENT_UNAUTHENTICATED:
884+
status = "401";
885+
message = "Unauthorized";
886+
break;
887+
case Ydb::StatusIds::BAD_REQUEST:
888+
case Ydb::StatusIds::BAD_SESSION:
889+
case Ydb::StatusIds::PRECONDITION_FAILED:
890+
case Ydb::StatusIds::ALREADY_EXISTS:
891+
case Ydb::StatusIds::SESSION_EXPIRED:
892+
case Ydb::StatusIds::UNDETERMINED:
893+
case Ydb::StatusIds::ABORTED:
894+
case Ydb::StatusIds::UNSUPPORTED:
895+
status = "400";
896+
message = "Bad Request";
897+
break;
898+
case Ydb::StatusIds::NOT_FOUND:
899+
status = "404";
900+
message = "Not Found";
901+
break;
902+
case Ydb::StatusIds::OVERLOADED:
903+
status = "429";
904+
message = "Overloaded";
905+
break;
906+
case Ydb::StatusIds::INTERNAL_ERROR:
907+
status = "500";
908+
message = "Internal Server Error";
909+
break;
910+
case Ydb::StatusIds::UNAVAILABLE:
911+
status = "503";
912+
message = "Service Unavailable";
913+
break;
914+
default:
915+
break;
916+
}
917+
TStringStream stream;
918+
TProtoToJson::ProtoToJson(stream, operation, jsonSettings);
919+
return request->CreateResponse(status, message, "application/json", stream.Str());
920+
}
921+
766922
static NHttp::THttpOutgoingResponsePtr CreateStatusResponse(NHttp::THttpIncomingRequestPtr request, const NYdb::TStatus& status, const TJsonSettings& jsonSettings = TJsonSettings()) {
767923
Ydb::Operations::Operation operation;
768924
operation.set_status(static_cast<Ydb::StatusIds_StatusCode>(status.GetStatus()));
@@ -813,6 +969,7 @@ struct THandlerActorYdb {
813969
message = "Service Unavailable";
814970
break;
815971
case Ydb::StatusIds::TIMEOUT:
972+
case (int)NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED:
816973
status = "504";
817974
message = "Gateway Time-out";
818975
break;

0 commit comments

Comments
 (0)