Skip to content

Commit 009889b

Browse files
authored
Apply old Resource Broker config (#8087)
1 parent 8631961 commit 009889b

File tree

6 files changed

+137
-35
lines changed

6 files changed

+137
-35
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,8 +2047,29 @@ void TMemoryControllerInitializer::InitializeServices(
20472047
NActors::TActorSystemSetup* setup,
20482048
const NKikimr::TAppData* appData)
20492049
{
2050-
auto config = appData->MemoryControllerConfig;
2051-
auto* actor = NMemory::CreateMemoryController(TDuration::Seconds(1), ProcessMemoryInfoProvider, config, appData->Counters);
2050+
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility
2051+
auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) {
2052+
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
2053+
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
2054+
}
2055+
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
2056+
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
2057+
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
2058+
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
2059+
}
2060+
}
2061+
}
2062+
};
2063+
if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasResourceBroker()) {
2064+
mergeResourceBrokerConfigs(Config.GetBootstrapConfig().GetResourceBroker());
2065+
}
2066+
if (Config.HasResourceBrokerConfig()) {
2067+
mergeResourceBrokerConfigs(Config.GetResourceBrokerConfig());
2068+
}
2069+
2070+
auto* actor = NMemory::CreateMemoryController(TDuration::Seconds(1), ProcessMemoryInfoProvider,
2071+
Config.GetMemoryControllerConfig(), resourceBrokerSelfConfig,
2072+
appData->Counters);
20522073
setup->LocalServices.emplace_back(
20532074
NMemory::MakeMemoryControllerId(0),
20542075
TActorSetupCmd(actor, TMailboxType::HTSwap, appData->BatchPoolId)

ydb/core/memory_controller/memory_controller.cpp

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,6 @@ using namespace NActors;
3333
using namespace NResourceBroker;
3434
using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
3535

36-
struct TResourceBrokerLimits {
37-
ui64 LimitBytes;
38-
ui64 QueryExecutionLimitBytes;
39-
40-
auto operator<=>(const TResourceBrokerLimits&) const = default;
41-
42-
TString ToString() const noexcept {
43-
TStringBuilder result;
44-
result << "LimitBytes: " << LimitBytes;
45-
result << " QueryExecutionLimitBytes: " << QueryExecutionLimitBytes;
46-
return result;
47-
}
48-
};
49-
5036
class TMemoryConsumer : public IMemoryConsumer {
5137
public:
5238
TMemoryConsumer(EMemoryConsumerKind kind, TActorId actorId)
@@ -109,12 +95,14 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
10995
TDuration interval,
11096
TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
11197
const NKikimrConfig::TMemoryControllerConfig& config,
98+
const TResourceBrokerConfig& resourceBrokerConfig,
11299
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
113100
: Interval(interval)
114101
, MemTables(std::make_shared<TMemTableMemoryConsumersCollection>(counters,
115102
Consumers.emplace(EMemoryConsumerKind::MemTable, MakeIntrusive<TMemoryConsumer>(EMemoryConsumerKind::MemTable, TActorId{})).first->second))
116103
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
117104
, Config(config)
105+
, ResourceBrokerSelfConfig(resourceBrokerConfig)
118106
, Counters(counters)
119107
{}
120108

@@ -127,7 +115,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
127115

128116
HandleWakeup(ctx);
129117

130-
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Bootstrapped");
118+
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Bootstrapped with config " << Config.ShortDebugString());
131119
}
132120

133121
private:
@@ -148,7 +136,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
148136

149137
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) {
150138
Config.Swap(ev->Get()->Record.MutableConfig()->MutableMemoryControllerConfig());
151-
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Config updated " << Config.DebugString());
139+
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Config updated " << Config.ShortDebugString());
152140
}
153141

154142
void HandleWakeup(const TActorContext& ctx) noexcept {
@@ -158,7 +146,9 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
158146
ui64 hardLimitBytes = GetHardLimitBytes(Config, processMemoryInfo, hasMemTotalHardLimit);
159147
ui64 softLimitBytes = GetSoftLimitBytes(Config, hardLimitBytes);
160148
ui64 targetUtilizationBytes = GetTargetUtilizationBytes(Config, hardLimitBytes);
161-
ui64 activitiesLimitBytes = GetActivitiesLimitBytes(Config, hardLimitBytes);
149+
ui64 activitiesLimitBytes = ResourceBrokerSelfConfig.LimitBytes
150+
? ResourceBrokerSelfConfig.LimitBytes // for backward compatibility
151+
: GetActivitiesLimitBytes(Config, hardLimitBytes);
162152

163153
TVector<TConsumerState> consumers(::Reserve(Consumers.size()));
164154
ui64 consumersConsumption = 0;
@@ -264,7 +254,9 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
264254
memoryStats.SetConsumersLimit(consumersLimitBytes);
265255

266256
ui64 queryExecutionConsumption = TAlignedPagePool::GetGlobalPagePoolSize();
267-
ui64 queryExecutionLimitBytes = GetQueryExecutionLimitBytes(Config, hardLimitBytes);
257+
ui64 queryExecutionLimitBytes = ResourceBrokerSelfConfig.QueryExecutionLimitBytes
258+
? ResourceBrokerSelfConfig.QueryExecutionLimitBytes // for backward compatibility
259+
: GetQueryExecutionLimitBytes(Config, hardLimitBytes);
268260
LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Consumer QueryExecution state:"
269261
<< " Consumption: " << queryExecutionConsumption << " Limit: " << queryExecutionLimitBytes);
270262
Counters->GetCounter("Consumer/QueryExecution/Consumption")->Set(queryExecutionConsumption);
@@ -273,7 +265,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
273265
memoryStats.SetQueryExecutionLimit(queryExecutionLimitBytes);
274266

