Skip to content

Commit 27dae47

Browse files
authored
Optimize grpc for pqv0 (#8199) (#8239)
1 parent 99376dd commit 27dae47

File tree

10 files changed

+51
-32
lines changed

10 files changed

+51
-32
lines changed

ydb/core/persqueue/partition_read.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,12 +796,11 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
796796
return;
797797
}
798798

799-
if (offset > EndOffset) {
799+
if (offset >= EndOffset) {
800800
ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST,
801801
TStringBuilder() << "Offset more than EndOffset. Offset=" << offset << ", EndOffset=" << EndOffset);
802802
return;
803803
}
804-
Y_ABORT_UNLESS(offset < EndOffset);
805804

806805
ProcessRead(ctx, std::move(info), cookie, false);
807806
}

ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,13 @@ ui64 TPQReadService::TSession::GetCookie() const {
169169
///////////////////////////////////////////////////////////////////////////////
170170

171171

172-
TPQReadService::TPQReadService(NKikimr::NGRpcService::TGRpcPersQueueService* service, grpc::ServerCompletionQueue* cq,
172+
TPQReadService::TPQReadService(NKikimr::NGRpcService::TGRpcPersQueueService* service,
173+
const std::vector<grpc::ServerCompletionQueue*>& cqs,
173174
NActors::TActorSystem* as, const TActorId& schemeCache,
174175
TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
175176
const ui32 maxSessions)
176177
: Service(service)
177-
, CQ(cq)
178+
, CQS(cqs)
178179
, ActorSystem(as)
179180
, SchemeCache(schemeCache)
180181
, Counters(counters)
@@ -244,7 +245,7 @@ void TPQReadService::WaitReadSession() {
244245

245246
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,1,0));
246247

247-
TSessionRef session(new TSession(shared_from_this(), CQ, cookie, SchemeCache, NewSchemeCache, Counters,
248+
TSessionRef session(new TSession(shared_from_this(), CQS[cookie % CQS.size()], cookie, SchemeCache, NewSchemeCache, Counters,
248249
NeedDiscoverClusters, TopicConverterFactory));
249250

250251
{

ydb/services/deprecated/persqueue_v0/grpc_pq_read.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
2828
void OnWriteDone(ui64 size) override;
2929
void DestroyStream(const TString& reason, const NPersQueue::NErrorCode::EErrorCode errorCode) override;
3030
bool IsShuttingDown() const override;
31+
3132
TSession(std::shared_ptr<TPQReadService> proxy,
3233
grpc::ServerCompletionQueue* cq, ui64 cookie, const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
3334
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, bool needDiscoverClusters,
3435
const NPersQueue::TConverterFactoryPtr& converterFactory);
36+
3537
void Start() override;
3638
void SendEvent(NActors::IEventBase* ev);
3739

@@ -60,7 +62,7 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
6062
public:
6163

6264
TPQReadService(NGRpcService::TGRpcPersQueueService* service,
63-
grpc::ServerCompletionQueue* cq,
65+
const std::vector<grpc::ServerCompletionQueue*>& cqs,
6466
NActors::TActorSystem* as, const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
6567
const ui32 maxSessions);
6668

@@ -116,7 +118,7 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
116118
NKikimr::NGRpcService::TGRpcPersQueueService* Service;
117119

118120
grpc::ServerContext Context;
119-
grpc::ServerCompletionQueue* CQ;
121+
std::vector<grpc::ServerCompletionQueue*> CQS;
120122
NActors::TActorSystem* ActorSystem;
121123
NActors::TActorId SchemeCache;
122124
NActors::TActorId NewSchemeCache;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ void TReadSessionActor::AnswerForCommitsIfCan(const TActorContext& ctx) {
570570
ui64 diff = result.ByteSize();
571571
BytesInflight_ += diff;
572572
if (BytesInflight) (*BytesInflight) += diff;
573-
Handler->Reply(result);
573+
Handler->Reply(std::move(result));
574574

575575
ui32 commitDurationMs = (ctx.Now() - it->second.StartTime).MilliSeconds();
576576
CommitLatency.IncFor(commitDurationMs, 1);
@@ -941,7 +941,7 @@ void TReadSessionActor::Handle(V1::TEvPQProxy::TEvAuthResultOk::TPtr& ev, const
941941
BytesInflight_ += diff;
942942
if (BytesInflight) (*BytesInflight) += diff;
943943

944-
Handler->Reply(result);
944+
Handler->Reply(std::move(result));
945945

946946
Handler->ReadyForNextRead();
947947

@@ -1101,7 +1101,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const T
11011101
ui64 diff = result.ByteSize();
11021102
BytesInflight_ += diff;
11031103
if (BytesInflight) (*BytesInflight) += diff;
1104-
Handler->Reply(result);
1104+
Handler->Reply(std::move(result));
11051105
} else {
11061106
jt->second->ControlMessages.push_back(result);
11071107
}
@@ -1120,7 +1120,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const T
11201120
ui64 diff = result.ByteSize();
11211121
BytesInflight_ += diff;
11221122
if (BytesInflight) (*BytesInflight) += diff;
1123-
Handler->Reply(result);
1123+
Handler->Reply(std::move(result));
11241124
} else {
11251125
jt->second->ControlMessages.push_back(result);
11261126
}
@@ -1273,7 +1273,7 @@ void TReadSessionActor::CloseSession(const TString& errorReason, const NPersQueu
12731273
ui64 diff = result.ByteSize();
12741274
BytesInflight_ += diff;
12751275
if (BytesInflight) (*BytesInflight) += diff;
1276-
Handler->Reply(result);
1276+
Handler->Reply(std::move(result));
12771277
} else {
12781278
LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " GRps is shutting dows, skip reply");
12791279
}
@@ -1322,7 +1322,7 @@ bool TReadSessionActor::ProcessReleasePartition(const THashMap<std::pair<TString
13221322
ui64 diff = result.ByteSize();
13231323
BytesInflight_ += diff;
13241324
if (BytesInflight) (*BytesInflight) += diff;
1325-
Handler->Reply(result);
1325+
Handler->Reply(std::move(result));
13261326
} else {
13271327
jt->second->ControlMessages.push_back(result);
13281328
}
@@ -1534,7 +1534,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo
15341534
ConvertToOldBatch(formedResponse->Response);
15351535
}
15361536
diff -= formedResponse->Response.ByteSize(); // Bytes will be tracked inside handler
1537-
Handler->Reply(formedResponse->Response);
1537+
Handler->Reply(std::move(formedResponse->Response));
15381538
} else {
15391539
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " empty read result " << formedResponse->Guid << ", start new reading");
15401540
}
@@ -1546,7 +1546,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo
15461546
ui64 diff = r.ByteSize();
15471547
BytesInflight_ += diff;
15481548
if (BytesInflight) (*BytesInflight) += diff;
1549-
Handler->Reply(r);
1549+
Handler->Reply(std::move(r));
15501550
}
15511551

