Skip to content

Commit 10c0003

Browse files
authored
Add gRPC API for recommender (#8654)
1 parent 6583b1b commit 10c0003

File tree

10 files changed

+209
-8
lines changed

10 files changed

+209
-8
lines changed

ydb/core/base/hive.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace NKikimr {
4949
EvUpdateTabletsObject,
5050
EvUpdateDomain,
5151
EvRequestTabletDistribution,
52+
EvRequestScaleRecommendation,
5253

5354
// replies
5455
EvBootTabletReply = EvBootTablet + 512,
@@ -84,6 +85,7 @@ namespace NKikimr {
8485
EvUpdateTabletsObjectReply,
8586
EvUpdateDomainReply,
8687
EvResponseTabletDistribution,
88+
EvResponseScaleRecommendation,
8789

8890
EvEnd
8991
};
@@ -876,6 +878,12 @@ namespace NKikimr {
876878

877879
struct TEvResponseTabletDistribution : TEventPB<TEvResponseTabletDistribution,
878880
NKikimrHive::TEvResponseTabletDistribution, EvResponseTabletDistribution> {};
881+
882+
struct TEvRequestScaleRecommendation : TEventPB<TEvRequestScaleRecommendation,
883+
NKikimrHive::TEvRequestScaleRecommendation, EvRequestScaleRecommendation> {};
884+
885+
struct TEvResponseScaleRecommendation : TEventPB<TEvResponseScaleRecommendation,
886+
NKikimrHive::TEvResponseScaleRecommendation, EvResponseScaleRecommendation> {};
879887
};
880888

881889
IActor* CreateDefaultHive(const TActorId &tablet, TTabletStorageInfo *info);
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
#include "service_cms.h"
2+
3+
#include <ydb/core/base/path.h>
4+
#include <ydb/core/base/tablet_pipe.h>
5+
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/grpc_services/rpc_request_base.h>
7+
#include <ydb/core/mind/hive/hive.h>
8+
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
9+
10+
#include <ydb/public/api/protos/ydb_cms.pb.h>
11+
12+
namespace NKikimr::NGRpcService {
13+
14+
using TEvGetScaleRecommendationRequest = TGrpcRequestNoOperationCall<Ydb::Cms::GetScaleRecommendationRequest, Ydb::Cms::GetScaleRecommendationResponse>;
15+
16+
class TGetScaleRecommendationRPC : public TRpcRequestActor<TGetScaleRecommendationRPC, TEvGetScaleRecommendationRequest, false> {
17+
public:
18+
using TRpcRequestActor::TRpcRequestActor;
19+
20+
void Bootstrap(const TActorContext&) {
21+
ResolveDatabase(GetProtoRequest()->path());
22+
this->Become(&TThis::StateWork);
23+
}
24+
25+
STATEFN(StateWork) {
26+
switch (ev->GetTypeRewrite()) {
27+
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
28+
hFunc(TEvHive::TEvResponseScaleRecommendation, Handle);
29+
30+
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
31+
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
32+
}
33+
}
34+
35+
void ResolveDatabase(const TString& databaseName) {
36+
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
37+
request->DatabaseName = databaseName;
38+
39+
auto& entry = request->ResultSet.emplace_back();
40+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
41+
entry.Path = NKikimr::SplitPath(databaseName);
42+
43+
this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
44+
}
45+
46+
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
47+
const auto& request = ev->Get()->Request;
48+
49+
if (request->ResultSet.empty()) {
50+
return this->Reply(Ydb::StatusIds::SCHEME_ERROR, NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR);
51+
}
52+
53+
const auto& entry = request->ResultSet.front();
54+
55+
if (request->ErrorCount > 0) {
56+
switch (entry.Status) {
57+
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
58+
break;
59+
case NSchemeCache::TSchemeCacheNavigate::EStatus::AccessDenied:
60+
return this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED);
61+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
62+
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
63+
return this->Reply(Ydb::StatusIds::SCHEME_ERROR, NKikimrIssues::TIssuesIds::PATH_NOT_EXIST);
64+
case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
65+
case NSchemeCache::TSchemeCacheNavigate::EStatus::RedirectLookupError:
66+
return this->Reply(Ydb::StatusIds::UNAVAILABLE, NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR);
67+
default:
68+
return this->Reply(Ydb::StatusIds::SCHEME_ERROR, NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR);
69+
}
70+
}
71+
72+
if (!this->CheckAccess(CanonizePath(entry.Path), entry.SecurityObject, NACLib::GenericList)) {
73+
return;
74+
}
75+
76+
auto domainInfo = entry.DomainInfo;
77+
if (!domainInfo || !domainInfo->Params.HasHive()) {
78+
return this->Reply(Ydb::StatusIds::INTERNAL_ERROR, NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR);
79+
}
80+
81+
SendRequest(domainInfo->DomainKey, domainInfo->Params.GetHive());
82+
}
83+
84+
void SendRequest(TPathId domainKey, ui64 hiveId) {
85+
if (!PipeClient) {
86+
NTabletPipe::TClientConfig config;
87+
config.RetryPolicy = {.RetryLimitCount = 3};
88+
PipeClient = this->RegisterWithSameMailbox(NTabletPipe::CreateClient(this->SelfId(), hiveId, config));
89+
}
90+
91+
auto ev = std::make_unique<TEvHive::TEvRequestScaleRecommendation>();
92+
ev->Record.MutableDomainKey()->SetSchemeShard(domainKey.OwnerId);
93+
ev->Record.MutableDomainKey()->SetPathId(domainKey.LocalPathId);
94+
NTabletPipe::SendData(this->SelfId(), PipeClient, ev.release());
95+
}
96+
97+
void Handle(TEvHive::TEvResponseScaleRecommendation::TPtr& ev) {
98+
TResponse response;
99+
ui32 recommendedNodes = ev->Get()->Record.GetRecommendedNodes();
100+
response.mutable_recommended_resources()->add_computational_units()->set_count(recommendedNodes);
101+
return this->Reply(response);
102+
}
103+
104+
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
105+
if (ev->Get()->Status != NKikimrProto::OK) {
106+
DeliveryProblem();
107+
}
108+
}
109+
110+
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
111+
DeliveryProblem();
112+
}
113+
114+
void DeliveryProblem() {
115+
this->Reply(Ydb::StatusIds::UNAVAILABLE);
116+
}
117+
118+
void PassAway() override {
119+
NTabletPipe::CloseAndForgetClient(this->SelfId(), PipeClient);
120+
IActor::PassAway();
121+
}
122+
123+
private:
124+
TActorId PipeClient;
125+
126+
};
127+
128+
void DoGetScaleRecommendationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
129+
f.RegisterActor(new TGetScaleRecommendationRPC(p.release()));
130+
}
131+
132+
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/service_cms.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NKikimr {
66
namespace NGRpcService {
77

88
class IRequestOpCtx;
9+
class IRequestNoOpCtx;
910
class IFacilityProvider;
1011

1112
void DoCreateTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
@@ -14,6 +15,7 @@ void DoGetTenantStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityP
1415
void DoListTenantsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1516
void DoRemoveTenantRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
1617
void DoDescribeTenantOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
18+
void DoGetScaleRecommendationRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
1719

1820
}
1921
}