275267
// Note: for now ResourceBroker and its queues aren't MemoryController consumers and don't share limits with other caches
276-
ApplyResourceBrokerLimits({
268+
ApplyResourceBrokerConfig({
277269
activitiesLimitBytes,
278270
queryExecutionLimitBytes
279271
});
@@ -363,22 +355,22 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
363355
}
364356
}
365357

366-
void ApplyResourceBrokerLimits(TResourceBrokerLimits limits) {
367-
if (limits == CurrentResourceBrokerLimits) {
358+
void ApplyResourceBrokerConfig(TResourceBrokerConfig config) {
359+
if (config == CurrentResourceBrokerConfig) {
368360
return;
369361
}
370362

371363
TAutoPtr<TEvResourceBroker::TEvConfigure> configure = new TEvResourceBroker::TEvConfigure();
372364
configure->Merge = true;
373-
configure->Record.MutableResourceLimit()->SetMemory(limits.LimitBytes);
365+
configure->Record.MutableResourceLimit()->SetMemory(config.LimitBytes);
374366

375367
auto queue = configure->Record.AddQueues();
376368
queue->SetName(NLocalDb::KqpResourceManagerQueue);
377-
queue->MutableLimit()->SetMemory(limits.QueryExecutionLimitBytes);
369+
queue->MutableLimit()->SetMemory(config.QueryExecutionLimitBytes);
378370

379371
Send(MakeResourceBrokerID(), configure.Release());
380372

381-
CurrentResourceBrokerLimits.emplace(std::move(limits));
373+
CurrentResourceBrokerConfig.emplace(std::move(config));
382374
}
383375

384376
TConsumerCounters& GetConsumerCounters(EMemoryConsumerKind consumer) {
@@ -445,9 +437,10 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
445437
std::shared_ptr<TMemTableMemoryConsumersCollection> MemTables;
446438
const TIntrusiveConstPtr<IProcessMemoryInfoProvider> ProcessMemoryInfoProvider;
447439
NKikimrConfig::TMemoryControllerConfig Config;
440+
TResourceBrokerConfig ResourceBrokerSelfConfig;
448441
const TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
449442
TMap<EMemoryConsumerKind, TConsumerCounters> ConsumerCounters;
450-
std::optional<TResourceBrokerLimits> CurrentResourceBrokerLimits;
443+
std::optional<TResourceBrokerConfig> CurrentResourceBrokerConfig;
451444
};
452445

453446
}
@@ -456,11 +449,13 @@ IActor* CreateMemoryController(
456449
TDuration interval,
457450
TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
458451
const NKikimrConfig::TMemoryControllerConfig& config,
452+
const TResourceBrokerConfig& resourceBrokerSelfConfig,
459453
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
460454
return new TMemoryController(
461455
interval,
462456
std::move(processMemoryInfoProvider),
463457
config,
458+
resourceBrokerSelfConfig,
464459
GetServiceCounters(counters, "utils")->GetSubgroup("component", "memory_controller"));
465460
}
466461

ydb/core/memory_controller/memory_controller.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,25 @@
1010

1111
namespace NKikimr::NMemory {
1212

13+
struct TResourceBrokerConfig {
14+
ui64 LimitBytes = 0;
15+
ui64 QueryExecutionLimitBytes = 0;
16+
17+
auto operator<=>(const TResourceBrokerConfig&) const = default;
18+
19+
TString ToString() const noexcept {
20+
TStringBuilder result;
21+
result << "LimitBytes: " << LimitBytes;
22+
result << " QueryExecutionLimitBytes: " << QueryExecutionLimitBytes;
23+
return result;
24+
}
25+
};
26+
1327
NActors::IActor* CreateMemoryController(
1428
TDuration interval,
1529
TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
16-
const NKikimrConfig::TMemoryControllerConfig& config,
30+
const NKikimrConfig::TMemoryControllerConfig& config,
31+
const TResourceBrokerConfig& resourceBrokerSelfConfig,
1732
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
1833

1934
}

ydb/core/memory_controller/memory_controller_config.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ inline ui64 GetHardLimitBytes(const NKikimrConfig::TMemoryControllerConfig& conf
6262
hasMemTotalHardLimit = true;
6363
return info.MemTotal.value();
6464
}
65-
return 512_MB; // fallback
65+
return 2_GB; // fallback
6666
}
6767

6868
GET_LIMIT(SoftLimit)

ydb/core/memory_controller/memory_controller_ut.cpp

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,27 @@ class TWithMemoryControllerServer : public TServer {
5151
ProcessMemoryInfoProvider = MakeIntrusive<TProcessMemoryInfoProvider>();
5252
ProcessMemoryInfo = &ProcessMemoryInfoProvider->ProcessMemoryInfo;
5353

54+
// copy-paste from TMemoryControllerInitializer::InitializeServices
55+
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig;
56+
const auto& resourceBrokerConfig = Settings->AppConfig->GetResourceBrokerConfig();
57+
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
58+
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
59+
}
60+
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
61+
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
62+
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
63+
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
64+
}
65+
}
66+
}
67+
Cerr << "ResourceBrokerSelfConfig: " << resourceBrokerSelfConfig.ToString() << Endl;
68+
5469
for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) {
5570
Runtime->AddLocalService(MakeMemoryControllerId(nodeIndex),
5671
TActorSetupCmd(
5772
CreateMemoryController(TDuration::Seconds(1), (TIntrusivePtr<IProcessMemoryInfoProvider>)ProcessMemoryInfoProvider,
58-
{}, Runtime->GetDynamicCounters()),
73+
Settings->AppConfig->GetMemoryControllerConfig(), resourceBrokerSelfConfig,
74+
Runtime->GetDynamicCounters()),
5975
TMailboxType::ReadAsFilled,
6076
0),
6177
nodeIndex);
@@ -167,9 +183,9 @@ Y_UNIT_TEST(Counters_NoHardLimit) {
167183
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/CGroupLimit")->Val(), 0_MB);
168184
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/MemTotal")->Val(), 0_MB);
169185
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/AllocatedMemory")->Val(), 0_MB);
170-
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/HardLimit")->Val(), 512_MB); // default
171-
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/SoftLimit")->Val(), 384_MB);
172-
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/TargetUtilization")->Val(), 256_MB);
186+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/HardLimit")->Val(), 2_GB); // default
187+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/SoftLimit")->Val(), 1536_MB);
188+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/TargetUtilization")->Val(), 1_GB);
173189

174190
server->ProcessMemoryInfo->CGroupLimit = 200_MB;
175191
runtime.SimulateSleep(TDuration::Seconds(2));
@@ -401,12 +417,16 @@ Y_UNIT_TEST(ResourceBroker) {
401417
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
402418
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
403419
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 150_MB);
420+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 150_MB);
421+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 300_MB);
404422

