Skip to content

Commit c3cadd1

Browse files
authored
Grpc monitoring merge (#11224)
1 parent 2073f27 commit c3cadd1

File tree

5 files changed

+54
-28
lines changed

5 files changed

+54
-28
lines changed

ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <ydb/library/security/ydb_credentials_provider_factory.h>
1212

1313
#include <ydb/public/lib/fq/scope.h>
14+
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
1415
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
1516
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
1617

@@ -273,6 +274,18 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
273274
};
274275
};
275276

277+
bool IsValidLoadControlConfig(const NConfig::TLoadControlConfig& config) {
278+
if (!config.GetEnable()) {
279+
return true;
280+
}
281+
282+
const auto& databaseConnection = config.GetDatabaseConnection();
283+
if (!databaseConnection.GetDatabase()) {
284+
return false;
285+
}
286+
return databaseConnection.GetEndpoint() || config.GetMonitoringEndpoint();
287+
}
288+
276289
}
277290

278291
class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> {
@@ -307,7 +320,7 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
307320
switch (controlPlane.type_case()) {
308321
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
309322
case NConfig::TYdbComputeControlPlane::kSingle:
310-
CreateSingleClientActors(controlPlane.GetSingle());
323+
CreateSingleClientActors();
311324
break;
312325
case NConfig::TYdbComputeControlPlane::kCms:
313326
CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod());
@@ -326,11 +339,13 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
326339
if (connection.GetCertificateFile()) {
327340
settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
328341
}
342+
settings.Headers[NYdb::YDB_DATABASE_HEADER] = connection.GetDatabase();
329343
return settings;
330344
}
331345

332-
static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const auto& connection) {
346+
static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
333347
NGrpcActorClient::TGrpcClientSettings settings;
348+
const auto& connection = config.GetExecutionConnection();
334349
settings.Endpoint = connection.GetEndpoint();
335350
settings.EnableSsl = connection.GetUseSsl();
336351
if (connection.GetCertificateFile()) {
@@ -340,23 +355,21 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
340355
return settings;
341356
}
342357

343-
static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
344-
return CreateGrpcClientSettings(config.GetControlPlaneConnection());
345-
}
346-
347-
void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
358+
void CreateSingleClientActors() {
348359
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
349-
if (globalLoadConfig.GetEnable()) {
350-
TActorId clientActor;
351-
auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint();
352-
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider();
353-
if (monitoringEndpoint) {
354-
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, singleConfig.GetConnection().GetDatabase(), credentialsProvider).release());
355-
} else {
356-
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), credentialsProvider).release());
357-
}
358-
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
360+
if (!globalLoadConfig.GetEnable() || !IsValidLoadControlConfig(globalLoadConfig)) {
361+
return;
362+
}
363+
TActorId clientActor;
364+
auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint();
365+
const auto& databaseConnection = globalLoadConfig.GetDatabaseConnection();
366+
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
367+
if (monitoringEndpoint) {
368+
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
369+
} else {
370+
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
359371
}
372+
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
360373
}
361374

362375
void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) {
@@ -370,14 +383,15 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
370383
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
371384
? config.GetLoadControlConfig()
372385
: globalLoadConfig;
373-
if (loadConfig.GetEnable()) {
386+
if (loadConfig.GetEnable() && IsValidLoadControlConfig(loadConfig)) {
374387
TActorId clientActor;
375388
auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
376-
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
389+
const auto& databaseConnection = loadConfig.GetDatabaseConnection();
390+
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
377391
if (monitoringEndpoint) {
378-
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
392+
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
379393
} else {
380-
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
394+
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
381395
}
382396
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, databaseCounters).release());
383397
}
@@ -394,14 +408,15 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
394408
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
395409
? config.GetLoadControlConfig()
396410
: globalLoadConfig;
397-
if (loadConfig.GetEnable()) {
411+
if (loadConfig.GetEnable() && IsValidLoadControlConfig(loadConfig)) {
398412
TActorId clientActor;
399413
auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
400-
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
414+
const auto& databaseConnection = loadConfig.GetDatabaseConnection();
415+
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
401416
if (monitoringEndpoint) {
402-
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
417+
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
403418
} else {
404-
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
419+
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
405420
}
406421
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, databaseCounters).release());
407422
}

