Skip to content

Commit d17ef57

Browse files
committed
Intermediate changes
commit_hash:8b7eb71badc9f2fcd168ee34e8c379b35577eccb
1 parent 37e325a commit d17ef57

File tree

11 files changed

+272
-14
lines changed

11 files changed

+272
-14
lines changed

library/cpp/containers/dense_hash/dense_hash.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "fwd.h"
44

5+
#include <util/generic/bitops.h>
56
#include <util/generic/utility.h>
67
#include <util/generic/vector.h>
78
#include <util/generic/mapfindptr.h>

library/cpp/yt/stockpile/stockpile.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#pragma once
22

3+
#include <library/cpp/yt/cpu_clock/clock.h>
4+
5+
#include <library/cpp/yt/misc/enum.h>
6+
37
#include <util/system/types.h>
48

59
#include <util/generic/size_literals.h>
@@ -10,6 +14,14 @@ namespace NYT {
1014

1115
////////////////////////////////////////////////////////////////////////////////
1216

17+
DEFINE_ENUM(EStockpileStrategy,
18+
((FixedBreaks) (0))
19+
((FlooredLoad) (1))
20+
((ProgressiveBackoff) (2))
21+
);
22+
23+
////////////////////////////////////////////////////////////////////////////////
24+
1325
struct TStockpileOptions
1426
{
1527
static constexpr i64 DefaultBufferSize = 4_GBs;
@@ -18,11 +30,18 @@ struct TStockpileOptions
1830
static constexpr int DefaultThreadCount = 4;
1931
int ThreadCount = DefaultThreadCount;
2032

33+
static constexpr EStockpileStrategy DefaultStrategy = EStockpileStrategy::FixedBreaks;
34+
EStockpileStrategy Strategy = DefaultStrategy;
35+
2136
static constexpr TDuration DefaultPeriod = TDuration::MilliSeconds(10);
2237
TDuration Period = DefaultPeriod;
2338
};
2439

25-
void ConfigureStockpile(const TStockpileOptions& options);
40+
////////////////////////////////////////////////////////////////////////////////
41+
42+
void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed);
43+
44+
void RunDetachedStockpileThreads(TStockpileOptions options);
2645

2746
////////////////////////////////////////////////////////////////////////////////
2847

library/cpp/yt/stockpile/stockpile_linux.cpp

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,117 @@
11
#include "stockpile.h"
22

3+
#include "library/cpp/yt/logging/logger.h"
4+
35
#include <thread>
46
#include <mutex>
57

68
#include <sys/mman.h>
79

810
#include <util/system/thread.h>
11+
#include <string.h>
912

1013
namespace NYT {
1114

1215
////////////////////////////////////////////////////////////////////////////////
1316

17+
static const auto Logger = NLogging::TLogger("Stockpile");
18+
19+
constexpr int MADV_STOCKPILE = 0x59410004;
20+
21+
////////////////////////////////////////////////////////////////////////////////
22+
1423
namespace {
1524

16-
void RunStockpile(const TStockpileOptions& options)
25+
void RunWithFixedBreaks(i64 bufferSize, TDuration period)
1726
{
18-
TThread::SetCurrentThreadName("Stockpile");
27+
auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE);
28+
YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
29+
Sleep(period);
30+
}
1931

20-
constexpr int MADV_STOCKPILE = 0x59410004;
32+
void RunWithCappedLoad(i64 bufferSize, TDuration period)
33+
{
34+
auto started = GetApproximateCpuInstant();
35+
auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE);
36+
YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
37+
auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started);
2138

22-
while (true) {
23-
::madvise(nullptr, options.BufferSize, MADV_STOCKPILE);
24-
Sleep(options.Period);
39+
if (duration < period) {
40+
Sleep(period - duration);
41+
}
42+
}
43+
44+
std::pair<i64, TDuration> RunWithBackoffs(
45+
i64 adjustedBufferSize,
46+
TDuration adjustedPeriod,
47+
const TStockpileOptions& options,
48+
i64 pageSize)
49+
{
50+
int returnCode = ::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE);
51+
YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
52+
53+
switch(returnCode) {
54+
case 0:
55+
Sleep(options.Period);
56+
return {options.BufferSize, options.Period};
57+
58+
case ENOMEM:
59+
if (adjustedBufferSize / 2 >= pageSize) {
60+
// Immediately make an attempt to reclaim half as much.
61+
adjustedBufferSize = adjustedBufferSize / 2;
62+
} else {
63+
// Unless there is not even a single reclaimable page.
64+
Sleep(options.Period);
65+
}
66+
return {adjustedBufferSize, options.Period};
67+
68+
case EAGAIN:
69+
case EINTR:
70+
Sleep(adjustedPeriod);
71+
return {options.BufferSize, adjustedPeriod + options.Period};
72+
73+
default:
74+
Sleep(options.Period);
75+
return {options.BufferSize, options.Period};
2576
}
2677
}
2778