405423
server->ProcessMemoryInfo->CGroupLimit = 500_MB;
406424
runtime.SimulateSleep(TDuration::Seconds(2));
407425
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
408426
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
409427
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 75_MB);
428+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 75_MB);
429+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 150_MB);
410430

411431
// ensure that other settings are not affected:
412432
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest("queue_cs_ttl")));
@@ -417,7 +437,60 @@ Y_UNIT_TEST(ResourceBroker) {
417437
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
418438
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
419439
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 3221225472);
440+
}
441+
442+
Y_UNIT_TEST(ResourceBroker_ConfigLimit) {
443+
using namespace NResourceBroker;
420444

445+
TPortManager pm;
446+
TServerSettings serverSettings(pm.GetPort(2134));
447+
serverSettings.SetDomainName("Root")
448+
.SetUseRealThreads(false);
449+
450+
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
451+
memoryControllerConfig->SetQueryExecutionLimitPercent(15);
452+
453+
auto resourceBrokerConfig = serverSettings.AppConfig->MutableResourceBrokerConfig();
454+
resourceBrokerConfig->MutableResourceLimit()->SetMemory(1000_MB);
455+
auto queue = resourceBrokerConfig->AddQueues();
456+
queue->SetName("queue_kqp_resource_manager");
457+
queue->MutableLimit()->SetMemory(999_MB);
458+
queue = resourceBrokerConfig->AddQueues();
459+
queue->SetName("queue_cs_ttl");
460+
queue->MutableLimit()->SetMemory(13_MB);
461+
462+
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
463+
server->ProcessMemoryInfo->CGroupLimit = 500_MB;
464+
auto& runtime = *server->GetRuntime();
465+
TAutoPtr<IEventHandle> handle;
466+
auto sender = runtime.AllocateEdgeActor();
467+
468+
InitRoot(server, sender);
469+
470+
runtime.SimulateSleep(TDuration::Seconds(2));
471+
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
472+
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
473+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 999_MB);
474+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 999_MB);
475+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 1000_MB);
476+
477+
server->ProcessMemoryInfo->CGroupLimit = 200_MB;
478+
runtime.SimulateSleep(TDuration::Seconds(2));
479+
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
480+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
481+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 999_MB);
482+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 999_MB);
483+
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 1000_MB);
484+
485+
// ensure that other settings are not affected:
486+
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest("queue_cs_ttl")));
487+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
488+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
489+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 13_MB);
490+
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest("queue_cs_general")));
491+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
492+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
493+
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 3221225472);
421494
}
422495

423496
}

ydb/tests/library/harness/resources/default_yaml.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,5 +256,3 @@ federated_query_config:
256256
uri: ""
257257
pinger:
258258
ping_period: "30s"
259-
memory_controller_config:
260-
hard_limit_bytes: 4294967296

0 commit comments

Comments
 (0)