15521552
for (const TActorId& p : formedResponse->PartitionsTookPartInRead) {

ydb/services/deprecated/persqueue_v0/grpc_pq_session.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class ISessionHandler : public TAtomicRefCount<ISessionHandler<TResponse>> {
2525
virtual void Finish() = 0;
2626

2727
/// Send reply to client.
28-
virtual void Reply(const TResponse& resp) = 0;
28+
virtual void Reply(TResponse&& resp) = 0;
2929

3030
virtual void ReadyForNextRead() = 0;
3131

@@ -120,7 +120,7 @@ class ISession : public ISessionHandler<TResponse>
120120
Session->Stream.Finish(Status::OK, new TFinishDone(Session));
121121
}
122122
} else {
123-
auto resp = Session->Responses.front();
123+
auto resp = std::move(Session->Responses.front());
124124
Session->Responses.pop();
125125
lock.Release();
126126
ui64 sz = resp.ByteSize();
@@ -253,7 +253,7 @@ class ISession : public ISessionHandler<TResponse>
253253
TResponse response;
254254
response.MutableError()->SetDescription(description);
255255
response.MutableError()->SetCode(code);
256-
Reply(response);
256+
Reply(std::move(response));
257257
Finish();
258258
}
259259

@@ -274,13 +274,13 @@ class ISession : public ISessionHandler<TResponse>
274274
}
275275