ydb/core/grpc_services/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ SRCS(
4343
rpc_fq_internal.cpp
4444
rpc_fq.cpp
4545
rpc_get_operation.cpp
46+
rpc_get_scale_recommendation.cpp
4647
rpc_get_shard_locations.cpp
4748
rpc_import.cpp
4849
rpc_import_data.cpp

ydb/core/mind/hive/hive_impl.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3054,6 +3054,7 @@ void THive::ProcessEvent(std::unique_ptr<IEventHandle> event) {
30543054
hFunc(TEvPrivate::TEvDeleteNode, Handle);
30553055
hFunc(TEvHive::TEvRequestTabletDistribution, Handle);
30563056
hFunc(TEvPrivate::TEvUpdateDataCenterFollowers, Handle);
3057+
hFunc(TEvHive::TEvRequestScaleRecommendation, Handle);
30573058
}
30583059
}
30593060

@@ -3157,6 +3158,7 @@ STFUNC(THive::StateWork) {
31573158
fFunc(TEvPrivate::TEvDeleteNode::EventType, EnqueueIncomingEvent);
31583159
fFunc(TEvHive::TEvRequestTabletDistribution::EventType, EnqueueIncomingEvent);
31593160
fFunc(TEvPrivate::TEvUpdateDataCenterFollowers::EventType, EnqueueIncomingEvent);
3161+
fFunc(TEvHive::TEvRequestScaleRecommendation::EventType, EnqueueIncomingEvent);
31603162
hFunc(TEvPrivate::TEvProcessIncomingEvent, Handle);
31613163
default:
31623164
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -3458,6 +3460,11 @@ void THive::Handle(TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev) {
34583460
Execute(CreateUpdateDcFollowers(ev->Get()->DataCenter));
34593461
}
34603462

3463+
void THive::Handle(TEvHive::TEvRequestScaleRecommendation::TPtr& ev) {
3464+
auto response = std::make_unique<TEvHive::TEvResponseScaleRecommendation>();
3465+
Send(ev->Sender, response.release());
3466+
}
3467+
34613468
TVector<TNodeId> THive::GetNodesForWhiteboardBroadcast(size_t maxNodesToReturn) {
34623469
TVector<TNodeId> nodes;
34633470
TNodeId selfNodeId = SelfId().NodeId();

ydb/core/mind/hive/hive_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
578578
void Handle(TEvPrivate::TEvDeleteNode::TPtr& ev);
579579
void Handle(TEvHive::TEvRequestTabletDistribution::TPtr& ev);
580580
void Handle(TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev);
581+
void Handle(TEvHive::TEvRequestScaleRecommendation::TPtr& ev);
581582

582583
protected:
583584
void RestartPipeTx(ui64 tabletId);

ydb/core/protos/hive.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,3 +589,11 @@ message TEvResponseTabletDistribution {
589589
}
590590
repeated TNode Nodes = 1;
591591
}
592+
593+
message TEvRequestScaleRecommendation {
594+
optional NKikimrSubDomains.TDomainKey DomainKey = 1;
595+
}
596+
597+
message TEvResponseScaleRecommendation {
598+
optional uint32 RecommendedNodes = 1;
599+
}

ydb/public/api/grpc/ydb_cms_v1.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,7 @@ service CmsService {
2727

2828
// Describe supported database options.
2929
rpc DescribeDatabaseOptions(Cms.DescribeDatabaseOptionsRequest) returns (Cms.DescribeDatabaseOptionsResponse);
30+
31+
// Get resources scale recommendation for database.
32+
rpc GetScaleRecommendation(Cms.GetScaleRecommendationRequest) returns (Cms.GetScaleRecommendationResponse);
3033
}

ydb/public/api/protos/ydb_cms.proto

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ option cc_enable_arenas = true;
44
package Ydb.Cms;
55
option java_package = "com.yandex.ydb.cms";
66

7+
import "ydb/public/api/protos/ydb_issue_message.proto";
78
import "ydb/public/api/protos/ydb_operation.proto";
9+
import "ydb/public/api/protos/ydb_status_codes.proto";
810

911
// A set of uniform storage units.
1012
// Single storage unit can be thought of as a reserved part of a RAID.
@@ -104,6 +106,23 @@ message DatabaseQuotas {
104106
repeated StorageQuotas storage_quotas = 6;
105107
}
106108

109+
// A policy that is used for resource scale recommendation. If multiple are used,
110+
// recommender combines them to recommend the largest scale.
111+
message ScaleRecommenderPolicy {
112+
// Policy that tracks metric and reactively recommend to adjust resources scale
113+
// to keep metric close to the specified target value.
114+
message TargetTrackingPolicy {
115+
oneof target {
116+
// A percentage of compute resources' average CPU utilization.
117+
uint32 average_cpu_utilization_percent = 1;
118+
}
119+
}
120+
121+
oneof policy {
122+
TargetTrackingPolicy target_tracking_policy = 1;
123+
}
124+
}
125+
107126
// Request to create a new database. For successfull creation
108127
// specified database shouldn't exist. At least one storage
109128
// unit should be requested for the database.
@@ -129,6 +148,8 @@ message CreateDatabaseRequest {
129148
string idempotency_key = 9;
130149
// Optional quotas for the database
131150
DatabaseQuotas database_quotas = 10;
151+
// Optional scale recommender policies
152+
repeated ScaleRecommenderPolicy scale_recommender_policies = 11;
132153
}
133154

134155
message CreateDatabaseResponse {
@@ -179,6 +200,8 @@ message GetDatabaseStatusResult {
179200
SchemaOperationQuotas schema_operation_quotas = 9;
180201
// Current quotas for the database
181202
DatabaseQuotas database_quotas = 10;
203+
// Current scale recommender policies
204+
repeated ScaleRecommenderPolicy scale_recommender_policies = 11;
182205
}
183206

184207
// Change resources allocated for database.
@@ -207,6 +230,8 @@ message AlterDatabaseRequest {
207230
DatabaseQuotas database_quotas = 11;
208231
// Alter attributes. Leave the value blank to drop an attribute.
209232
map<string, string> alter_attributes = 12;
233+
// Alter scale recommender policies.
234+
repeated ScaleRecommenderPolicy scale_recommender_policies = 13;
210235
}
211236

212237
message AlterDatabaseResponse {
@@ -271,3 +296,16 @@ message DescribeDatabaseOptionsResult {
271296
repeated AvailabilityZoneDescription availability_zones = 2;
272297
repeated ComputationalUnitDescription computational_units = 3;
273298
}
299+
300+
// Get resources scale recommendation for database.
301+
message GetScaleRecommendationRequest {
302+
// Required. Full path to database's home dir.
303+
string path = 1;
304+
}
305+
306+
message GetScaleRecommendationResponse {
307+
StatusIds.StatusCode status = 1;
308+
repeated Ydb.Issue.IssueMessage issues = 2;
309+
// Recommended resources scale to be allocated for database.
310+
Resources recommended_resources = 3;
311+
}

ydb/services/cms/grpc_service.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@ void TGRpcCmsService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
1414
#ifdef ADD_REQUEST
1515
#error ADD_REQUEST macro already defined
1616
#endif
17-
#define ADD_REQUEST(NAME, CB) \
17+
#define ADD_REQUEST(NAME, CB, TCALL) \
1818
MakeIntrusive<TGRpcRequest<Cms::NAME##Request, Cms::NAME##Response, TGRpcCmsService>> \
1919
(this, &Service_, CQ_, \
2020
[this](NYdbGrpc::IRequestContextBase *ctx) { \
2121
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
2222
ActorSystem_->Send(GRpcRequestProxyId_, \
23-
new TGrpcRequestOperationCall<Cms::NAME##Request, Cms::NAME##Response> \
23+
new TCALL<Cms::NAME##Request, Cms::NAME##Response> \
2424
(ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
2525
}, &Cms::V1::CmsService::AsyncService::Request ## NAME, \
2626
#NAME, logger, getCounterBlock("cms", #NAME))->Run();
2727

28-
ADD_REQUEST(CreateDatabase, DoCreateTenantRequest)
29-
ADD_REQUEST(AlterDatabase, DoAlterTenantRequest)
30-
ADD_REQUEST(GetDatabaseStatus, DoGetTenantStatusRequest)
31-
ADD_REQUEST(ListDatabases, DoListTenantsRequest)
32-
ADD_REQUEST(RemoveDatabase, DoRemoveTenantRequest)
33-
ADD_REQUEST(DescribeDatabaseOptions, DoDescribeTenantOptionsRequest)
28+
ADD_REQUEST(CreateDatabase, DoCreateTenantRequest, TGrpcRequestOperationCall)
29+
ADD_REQUEST(AlterDatabase, DoAlterTenantRequest, TGrpcRequestOperationCall)
30+
ADD_REQUEST(GetDatabaseStatus, DoGetTenantStatusRequest, TGrpcRequestOperationCall)
31+
ADD_REQUEST(ListDatabases, DoListTenantsRequest, TGrpcRequestOperationCall)
32+
ADD_REQUEST(RemoveDatabase, DoRemoveTenantRequest, TGrpcRequestOperationCall)
33+
ADD_REQUEST(DescribeDatabaseOptions, DoDescribeTenantOptionsRequest, TGrpcRequestOperationCall)
34+
ADD_REQUEST(GetScaleRecommendation, DoGetScaleRecommendationRequest, TGrpcRequestNoOperationCall)
3435

3536
#undef ADD_REQUEST
3637
}

0 commit comments

Comments
 (0)