Skip to content

Commit 7277322

Browse files
authored
Limit inflight coordination service session requests (#9168)
1 parent cead838 commit 7277322

File tree

4 files changed

+61
-31
lines changed

4 files changed

+61
-31
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
785785

786786
if (hasKesus) {
787787
server.AddService(new NKesus::TKesusGRpcService(ActorSystem.Get(), Counters,
788-
grpcRequestProxies[0], hasKesus.IsRlAllowed()));
788+
AppData->InFlightLimiterRegistry, grpcRequestProxies[0], hasKesus.IsRlAllowed()));
789789
}
790790

791791
if (hasPQ) {

ydb/core/testlib/test_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ namespace Tests {
421421
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));
422422
GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxies[0], true));
423423
GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxies[0]));
424-
GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxies[0], true));
424+
GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies[0], true));
425425
GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxies[0], true));
426426
GRpcServer->AddService(new NGRpcService::TGRpcDiscoveryService(system, counters, grpcRequestProxies[0], true));
427427
GRpcServer->AddService(new NGRpcService::TGRpcYdbClickhouseInternalService(system, counters, appData.InFlightLimiterRegistry, grpcRequestProxies[0], true));

ydb/services/kesus/grpc_service.cpp

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
namespace NKikimr {
2323
namespace NKesus {
2424

25+
// Note: this is an extremely high default to avoid breaking clients
26+
// TODO: make it configurable
27+
static constexpr i64 DEFAULT_MAX_SESSIONS_INFLIGHT = 100000;
28+
2529
////////////////////////////////////////////////////////////////////////////////
2630

2731
class TGRpcSessionActor
@@ -613,29 +617,42 @@ class TGRpcSessionActor
613617

614618
////////////////////////////////////////////////////////////////////////////////
615619

620+
TKesusGRpcService::TKesusGRpcService(
621+
NActors::TActorSystem* system,
622+
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
623+
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> limiterRegistry,
624+
const NActors::TActorId& proxyId,
625+
bool rlAllowed)
626+
: TBase(system, counters, proxyId, rlAllowed)
627+
, LimiterRegistry_(limiterRegistry)
628+
{}
629+
616630
void TKesusGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
617-
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
618631
using NGRpcService::TRateLimiterMode;
632+
auto getCounterBlock = NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
633+
auto getLimiter = CreateLimiterCb(LimiterRegistry_);
619634

620635
#ifdef ADD_REQUEST
621636
#error ADD_REQUEST macro is already defined
622637
#endif
623638

624639
#define ADD_REQUEST(NAME, IN, OUT, CB) \
625-
MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \
626-
this, \
627-
&Service_, \
628-
CQ_, \
629-
[this](NYdbGrpc::IRequestContextBase* reqCtx) { \
630-
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer()); \
631-
ActorSystem_->Send(GRpcRequestProxyId_, \
632-
new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \
633-
(reqCtx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
634-
}, \
635-
&Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \
636-
"Coordination/" #NAME, \
637-
logger, \
638-
getCounterBlock("coordination", #NAME))->Run();
640+
for (auto* cq : CQS) { \
641+
MakeIntrusive<NGRpcService::TGRpcRequest<Ydb::Coordination::IN, Ydb::Coordination::OUT, TKesusGRpcService>>( \
642+
this, \
643+
&Service_, \
644+
cq, \
645+
[this](NYdbGrpc::IRequestContextBase* reqCtx) { \
646+
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, reqCtx->GetPeer()); \
647+
ActorSystem_->Send(GRpcRequestProxyId_, \
648+
new NGRpcService::TGrpcRequestOperationCall<Ydb::Coordination::IN, Ydb::Coordination::OUT> \
649+
(reqCtx, &CB, NGRpcService::TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \
650+
}, \
651+
&Ydb::Coordination::V1::CoordinationService::AsyncService::Request ## NAME, \
652+
"Coordination/" #NAME, \
653+
logger, \
654+
getCounterBlock("coordination", #NAME))->Run(); \
655+
}
639656

640657
ADD_REQUEST(CreateNode, CreateNodeRequest, CreateNodeResponse, NGRpcService::DoCreateCoordinationNode);
641658
ADD_REQUEST(AlterNode, AlterNodeRequest, AlterNodeResponse, NGRpcService::DoAlterCoordinationNode);
@@ -644,19 +661,21 @@ void TKesusGRpcService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
644661

645662
#undef ADD_REQUEST
646663

647-
TGRpcSessionActor::TGRpcRequest::Start(
648-
this,
649-
this->GetService(),
650-
CQ_,
651-
&Ydb::Coordination::V1::CoordinationService::AsyncService::RequestSession,
652-
[this](TIntrusivePtr<TGRpcSessionActor::IContext> context) {
653-
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, context->GetPeerName());
654-
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCoordinationSessionRequest(context));
655-
},
656-
*ActorSystem_,
657-
"Coordination/Session",
658-
getCounterBlock("coordination", "Session", true),
659-
/* TODO: limiter */ nullptr);
664+
for (auto* cq : CQS) {
665+
TGRpcSessionActor::TGRpcRequest::Start(
666+
this,
667+
this->GetService(),
668+
cq,
669+
&Ydb::Coordination::V1::CoordinationService::AsyncService::RequestSession,
670+
[this](TIntrusivePtr<TGRpcSessionActor::IContext> context) {
671+
NGRpcService::ReportGrpcReqToMon(*ActorSystem_, context->GetPeerName());
672+
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCoordinationSessionRequest(context));
673+
},
674+
*ActorSystem_,
675+
"Coordination/Session",
676+
getCounterBlock("coordination", "Session", true),
677+
getLimiter("CoordinationService", "Session", DEFAULT_MAX_SESSIONS_INFLIGHT));
678+
}
660679
}
661680

662681
} // namespace NKesus

ydb/services/kesus/grpc_service.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <util/generic/hash_set.h>
1010

1111
#include <ydb/core/grpc_services/base/base_service.h>
12+
#include <ydb/core/grpc_services/grpc_helper.h>
1213

1314

1415
namespace NKikimr {
@@ -17,14 +18,24 @@ namespace NKesus {
1718
class TKesusGRpcService
1819
: public ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>
1920
{
21+
using TBase = ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>;
22+
2023
class TContextBase;
2124
class TSessionContext;
2225

2326
public:
24-
using ::NKikimr::NGRpcService::TGrpcServiceBase<Ydb::Coordination::V1::CoordinationService>::TGrpcServiceBase;
27+
TKesusGRpcService(
28+
NActors::TActorSystem* system,
29+
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
30+
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> limiterRegistry,
31+
const NActors::TActorId& proxyId,
32+
bool rlAllowed);
2533

2634
private:
2735
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
36+
37+
private:
38+
TIntrusivePtr<NGRpcService::TInFlightLimiterRegistry> LimiterRegistry_;
2839
};
2940

3041
}

0 commit comments

Comments
 (0)