Skip to content

Commit 5d2be00

Browse files
authored
subscribe on changes in resource broker (#8187)
1 parent cee8afe commit 5d2be00

File tree

8 files changed

+59
-78
lines changed

8 files changed

+59
-78
lines changed

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
577577
IEventHandle::FlagTrackDelivery);
578578

579579
ToBroker(new TEvResourceBroker::TEvResourceBrokerRequest);
580-
ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue));
580+
ToBroker(new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue, /*subscribe=*/ true));
581581

582582
if (auto* mon = AppData()->Mon) {
583583
NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");

ydb/core/kqp/tests/kikimr_tpch/kqp_tpch_ut.cpp

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ class KqpTpch : public NUnitTest::TTestBase {
2929
Tpch.Reset(new NTpch::TTpchRunner(*Driver, Database));
3030

3131
if (!InitDone) {
32-
ConfigureTableService(endpoint);
33-
3432
Tpch->UploadBundledData(2, false);
3533

3634
InitDone = true;
@@ -42,58 +40,6 @@ class KqpTpch : public NUnitTest::TTestBase {
4240
Driver->Stop(true);
4341
}
4442

45-
void ConfigureTableService(const TString& endpoint) {
46-
NYdbGrpc::TGRpcClientLow grpcClient;
47-
auto grpcContext = grpcClient.CreateContext();
48-
49-
NYdbGrpc::TGRpcClientConfig grpcConfig{endpoint};
50-
auto grpc = grpcClient.CreateGRpcServiceConnection<NKikimrClient::TGRpcServer>(grpcConfig);
51-
52-
NKikimrClient::TConsoleRequest request;
53-
auto* action = request.MutableConfigureRequest()->MutableActions()->Add();
54-
auto* configItem = action->MutableAddConfigItem()->MutableConfigItem();
55-
configItem->SetKind(NKikimrConsole::TConfigItem::TableServiceConfigItem);
56-
auto* rm = configItem->MutableConfig()->MutableTableServiceConfig()->MutableResourceManager();
57-
rm->SetChannelBufferSize(10ul << 20);
58-
rm->SetMkqlLightProgramMemoryLimit(100ul << 20);
59-
rm->SetMkqlHeavyProgramMemoryLimit(100ul << 20);
60-
rm->SetQueryMemoryLimit(20ul << 30);
61-
rm->SetPublishStatisticsIntervalSec(0);
62-
63-
TAtomic done = 0;
64-
grpc->DoRequest<NKikimrClient::TConsoleRequest, NKikimrClient::TConsoleResponse>(
65-
request,
66-
[&done](NYdbGrpc::TGrpcStatus&& status, NKikimrClient::TConsoleResponse&& response) {
67-
if (status.Ok()) {
68-
if (response.GetStatus().code() != Ydb::StatusIds::SUCCESS) {
69-
AtomicSet(done, 3);
70-
return;
71-
}
72-
if (response.GetConfigureResponse().GetStatus().code() != Ydb::StatusIds::SUCCESS) {
73-
AtomicSet(done, 4);
74-
return;
75-
}
76-
AtomicSet(done, 1);
77-
} else {
78-
Cerr << "status: {" << status.Msg << ", " << status.InternalError << ", "
79-
<< status.GRpcStatusCode << "}" << Endl;
80-
Cerr << response.Utf8DebugString() << Endl;
81-
AtomicSet(done, 2);
82-
}
83-
},
84-
&NKikimrClient::TGRpcServer::Stub::AsyncConsoleRequest,
85-
{},
86-
grpcContext.get());
87-
88-
while (AtomicGet(done) == 0) {
89-
::Sleep(TDuration::Seconds(1));
90-
}
91-
grpcContext.reset();
92-
grpcClient.Stop(true);
93-
94-
UNIT_ASSERT_EQUAL(done, 1);
95-
}
96-
9743
UNIT_TEST_SUITE(KqpTpch);
9844
UNIT_TEST(Query01);
9945
UNIT_TEST(Query02);

ydb/core/kqp/tests/kikimr_tpch/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
UNITTEST()
22

3+
ENV(YDB_HARD_MEMORY_LIMIT_BYTES="107374182400")
34
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
45
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
56

ydb/core/memory_controller/memory_controller_ut.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TWithMemoryControllerServer : public TServer {
6969
for (ui32 nodeIndex = 0; nodeIndex < Runtime->GetNodeCount(); ++nodeIndex) {
7070
Runtime->AddLocalService(MakeMemoryControllerId(nodeIndex),
7171
TActorSetupCmd(
72-
CreateMemoryController(TDuration::Seconds(1), (TIntrusivePtr<IProcessMemoryInfoProvider>)ProcessMemoryInfoProvider,
72+
CreateMemoryController(TDuration::Seconds(1), (TIntrusivePtr<IProcessMemoryInfoProvider>)ProcessMemoryInfoProvider,
7373
Settings->AppConfig->GetMemoryControllerConfig(), resourceBrokerSelfConfig,
7474
Runtime->GetDynamicCounters()),
7575
TMailboxType::ReadAsFilled,
@@ -221,7 +221,7 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
221221
memoryControllerConfig->SetSharedCacheMaxPercent(30);
222222
memoryControllerConfig->SetSharedCacheMinBytes(100_MB);
223223
memoryControllerConfig->SetSharedCacheMaxBytes(500_MB);
224-
224+
225225
memoryControllerConfig->SetMemTableMinPercent(10);
226226
memoryControllerConfig->SetMemTableMaxPercent(20);
227227
memoryControllerConfig->SetMemTableMinBytes(10_MB);
@@ -232,7 +232,7 @@ Y_UNIT_TEST(Config_ConsumerLimits) {
232232

233233
auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
234234
auto& runtime = *server->GetRuntime();
235-
235+
236236
server->ProcessMemoryInfo->CGroupLimit = 1000_MB;
237237
runtime.SimulateSleep(TDuration::Seconds(2));
238238
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/SharedCache/LimitMin")->Val(), 200_MB);
@@ -399,7 +399,7 @@ Y_UNIT_TEST(ResourceBroker) {
399399

400400
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
401401
memoryControllerConfig->SetQueryExecutionLimitPercent(15);
402-
402+
403403
auto resourceBrokerConfig = serverSettings.AppConfig->MutableResourceBrokerConfig();
404404
auto queue = resourceBrokerConfig->AddQueues();
405405
queue->SetName("queue_cs_ttl");
@@ -410,33 +410,42 @@ Y_UNIT_TEST(ResourceBroker) {
410410
auto& runtime = *server->GetRuntime();
411411
TAutoPtr<IEventHandle> handle;
412412
auto sender = runtime.AllocateEdgeActor();
413+
auto senderSubscriber = runtime.AllocateEdgeActor();
413414

414415
InitRoot(server, sender);
415-
416+
416417
runtime.SimulateSleep(TDuration::Seconds(2));
417418
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
418-
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
419-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 150_MB);
419+
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(sender);
420+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 150_MB);
420421
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 150_MB);
421422
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 300_MB);
422423

424+
runtime.SimulateSleep(TDuration::Seconds(2));
425+
runtime.Send(new IEventHandle(MakeResourceBrokerID(), senderSubscriber, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue, /*subscribe=*/ true)));
426+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(senderSubscriber);
427+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 150_MB);
428+
423429
server->ProcessMemoryInfo->CGroupLimit = 500_MB;
424430
runtime.SimulateSleep(TDuration::Seconds(2));
425431
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
426-
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
427-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 75_MB);
432+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(sender);
433+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 75_MB);
428434
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Consumer/QueryExecution/Limit")->Val(), 75_MB);
429435
UNIT_ASSERT_VALUES_EQUAL(server->MemoryControllerCounters->GetCounter("Stats/ActivitiesLimitBytes")->Val(), 150_MB);
430436

437+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(senderSubscriber);
438+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 75_MB);
439+
431440
// ensure that other settings are not affected:
432441
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest("queue_cs_ttl")));
433-
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
434-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
435-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 13_MB);
442+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(sender);
443+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetCpu(), 3);
444+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 13_MB);
436445
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest("queue_cs_general")));
437-
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);
438-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetCpu(), 3);
439-
UNIT_ASSERT_VALUES_EQUAL(config->QueueConfig->GetLimit().GetMemory(), 3221225472);
446+
config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(sender);
447+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetCpu(), 3);
448+
UNIT_ASSERT_VALUES_EQUAL(config->Get()->QueueConfig->GetLimit().GetMemory(), 3221225472);
440449
}
441450