276276
/// Send reply to client.
277-
void Reply(const TResponse& resp) override {
277+
void Reply(TResponse&& resp) override {
278278
{
279279
TGuard<TSpinLock> lock(Lock);
280280
if (NeedFinish) //ignore responses after finish
281281
return;
282282
if (HaveWriteInflight || !Responses.empty()) {
283-
Responses.push(resp);
283+
Responses.push(std::move(resp));
284284
return;
285285
} else {
286286
HaveWriteInflight = true;

ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,10 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
134134
///////////////////////////////////////////////////////////////////////////////
135135

136136

137-
TPQWriteServiceImpl::TPQWriteServiceImpl(grpc::ServerCompletionQueue* cq,
137+
TPQWriteServiceImpl::TPQWriteServiceImpl(const std::vector<grpc::ServerCompletionQueue*>& cqs,
138138
NActors::TActorSystem* as, const TActorId& schemeCache,
139139
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions)
140-
: CQ(cq)
140+
: CQS(cqs)
141141
, ActorSystem(as)
142142
, SchemeCache(schemeCache)
143143
, Counters(counters)
@@ -182,7 +182,7 @@ void TPQWriteServiceImpl::WaitWriteSession() {
182182

183183
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,1,0));
184184

185-
TSessionRef session(new TSession(shared_from_this(), CQ, cookie, SchemeCache, Counters, NeedDiscoverClusters));
185+
TSessionRef session(new TSession(shared_from_this(), CQS[cookie % CQS.size()], cookie, SchemeCache, Counters, NeedDiscoverClusters));
186186
{
187187
with_lock (Lock) {
188188
Sessions[cookie] = session;

ydb/services/deprecated/persqueue_v0/grpc_pq_write.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
5757
using TSessionRef = TIntrusivePtr<TSession>;
5858

5959
public:
60-
TPQWriteServiceImpl(grpc::ServerCompletionQueue* cq,
60+
TPQWriteServiceImpl(const std::vector<grpc::ServerCompletionQueue*>& cqs,
6161
NActors::TActorSystem* as, const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
6262
const ui32 maxSessions);
6363
virtual ~TPQWriteServiceImpl() = default;
@@ -95,7 +95,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
9595

9696
private:
9797
grpc::ServerContext Context;
98-
grpc::ServerCompletionQueue* CQ;
98+
const std::vector<grpc::ServerCompletionQueue*>& CQS;
9999

100100
NActors::TActorSystem* ActorSystem;
101101
NActors::TActorId SchemeCache;
@@ -124,10 +124,10 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
124124
class TPQWriteService : public TPQWriteServiceImpl {
125125
public:
126126
TPQWriteService(NPersQueue::PersQueueService::AsyncService* service,
127-
grpc::ServerCompletionQueue* cq,
127+
const std::vector<grpc::ServerCompletionQueue*>& cqs,
128128
NActors::TActorSystem* as, const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
129129
const ui32 maxSessions)
130-
: TPQWriteServiceImpl(cq, as, schemeCache, counters, maxSessions)
130+
: TPQWriteServiceImpl(cqs, as, schemeCache, counters, maxSessions)
131131
, Service(service)
132132
{}
133133

ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ void TWriteSessionActor::CloseSession(const TString& errorReason, const NPersQue
537537
"session error cookie: " << Cookie << " reason: \"" << errorReason << "\" code: "
538538
<< EErrorCode_Name(errorCode) << " sessionId: " << OwnerCookie);
539539

540-
Handler->Reply(result);
540+
Handler->Reply(std::move(result));
541541
} else {
542542
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session closed cookie: " << Cookie << " sessionId: " << OwnerCookie);
543543
}
@@ -574,7 +574,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev
574574
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session inited cookie: " << Cookie << " partition: " << Partition
575575
<< " MaxSeqNo: " << maxSeqNo << " sessionId: " << OwnerCookie);
576576

577-
Handler->Reply(response);
577+
Handler->Reply(std::move(response));
578578

579579
State = ES_INITED;
580580

@@ -683,7 +683,7 @@ void TWriteSessionActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr&
683683
addAck(resp.GetCmdWriteResult(cmdWriteResultIndex), ack, ack->MutableStat());
684684
++cmdWriteResultIndex;
685685
}
686-
Handler->Reply(result);
686+
Handler->Reply(std::move(result));
687687
}
688688

689689
ui64 diff = writeRequest->ByteSize;

ydb/services/deprecated/persqueue_v0/persqueue.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,24 @@ TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system,
2020
, SchemeCache(schemeCache)
2121
{ }
2222

23+
void TGRpcPersQueueService::InitService(const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, NYdbGrpc::TLoggerPtr logger, size_t index) {
24+
CQS.reserve(cqs.size());
25+
for (auto& cq: cqs) {
26+
CQS.push_back(cq.get());
27+
}
28+
29+
CQ = CQS[index % cqs.size()];
30+
31+
// note that we might call an overloaded InitService(), and not the one from this class
32+
InitService(CQ, logger);
33+
}
34+
2335
void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrpc::TLoggerPtr logger) {
2436
CQ = cq;
2537
if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
26-
WriteService.reset(new NGRpcProxy::TPQWriteService(GetService(), CQ, ActorSystem, SchemeCache, Counters, PersQueueWriteSessionsMaxCount));
38+
WriteService.reset(new NGRpcProxy::TPQWriteService(GetService(), CQS, ActorSystem, SchemeCache, Counters, PersQueueWriteSessionsMaxCount));
2739
WriteService->InitClustersUpdater();
28-
ReadService.reset(new NGRpcProxy::TPQReadService(this, CQ, ActorSystem, SchemeCache, Counters, PersQueueReadSessionsMaxCount));
40+
ReadService.reset(new NGRpcProxy::TPQReadService(this, CQS, ActorSystem, SchemeCache, Counters, PersQueueReadSessionsMaxCount));
2941
SetupIncomingRequests(logger);
3042
}
3143
}

ydb/services/deprecated/persqueue_v0/persqueue.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class TGRpcPersQueueService
2222
public:
2323
TGRpcPersQueueService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache);
2424

25+
void InitService(
26+
const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
27+
NYdbGrpc::TLoggerPtr logger,
28+
size_t index) override;
2529
void InitService(grpc::ServerCompletionQueue* cq, NYdbGrpc::TLoggerPtr logger) override;
2630
void SetGlobalLimiterHandle(NYdbGrpc::TGlobalLimiter* limiter) override;
2731
void StopService() noexcept override;
@@ -35,6 +39,7 @@ class TGRpcPersQueueService
3539
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
3640

3741
NActors::TActorSystem* ActorSystem;
42+
std::vector<grpc::ServerCompletionQueue*> CQS;
3843
grpc::ServerCompletionQueue* CQ = nullptr;
3944

4045
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;

0 commit comments

Comments
 (0)