Skip to content

Commit 99376dd

Browse files
authored
Fix possible write session frozing (#8075) (#8134)
1 parent ac6da46 commit 99376dd

File tree

4 files changed

+34
-12
lines changed

4 files changed

+34
-12
lines changed

ydb/services/deprecated/persqueue_v0/grpc_pq_session.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,10 +306,10 @@ class ISession : public ISessionHandler<TResponse>
306306
protected:
307307
grpc::ServerCompletionQueue* const CQ;
308308
grpc::ServerContext Context;
309-
grpc::ServerAsyncReaderWriter<TResponse, TRequest>
310-
Stream;
311-
private:
309+
grpc::ServerAsyncReaderWriter<TResponse, TRequest> Stream;
310+
312311
TSpinLock Lock;
312+
private:
313313
bool HaveWriteInflight;
314314
bool NeedFinish;
315315
std::atomic<bool> ClientIsDone;

ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,23 @@ void TPQWriteServiceImpl::TSession::OnCreated() { // Start waiting fo
2525
ReplyWithError("proxy overloaded", NPersQueue::NErrorCode::OVERLOAD);
2626
return;
2727
}
28-
TMaybe<TString> localCluster = Proxy->AvailableLocalCluster();
2928
if (NeedDiscoverClusters) {
29+
TMaybe<TString> localCluster = Proxy->AvailableLocalCluster();
3030
if (!localCluster.Defined()) {
3131
ReplyWithError("initializing", NPersQueue::NErrorCode::INITIALIZING);
3232
return;
3333
} else if (localCluster->empty()) {
3434
ReplyWithError("cluster disabled", NPersQueue::NErrorCode::CLUSTER_DISABLED);
3535
return;
36-
} else {
37-
CreateActor(*localCluster);
36+
} else if (!CreateActor(*localCluster)) {
37+
Proxy->ReleaseSession(this);
38+
return;
3839
}
39-
} else {
40-
CreateActor(TString());
40+
} else if (!CreateActor(TString())) {
41+
Proxy->ReleaseSession(this);
42+
return;
4143
}
44+
4245
ReadyForNextRead();
4346
}
4447

@@ -61,9 +64,14 @@ void TPQWriteServiceImpl::TSession::OnRead(const TWriteRequest& request) {
6164
}
6265

6366
void TPQWriteServiceImpl::TSession::OnDone() {
67+
{
68+
TGuard<TSpinLock> lock(Lock);
69+
IsDone = true;
70+
}
6471
SendEvent(new TEvPQProxy::TEvDone());
6572
}
6673

74+
6775
TPQWriteServiceImpl::TSession::TSession(std::shared_ptr<TPQWriteServiceImpl> proxy,
6876
grpc::ServerCompletionQueue* cq, ui64 cookie, const TActorId& schemeCache,
6977
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, bool needDiscoverClusters)
@@ -97,18 +105,30 @@ bool TPQWriteServiceImpl::TSession::IsShuttingDown() const {
97105
return Proxy->IsShuttingDown();
98106
}
99107

100-
void TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) {
108+
bool TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) {
109+
TGuard<TSpinLock> lock(Lock);
110+
if (IsDone) {
111+
ReplyWithError("is done", NPersQueue::NErrorCode::INITIALIZING);
112+
return false;
113+
}
101114

102115
auto classifier = Proxy->GetClassifier();
103116
ActorId = Proxy->ActorSystem->Register(
104117
new TWriteSessionActor(this, Cookie, SchemeCache, Counters, localCluster,
105118
classifier ? classifier->ClassifyAddress(GetPeerName())
106119
: "unknown"), TMailboxType::Simple, 0
107120
);
121+
return true;
108122
}
109123

110124
void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
111-
Proxy->ActorSystem->Send(ActorId, ev);
125+
std::unique_ptr<IEventBase> e;
126+
e.reset(ev);
127+
128+
TGuard<TSpinLock> lock(Lock);
129+
if (ActorId) {
130+
Proxy->ActorSystem->Send(ActorId, e.release());
131+
}
112132
}
113133

114134
///////////////////////////////////////////////////////////////////////////////

ydb/services/deprecated/persqueue_v0/grpc_pq_write.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
3838
bool IsShuttingDown() const override;
3939

4040
private:
41-
void CreateActor(const TString& localCluster);
41+
[[nodiscard]] bool CreateActor(const TString& localCluster);
4242
void SendEvent(NActors::IEventBase* ev);
4343

4444
private:
@@ -52,6 +52,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
5252
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
5353

5454
bool NeedDiscoverClusters;
55+
bool IsDone = false;
5556
};
5657
using TSessionRef = TIntrusivePtr<TSession>;
5758

ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ void TWriteSessionActor::CheckFinish(const TActorContext& ctx) {
160160
}
161161

162162
void TWriteSessionActor::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& ctx) {
163+
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " got TEvDone");
163164
WritesDone = true;
164165
CheckFinish(ctx);
165166
}
@@ -339,7 +340,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
339340
errorReason = Sprintf("topic '%s' describe error, Status# %s, Marker# PQ1", path.back().c_str(),
340341
ToString(entry.Status).c_str());
341342
CloseSession(errorReason, NPersQueue::NErrorCode::ERROR, ctx);
342-
break;
343+
return;
343344
}
344345
}
345346
if (!entry.PQGroupInfo) {

0 commit comments

Comments
 (0)