Skip to content

Commit 0138b90

Browse files
authored
remove outdated code in rm service (#7207)
1 parent 8beeb7f commit 0138b90

File tree

4 files changed

+41
-160
lines changed

4 files changed

+41
-160
lines changed

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 27 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,6 @@ struct TEvPrivate {
103103

104104
struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> {
105105
};
106-
107-
struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> {
108-
std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback;
109-
};
110106
};
111107

112108
class TKqpResourceManager : public IKqpResourceManager {
@@ -117,7 +113,6 @@ class TKqpResourceManager : public IKqpResourceManager {
117113
, ExecutionUnitsResource(config.GetComputeActorsCount())
118114
, ExecutionUnitsLimit(config.GetComputeActorsCount())
119115
, ScanQueryMemoryResource(config.GetQueryMemoryLimit())
120-
, PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger())
121116
{
122117
SetConfigValues(config);
123118
}
@@ -132,10 +127,7 @@ class TKqpResourceManager : public IKqpResourceManager {
132127
config.GetKqpPatternCacheCompiledCapacityBytes(),
133128
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
134129

135-
if (PublishResourcesByExchanger) {
136-
CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
137-
return;
138-
}
130+
CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
139131
}
140132

141133
const TIntrusivePtr<TKqpCounters>& GetCounters() const override {
@@ -144,14 +136,10 @@ class TKqpResourceManager : public IKqpResourceManager {
144136

145137
void CreateResourceInfoExchanger(
146138
const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
147-
PublishResourcesByExchanger = true;
148-
if (!ResourceInfoExchanger) {
149-
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
150-
auto exchanger = CreateKqpResourceInfoExchangerActor(
151-
Counters, ResourceSnapshotState, settings);
152-
ResourceInfoExchanger = ActorSystem->Register(exchanger);
153-
return;
154-
}
139+
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
140+
auto exchanger = CreateKqpResourceInfoExchangerActor(
141+
Counters, ResourceSnapshotState, settings);
142+
ResourceInfoExchanger = ActorSystem->Register(exchanger);
155143
}
156144

157145
bool AllocateExecutionUnits(ui32 cnt) {
@@ -328,39 +316,28 @@ class TKqpResourceManager : public IKqpResourceManager {
328316

329317
TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
330318
TVector<NKikimrKqp::TKqpNodeResources> resources;
331-
Y_ABORT_UNLESS(PublishResourcesByExchanger);
332-
333-
if (PublishResourcesByExchanger) {
334-
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
335-
with_lock (ResourceSnapshotState->Lock) {
336-
infos = ResourceSnapshotState->Snapshot;
337-
}
338-
if (infos != nullptr) {
339-
resources = *infos;
340-
}
319+
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
320+
with_lock (ResourceSnapshotState->Lock) {
321+
infos = ResourceSnapshotState->Snapshot;
322+
}
323+
if (infos != nullptr) {
324+
resources = *infos;
341325
}
342326

343327
return resources;
344328
}
345329

346330
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
347331
LOG_AS_D("Schedule Snapshot request");
348-
if (PublishResourcesByExchanger) {
349-
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
350-
with_lock (ResourceSnapshotState->Lock) {
351-
infos = ResourceSnapshotState->Snapshot;
352-
}
353-
TVector<NKikimrKqp::TKqpNodeResources> resources;
354-
if (infos != nullptr) {
355-
resources = *infos;
356-
}
357-
callback(std::move(resources));
358-
return;
332+
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
333+
with_lock (ResourceSnapshotState->Lock) {
334+
infos = ResourceSnapshotState->Snapshot;
335+
}
336+
TVector<NKikimrKqp::TKqpNodeResources> resources;
337+
if (infos != nullptr) {
338+
resources = *infos;
359339
}
360-
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();
361-
ev->Callback = std::move(callback);
362-
TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release());
363-
ActorSystem->Send(handle);
340+
callback(std::move(resources));
364341
}
365342

366343
TKqpLocalNodeResources GetLocalResources() const override {
@@ -470,7 +447,6 @@ class TKqpResourceManager : public IKqpResourceManager {
470447

471448
// state for resource info exchanger
472449
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
473-
bool PublishResourcesByExchanger;
474450
TActorId ResourceInfoExchanger = TActorId();
475451
};
476452

@@ -500,7 +476,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
500476
: Config(config)
501477
, ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
502478
, KqpProxySharedResources(std::move(kqpProxySharedResources))
503-
, PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger())
504479
{
505480
ResourceManager = std::make_shared<TKqpResourceManager>(config, counters);
506481
with_lock (ResourceManagers.Lock) {
@@ -575,7 +550,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
575550
hFunc(TEvInterconnect::TEvNodeInfo, Handle);
576551
hFunc(TEvPrivate::TEvPublishResources, HandleWork);
577552
hFunc(TEvPrivate::TEvSchedulePublishResources, HandleWork);
578-
hFunc(TEvPrivate::TEvTakeResourcesSnapshot, HandleWork);
579553
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
580554
hFunc(TEvKqp::TEvKqpProxyPublishRequest, HandleWork);
581555
hFunc(TEvResourceBroker::TEvConfigResponse, HandleWork);
@@ -611,20 +585,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
611585
PublishResourceUsage("kqp_proxy");
612586
}
613587

614-
void HandleWork(TEvPrivate::TEvTakeResourcesSnapshot::TPtr& ev) {
615-
if (WbState.DomainNotFound) {
616-
LOG_E("Can not take resources snapshot, ssGroupId not set. Tenant: " << WbState.Tenant
617-
<< ", Board: " << WbState.BoardPath);
618-
ev->Get()->Callback({});
619-
return;
620-
}
621-
622-
LOG_D("Create Snapshot actor, board: " << WbState.BoardPath);
623-
624-
Register(
625-
CreateTakeResourcesSnapshotActor(WbState.BoardPath, std::move(ev->Get()->Callback)));
626-
}
627-
628588
void HandleWork(TEvResourceBroker::TEvConfigResponse::TPtr& ev) {
629589
if (!ev->Get()->QueueConfig) {
630590
LOG_E(NLocalDb::KqpResourceManagerQueue << " not configured!");
@@ -699,23 +659,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
699659
config.GetKqpPatternCacheCompiledCapacityBytes(),
700660
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
701661

702-
bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger();
703-
if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) {
704-
PublishResourcesByExchanger = enablePublishResourcesByExchanger;
705-
if (enablePublishResourcesByExchanger) {
706-
ResourceManager->CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
707-
PublishResourceUsage("exchanger enabled");
708-
} else {
709-
if (ResourceManager->ResourceInfoExchanger) {
710-
Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison);
711-
ResourceManager->ResourceInfoExchanger = TActorId();
712-
}
713-
ResourceManager->PublishResourcesByExchanger = false;
714-
ResourceManager->ResourceSnapshotState.reset();
715-
PublishResourceUsage("exchanger disabled");
716-
}
717-
}
718-
719662
#define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name());
720663
FORCE_VALUE(ComputeActorsCount)
721664
FORCE_VALUE(ChannelBufferSize)
@@ -762,14 +705,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
762705
TStringStream str;
763706
str.Reserve(8 * 1024);
764707

765-
auto snapshot = TVector<NKikimrKqp::TKqpNodeResources>();
766-
767-
if (PublishResourcesByExchanger) {
768-
ResourceManager->RequestClusterResourcesInfo(
769-
[&snapshot](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
770-
snapshot = std::move(resources);
771-
});
772-
}
708+
auto snapshot = ResourceManager->GetClusterResources();
773709

774710
HTML(str) {
775711
PRE() {
@@ -823,9 +759,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
823759
ResourceManager->ResourceInfoExchanger = TActorId();
824760
}
825761
ResourceManager->ResourceSnapshotState.reset();
826-
if (WbState.BoardPublisherActorId) {
827-
Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
828-
}
829762
TActor::PassAway();
830763
}
831764

@@ -882,39 +815,14 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
882815
pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());
883816
}
884817

885-
if (PublishResourcesByExchanger) {
886-
LOG_I("Send to publish resource usage for "
887-
<< "reason: " << reason
888-
<< ", payload: " << payload.ShortDebugString());
889-
WbState.LastPublishTime = now;
890-
if (ResourceManager->ResourceInfoExchanger) {
891-
Send(ResourceManager->ResourceInfoExchanger,
892-
new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
893-
}
894-
return;
895-
}
896-
897-
if (WbState.BoardPublisherActorId) {
898-
LOG_I("Kill previous board publisher for '" << WbState.BoardPath
899-
<< "' at " << WbState.BoardPublisherActorId << ", reason: " << reason);
900-
Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
901-
}
902-
903-
WbState.BoardPublisherActorId = TActorId();
904-
905-
if (WbState.DomainNotFound) {
906-
LOG_E("Can not find default state storage group for database " << WbState.Tenant);
907-
return;
908-
}
909-
910-
auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(),
911-
/* ttlMs */ 0, /* reg */ true);
912-
WbState.BoardPublisherActorId = Register(boardPublisher);
913-
818+
LOG_I("Send to publish resource usage for "
819+
<< "reason: " << reason
820+
<< ", payload: " << payload.ShortDebugString());
914821
WbState.LastPublishTime = now;
915-
916-
LOG_I("Publish resource usage for '" << WbState.BoardPath << "' at " << WbState.BoardPublisherActorId
917-
<< ", reason: " << reason << ", payload: " << payload.ShortDebugString());
822+
if (ResourceManager->ResourceInfoExchanger) {
823+
Send(ResourceManager->ResourceInfoExchanger,
824+
new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
825+
}
918826
}
919827

920828
private:
@@ -927,7 +835,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
927835
TString Tenant;
928836
TString BoardPath;
929837
bool DomainNotFound = false;
930-
TActorId BoardPublisherActorId;
931838
std::optional<TInstant> LastPublishTime;
932839
};
933840
TWhiteBoardState WbState;
@@ -940,7 +847,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
940847
std::shared_ptr<TKqpResourceManager> ResourceManager;
941848

942849
std::optional<TInstant> PublishResourcesScheduledAt;
943-
bool PublishResourcesByExchanger;
944850
std::optional<TString> SelfDataCenterId;
945851
};
946852

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,6 @@ class IKqpResourceManager : private TNonCopyable {
229229
};
230230

231231

232-
NActors::IActor* CreateTakeResourcesSnapshotActor(
233-
const TString& boardPath,
234-
std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback);
235-
236-
237232
struct TResourceSnapshotState {
238233
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> Snapshot;
239234
TMutex Lock;

ydb/core/kqp/rm_service/kqp_rm_ut.cpp

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,12 @@ TResourceBrokerConfig MakeResourceBrokerTestConfig() {
9494
return config;
9595
}
9696

97-
NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig(
98-
bool EnablePublishResourcesByExchanger = false) {
97+
NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig() {
9998
NKikimrConfig::TTableServiceConfig::TResourceManager config;
10099

101100
config.SetComputeActorsCount(100);
102101
config.SetPublishStatisticsIntervalSec(0);
103102
config.SetQueryMemoryLimit(1000);
104-
config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger);
105103

106104
auto* infoExchangerRetrySettings = config.MutableInfoExchangerSettings();
107105
auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings();
@@ -276,12 +274,9 @@ class KqpRm : public TTestBase {
276274
UNIT_TEST(NotEnoughMemory);
277275
UNIT_TEST(NotEnoughExecutionUnits);
278276
UNIT_TEST(ResourceBrokerNotEnoughResources);
279-
UNIT_TEST(SingleSnapshotByStateStorage);
280277
UNIT_TEST(SingleSnapshotByExchanger);
281278
UNIT_TEST(Reduce);
282-
UNIT_TEST(SnapshotSharingByStateStorage);
283279
UNIT_TEST(SnapshotSharingByExchanger);
284-
UNIT_TEST(NodesMembershipByStateStorage);
285280
UNIT_TEST(NodesMembershipByExchanger);
286281
UNIT_TEST(DisonnectNodes);
287282
UNIT_TEST_SUITE_END();
@@ -291,15 +286,12 @@ class KqpRm : public TTestBase {
291286
void NotEnoughMemory();
292287
void NotEnoughExecutionUnits();
293288
void ResourceBrokerNotEnoughResources();
294-
void Snapshot(bool byExchanger);
295-
void SingleSnapshotByStateStorage();
289+
void Snapshot();
296290
void SingleSnapshotByExchanger();
297291
void Reduce();
298-
void SnapshotSharing(bool byExchanger);
299-
void SnapshotSharingByStateStorage();
292+
void SnapshotSharing();
300293
void SnapshotSharingByExchanger();
301-
void NodesMembership(bool byExchanger);
302-
void NodesMembershipByStateStorage();
294+
void NodesMembership();
303295
void NodesMembershipByExchanger();
304296
void DisonnectNodes();
305297

@@ -450,8 +442,8 @@ void KqpRm::ResourceBrokerNotEnoughResources() {
450442
AssertResourceBrokerSensors(0, 1000, 0, 0, 1);
451443
}
452444

453-
void KqpRm::Snapshot(bool byExchanger) {
454-
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
445+
void KqpRm::Snapshot() {
446+
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
455447
NKikimr::TActorSystemStub stub;
456448

457449
auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
@@ -490,12 +482,8 @@ void KqpRm::Snapshot(bool byExchanger) {
490482
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm);
491483
}
492484

493-
void KqpRm::SingleSnapshotByStateStorage() {
494-
Snapshot(false);
495-
}
496-
497485
void KqpRm::SingleSnapshotByExchanger() {
498-
Snapshot(true);
486+
Snapshot();
499487
}
500488

501489
void KqpRm::Reduce() {
@@ -537,8 +525,8 @@ void KqpRm::Reduce() {
537525
AssertResourceBrokerSensors(0, 30, 0, 0, 1);
538526
}
539527

540-
void KqpRm::SnapshotSharing(bool byExchanger) {
541-
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
528+
void KqpRm::SnapshotSharing() {
529+
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
542530
NKikimr::TActorSystemStub stub;
543531

544532
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
@@ -607,16 +595,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
607595
}
608596
}
609597

610-
void KqpRm::SnapshotSharingByStateStorage() {
611-
SnapshotSharing(false);
612-
}
613-
614598
void KqpRm::SnapshotSharingByExchanger() {
615-
SnapshotSharing(true);
599+
SnapshotSharing();
616600
}
617601

618-
void KqpRm::NodesMembership(bool byExchanger) {
619-
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
602+
void KqpRm::NodesMembership() {
603+
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
620604
NKikimr::TActorSystemStub stub;
621605

622606
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
@@ -641,16 +625,13 @@ void KqpRm::NodesMembership(bool byExchanger) {
641625
CheckSnapshot(0, {{1000, 100}}, rm_first);
642626
}
643627

644-
void KqpRm::NodesMembershipByStateStorage() {
645-
NodesMembership(false);
646-
}
647628

648629
void KqpRm::NodesMembershipByExchanger() {
649-
NodesMembership(true);
630+
NodesMembership();
650631
}
651632

652633
void KqpRm::DisonnectNodes() {
653-
StartRms({MakeKqpResourceManagerConfig(true), MakeKqpResourceManagerConfig(true)});
634+
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
654635
NKikimr::TActorSystemStub stub;
655636

656637
auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());

0 commit comments

Comments
 (0)