442451
Y_UNIT_TEST(ResourceBroker_ConfigLimit) {
@@ -449,7 +458,7 @@ Y_UNIT_TEST(ResourceBroker_ConfigLimit) {
449458

450459
auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
451460
memoryControllerConfig->SetQueryExecutionLimitPercent(15);
452-
461+
453462
auto resourceBrokerConfig = serverSettings.AppConfig->MutableResourceBrokerConfig();
454463
resourceBrokerConfig->MutableResourceLimit()->SetMemory(1000_MB);
455464
auto queue = resourceBrokerConfig->AddQueues();
@@ -466,7 +475,7 @@ Y_UNIT_TEST(ResourceBroker_ConfigLimit) {
466475
auto sender = runtime.AllocateEdgeActor();
467476

468477
InitRoot(server, sender);
469-
478+
470479
runtime.SimulateSleep(TDuration::Seconds(2));
471480
runtime.Send(new IEventHandle(MakeResourceBrokerID(), sender, new TEvResourceBroker::TEvConfigRequest(NLocalDb::KqpResourceManagerQueue)));
472481
auto config = runtime.GrabEdgeEvent<TEvResourceBroker::TEvConfigResponse>(handle);

ydb/core/tablet/resource_broker.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,16 +1234,34 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigure::TPtr &ev,
12341234
ResourceBroker->Configure(std::move(config));
12351235
}
12361236

1237-
LOG_LOG_S(ctx,
1238-
success ? NActors::NLog::PRI_INFO : NActors::NLog::PRI_ERROR,
1239-
NKikimrServices::RESOURCE_BROKER,
1237+
LOG_LOG_S(ctx,
1238+
success ? NActors::NLog::PRI_INFO : NActors::NLog::PRI_ERROR,
1239+
NKikimrServices::RESOURCE_BROKER,
12401240
"Configure result: " << response->Record.ShortDebugString());
12411241

1242+
auto newConfig = ResourceBroker->GetConfig();
1243+
for (auto& queue : newConfig.GetQueues()) {
1244+
auto it = QueueSubscribers.find(queue.GetName());
1245+
if (it == QueueSubscribers.end())
1246+
continue;
1247+
1248+
for(const TActorId& subscriber: it->second) {
1249+
auto resp = MakeHolder<TEvResourceBroker::TEvConfigResponse>();
1250+
resp->QueueConfig = queue;
1251+
ctx.Send(subscriber, resp.Release());
1252+
}
1253+
}
1254+
12421255
ctx.Send(ev->Sender, response.Release());
12431256
}
12441257

12451258
void TResourceBrokerActor::Handle(TEvResourceBroker::TEvConfigRequest::TPtr& ev, const TActorContext&)
12461259
{
1260+
if (ev->Get()->Subscribe) {
1261+
auto [it, _] = QueueSubscribers.emplace(ev->Get()->Queue, THashSet<TActorId>());
1262+
it->second.emplace(ev->Sender);
1263+
}
1264+
12471265
auto config = ResourceBroker->GetConfig();
12481266
auto resp = MakeHolder<TEvResourceBroker::TEvConfigResponse>();
12491267
for (auto& queue : config.GetQueues()) {
@@ -1266,7 +1284,7 @@ void TResourceBrokerActor::Handle(TEvResourceBroker::TEvResourceBrokerRequest::T
12661284
void TResourceBrokerActor::Handle(NMon::TEvHttpInfo::TPtr &ev, const TActorContext &ctx)
12671285
{
12681286
auto config = ResourceBroker->GetConfig();
1269-
1287+
12701288
TStringStream str;
12711289
HTML(str) {
12721290
PRE() {

ydb/core/tablet/resource_broker.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,12 @@ struct TEvResourceBroker {
201201

202202
struct TEvConfigRequest : public TEventLocal<TEvConfigRequest, EvConfigRequest> {
203203
TString Queue;
204+
const bool Subscribe = false;
204205

205-
TEvConfigRequest(const TString& queue)
206-
: Queue(queue) {}
206+
TEvConfigRequest(const TString& queue, bool subscribe = false)
207+
: Queue(queue)
208+
, Subscribe(subscribe)
209+
{}
207210
};
208211

209212
struct TEvConfigResponse : public TEventLocal<TEvConfigResponse, EvConfigResponse> {

ydb/core/tablet/resource_broker_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ class TResourceBrokerActor : public TActorBootstrapped<TResourceBrokerActor> {
482482
NKikimrResourceBroker::TResourceBrokerConfig BootstrapConfig;
483483
::NMonitoring::TDynamicCounterPtr BootstrapCounters;
484484
TIntrusivePtr<TResourceBroker> ResourceBroker;
485+
THashMap<TString, THashSet<TActorId>> QueueSubscribers;
485486
};
486487

487488
} // NResourceBroker

ydb/tests/library/harness/kikimr_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ def __init__(
385385
if default_user_sid:
386386
self.yaml_config["domains_config"]["security_config"]["default_user_sids"] = [default_user_sid]
387387

388+
if os.getenv("YDB_HARD_MEMORY_LIMIT_BYTES"):
389+
self.yaml_config["memory_controller_config"] = {"hard_limit_bytes": int(os.getenv("YDB_HARD_MEMORY_LIMIT_BYTES"))}
390+
388391
if pg_compatible_expirement:
389392
self.yaml_config["table_service_config"]["enable_prepared_ddl"] = True
390393
self.yaml_config["table_service_config"]["enable_ast_cache"] = True

0 commit comments

Comments
 (0)