2879
} // namespace
2980

30-
void ConfigureStockpile(const TStockpileOptions& options)
81+
void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed)
82+
{
83+
TThread::SetCurrentThreadName("Stockpile");
84+
85+
const i64 pageSize = sysconf(_SC_PAGESIZE);
86+
auto bufferSize = options.BufferSize;
87+
auto period = options.Period;
88+
89+
while (!shouldProceed || shouldProceed->load()) {
90+
switch (options.Strategy) {
91+
case EStockpileStrategy::FixedBreaks:
92+
RunWithFixedBreaks(options.BufferSize, options.Period);
93+
break;
94+
95+
case EStockpileStrategy::FlooredLoad:
96+
RunWithCappedLoad(options.BufferSize, options.Period);
97+
break;
98+
99+
case EStockpileStrategy::ProgressiveBackoff:
100+
std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize);
101+
break;
102+
103+
default:
104+
YT_ABORT();
105+
}
106+
}
107+
}
108+
109+
void RunDetachedStockpileThreads(TStockpileOptions options)
31110
{
32111
static std::once_flag OnceFlag;
33-
std::call_once(OnceFlag, [options] {
34-
for (int i = 0; i < options.ThreadCount; i++) {
35-
std::thread(RunStockpile, options).detach();
112+
std::call_once(OnceFlag, [options = std::move(options)] {
113+
for (int i = 0; i < options.ThreadCount; ++i) {
114+
std::thread(RunStockpileThread, options, nullptr).detach();
36115
}
37116
});
38117
}

library/cpp/yt/stockpile/stockpile_other.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ namespace NYT {
44

55
////////////////////////////////////////////////////////////////////////////////
66

7-
void ConfigureStockpile(const TStockpileOptions& /*options*/)
7+
void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/)
8+
{ }
9+
10+
void RunDetachedStockpileThreads(TStockpileOptions /*options*/)
811
{ }
912

1013
////////////////////////////////////////////////////////////////////////////////

yt/yt/library/program/config.cpp

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,52 @@ void TTCMallocConfig::Register(TRegistrar registrar)
5454
void TStockpileConfig::Register(TRegistrar registrar)
5555
{
5656
registrar.BaseClassParameter("buffer_size", &TThis::BufferSize)
57-
.Default(DefaultBufferSize);
57+
.Default(DefaultBufferSize)
58+
.GreaterThan(0);
5859
registrar.BaseClassParameter("thread_count", &TThis::ThreadCount)
5960
.Default(DefaultThreadCount);
61+
registrar.BaseClassParameter("strategy", &TThis::Strategy)
62+
.Default(DefaultStrategy);
6063
registrar.BaseClassParameter("period", &TThis::Period)
6164
.Default(DefaultPeriod);
6265
}
6366

67+
TStockpileConfigPtr TStockpileConfig::ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const
68+
{
69+
auto mergedConfig = CloneYsonStruct(MakeStrong(this));
70+
71+
if (dynamicConfig->BufferSize) {
72+
mergedConfig->BufferSize = *dynamicConfig->BufferSize;
73+
}
74+
if (dynamicConfig->ThreadCount) {
75+
mergedConfig->ThreadCount = *dynamicConfig->ThreadCount;
76+
}
77+
if (dynamicConfig->Strategy) {
78+
mergedConfig->Strategy = *dynamicConfig->Strategy;
79+
}
80+
if (dynamicConfig->Period) {
81+
mergedConfig->Period = *dynamicConfig->Period;
82+
}
83+
84+
return mergedConfig;
85+
}
86+
87+
////////////////////////////////////////////////////////////////////////////////
88+
89+
void TStockpileDynamicConfig::Register(TRegistrar registrar)
90+
{
91+
registrar.BaseClassParameter("buffer_size", &TThis::BufferSize)
92+
.Optional()
93+
.GreaterThan(0);
94+
registrar.BaseClassParameter("thread_count", &TThis::ThreadCount)
95+
.Optional()
96+
.GreaterThan(0);
97+
registrar.BaseClassParameter("strategy", &TThis::Strategy)
98+
.Optional();
99+
registrar.BaseClassParameter("period", &TThis::Period)
100+
.Optional();
101+
}
102+
64103
////////////////////////////////////////////////////////////////////////////////
65104

66105
void THeapProfilerConfig::Register(TRegistrar registrar)

yt/yt/library/program/config.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ class TStockpileConfig
101101
, public NYTree::TYsonStruct
102102
{
103103
public:
104+
TStockpileConfigPtr ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const;
105+
104106
REGISTER_YSON_STRUCT(TStockpileConfig);
105107

106108
static void Register(TRegistrar registrar);
@@ -110,6 +112,24 @@ DEFINE_REFCOUNTED_TYPE(TStockpileConfig)
110112

111113
////////////////////////////////////////////////////////////////////////////////
112114

115+
class TStockpileDynamicConfig
116+
: public NYTree::TYsonStruct
117+
{
118+
public:
119+
std::optional<i64> BufferSize;
120+
std::optional<int> ThreadCount;
121+
std::optional<EStockpileStrategy> Strategy;
122+
std::optional<TDuration> Period;
123+
124+
REGISTER_YSON_STRUCT(TStockpileDynamicConfig);
125+
126+
static void Register(TRegistrar registrar);
127+
};
128+
129+
DEFINE_REFCOUNTED_TYPE(TStockpileDynamicConfig)
130+
131+
////////////////////////////////////////////////////////////////////////////////
132+
113133
class THeapProfilerConfig
114134
: public NYTree::TYsonStruct
115135
{
@@ -178,6 +198,7 @@ class TSingletonsDynamicConfig
178198
NTracing::TJaegerTracerDynamicConfigPtr Jaeger;
179199
NTracing::TTracingTransportConfigPtr TracingTransport;
180200
TTCMallocConfigPtr TCMalloc;
201+
TStockpileDynamicConfigPtr Stockpile;
181202
NYson::TProtobufInteropDynamicConfigPtr ProtobufInterop;
182203

183204
REGISTER_YSON_STRUCT(TSingletonsDynamicConfig);

yt/yt/library/program/helpers.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "helpers.h"
22
#include "config.h"
33
#include "private.h"
4+
#include "stockpile.h"
45

56
#include <yt/yt/core/ytalloc/bindings.h>
67

@@ -243,7 +244,7 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config)
243244

244245
ConfigureTCMalloc(config->TCMalloc);
245246

246-
ConfigureStockpile(*config->Stockpile);
247+
TStockpileManager::Get()->Reconfigure(*config->Stockpile);
247248

248249
if (config->EnableRefCountedTrackerProfiling) {
249250
EnableRefCountedTrackerProfiling();
@@ -305,6 +306,10 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons
305306
ConfigureTCMalloc(config->TCMalloc);
306307
}
307308

309+
if (dynamicConfig->Stockpile) {
310+
TStockpileManager::Get()->Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile));
311+
}
312+
308313
NYson::SetProtobufInteropConfig(config->ProtobufInterop->ApplyDynamic(dynamicConfig->ProtobufInterop));
309314
}
310315

yt/yt/library/program/public.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ DECLARE_REFCOUNTED_CLASS(TBuildInfo)
1010
DECLARE_REFCOUNTED_CLASS(TRpcConfig)
1111
DECLARE_REFCOUNTED_CLASS(TTCMallocConfig)
1212
DECLARE_REFCOUNTED_CLASS(TStockpileConfig)
13+
DECLARE_REFCOUNTED_CLASS(TStockpileDynamicConfig)
1314
DECLARE_REFCOUNTED_CLASS(TSingletonsConfig)
1415
DECLARE_REFCOUNTED_CLASS(TSingletonsDynamicConfig)
1516
DECLARE_REFCOUNTED_CLASS(TDiagnosticDumpConfig)

yt/yt/library/program/stockpile.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include "stockpile.h"
2+
3+
namespace NYT {
4+
5+
////////////////////////////////////////////////////////////////////////////////
6+
7+
TStockpileManager* TStockpileManager::Get()
8+
{
9+
return Singleton<TStockpileManager>();
10+
}
11+
12+
void TStockpileManager::Reconfigure(TStockpileOptions options)
13+
{
14+
auto guard = Guard(SpinLock_);
15+
16+
ThreadState_->ShouldProceed.store(false);
17+
18+
for (auto& thread : Threads_) {
19+
thread->join();
20+
}
21+
22+
Threads_.clear();
23+
ThreadState_ = New<TStockpileThreadState>();
24+
ThreadState_->ShouldProceed.store(true, std::memory_order_release);
25+
26+
for (int threadIndex = 0; threadIndex < options.ThreadCount; ++threadIndex) {
27+
Threads_.push_back(std::make_unique<std::thread>([options, state = ThreadState_] {
28+
RunStockpileThread(options, &state->ShouldProceed);
29+
}));
30+
}
31+
}
32+
33+
TStockpileManager::~TStockpileManager()
34+
{
35+
ThreadState_->ShouldProceed.store(false);
36+
37+
for (auto& thread : Threads_) {
38+
thread->detach();
39+
}
40+
}
41+
42+
////////////////////////////////////////////////////////////////////////////////
43+
44+
} // namespace NYT

0 commit comments

Comments
 (0)