ydb/core/fq/libs/compute/ydb/control_plane/ya.make

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,23 @@ SRCS(
1212

1313
PEERDIR(
1414
library/cpp/json
15-
ydb/library/actors/core
16-
ydb/library/actors/protos
17-
ydb/library/grpc/actor_client
1815
ydb/core/fq/libs/compute/ydb/synchronization_service
1916
ydb/core/fq/libs/control_plane_storage/proto
2017
ydb/core/fq/libs/quota_manager/proto
2118
ydb/core/protos
19+
ydb/library/actors/core
20+
ydb/library/actors/protos
2221
ydb/library/db_pool/protos
22+
ydb/library/grpc/actor_client
2323
ydb/library/yql/public/issue
2424
ydb/library/yql/utils
2525
ydb/library/yql/utils/actors
2626
ydb/public/api/grpc
2727
ydb/public/api/grpc/draft
2828
ydb/public/lib/operation_id/protos
29+
ydb/public/sdk/cpp/client/resources
30+
ydb/public/sdk/cpp/client/ydb_operation
31+
ydb/public/sdk/cpp/client/ydb_query
2932
)
3033

3134
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/config/protos/compute.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ message TLoadControlConfig {
2727
bool Strict = 7; // default false, whether to deny execution in load level unavailable
2828
uint32 CpuNumber = 8;
2929
string MonitoringEndpoint = 9; // if defined, will be used as REST API instead of default GRPC
30+
TYdbStorageConfig DatabaseConnection = 10;
3031
}
3132

3233
message TComputeDatabaseConfig {

ydb/library/grpc/actor_client/grpc_service_client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TGrpcServiceClient {
2222
using TServiceConnection = NYdbGrpc::TServiceConnection<TGrpcService>;
2323

2424
NYdbGrpc::TGRpcClientConfig Config;
25+
std::unordered_map<TString, TString> Headers;
2526
NYdbGrpc::TGRpcClientLow Client;
2627
std::unique_ptr<TServiceConnection> Connection;
2728

@@ -93,6 +94,9 @@ class TGrpcServiceClient {
9394
if (requestId) {
9495
meta.Aux.push_back({"x-request-id", requestId});
9596
}
97+
for (auto [k ,v]: Headers) {
98+
meta.Aux.push_back({k, v});
99+
}
96100

97101
NYdbGrpc::TResponseCallback<TResponseType> callback =
98102
[actorSystem = NActors::TActivationContext::ActorSystem(), prefix = Prefix(requestId), request = ev](NYdbGrpc::TGrpcStatus&& status, TResponseType&& response) -> void {
@@ -129,6 +133,7 @@ class TGrpcServiceClient {
129133

130134
TGrpcServiceClient(const NGrpcActorClient::TGrpcClientSettings& settings)
131135
: Config(InitGrpcConfig(settings))
136+
, Headers(settings.Headers)
132137
{}
133138
};
134139

ydb/library/grpc/actor_client/grpc_service_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include <util/generic/string.h>
33
#include <util/system/types.h>
4+
#include <unordered_map>
45

56
namespace NGrpcActorClient {
67

@@ -12,6 +13,7 @@ struct TGrpcClientSettings {
1213
ui32 GrpcKeepAlivePingInterval = 5000;
1314
bool EnableSsl = false;
1415
ui64 RequestTimeoutMs = 10000; // 10 seconds
16+
std::unordered_map<TString, TString> Headers;
1517
};
1618

1719
} // namespace NGrpcActorClient

0 commit comments

Comments
 (0)