Skip to content

Commit a8b10ce

Browse files
author
pechatnov
committed
YT: Fix race on vptr in TServiceBase
commit_hash:d96419b9ca6a790844a064b3da7710e45964159f
1 parent db0b8a1 commit a8b10ce

File tree

2 files changed

+26
-5
lines changed

2 files changed

+26
-5
lines changed

yt/yt/core/rpc/service_detail.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,8 +1654,6 @@ TServiceBase::TServiceBase(
16541654
Profiler_.AddFuncGauge("/authentication_queue_size", MakeStrong(this), [this] {
16551655
return AuthenticationQueueSize_.load(std::memory_order::relaxed);
16561656
});
1657-
1658-
ServiceLivenessChecker_->Start();
16591657
}
16601658

16611659
const TServiceId& TServiceBase::GetServiceId() const
@@ -2431,6 +2429,25 @@ void TServiceBase::DecrementActiveRequestCount()
24312429
void TServiceBase::InitContext(IServiceContext* /*context*/)
24322430
{ }
24332431

2432+
void TServiceBase::StartServiceLivenessChecker()
2433+
{
2434+
// Fast path.
2435+
if (ServiceLivenessCheckerStarted_.load(std::memory_order::relaxed)) {
2436+
return;
2437+
}
2438+
if (ServiceLivenessCheckerStarted_.exchange(true)) {
2439+
return;
2440+
}
2441+
2442+
if (auto checker = ServiceLivenessChecker_.Acquire()) {
2443+
checker->Start();
2444+
// There may be concurrent ServiceLivenessChecker_.Exchange() call in Stop().
2445+
if (!ServiceLivenessChecker_.Acquire()) {
2446+
YT_UNUSED_FUTURE(checker->Stop());
2447+
}
2448+
}
2449+
}
2450+
24342451
void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context)
24352452
{
24362453
auto payload = GetDiscoverRequestPayload(context);
@@ -2440,6 +2457,7 @@ void TServiceBase::RegisterDiscoverRequest(const TCtxDiscoverPtr& context)
24402457
auto it = DiscoverRequestsByPayload_.find(payload);
24412458
if (it == DiscoverRequestsByPayload_.end()) {
24422459
readerGuard.Release();
2460+
StartServiceLivenessChecker();
24432461
auto writerGuard = WriterGuard(DiscoverRequestsByPayloadLock_);
24442462
DiscoverRequestsByPayload_[payload].Insert(context, 0);
24452463
} else {
@@ -2706,8 +2724,9 @@ TFuture<void> TServiceBase::Stop()
27062724
}
27072725
}
27082726

2709-
YT_UNUSED_FUTURE(ServiceLivenessChecker_->Stop());
2710-
2727+
if (auto checker = ServiceLivenessChecker_.Exchange(nullptr)) {
2728+
YT_UNUSED_FUTURE(checker->Stop());
2729+
}
27112730
return StopResult_.ToFuture();
27122731
}
27132732

yt/yt/core/rpc/service_detail.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,8 @@ class TServiceBase
987987

988988
std::atomic<bool> EnableErrorCodeCounter_ = false;
989989

990-
const NConcurrency::TPeriodicExecutorPtr ServiceLivenessChecker_;
990+
std::atomic<bool> ServiceLivenessCheckerStarted_ = false;
991+
TAtomicIntrusivePtr<NConcurrency::TPeriodicExecutor> ServiceLivenessChecker_;
991992

992993
using TDiscoverRequestSet = TConcurrentHashMap<TCtxDiscoverPtr, int>;
993994
THashMap<TString, TDiscoverRequestSet> DiscoverRequestsByPayload_;
@@ -1075,6 +1076,7 @@ class TServiceBase
10751076
void IncrementActiveRequestCount();
10761077
void DecrementActiveRequestCount();
10771078

1079+
void StartServiceLivenessChecker();
10781080
void RegisterDiscoverRequest(const TCtxDiscoverPtr& context);
10791081
void ReplyDiscoverRequest(const TCtxDiscoverPtr& context, bool isUp);
10801082

0 commit comments

Comments
 (0)