Skip to content

Commit f3c9a60

Browse files
authored
Add ListNodes cache in DynamicNameserver (#12694)
1 parent ae2a5a5 commit f3c9a60

File tree

15 files changed

+202
-105
lines changed

15 files changed

+202
-105
lines changed

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
1919
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
2020

2121
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
22-
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
22+
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
2323
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
24-
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
24+
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
2525
}
26+
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
2627
Send(SelfId(), ev.release());
2728

2829
// and subscribe for the node list too

ydb/core/fq/libs/actors/nodes_manager.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,13 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
318318
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
319319
try {
320320
const auto& status = ev->Get()->Status.GetStatus();
321-
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
322321
if (!ev->Get()->Status.IsSuccess()) {
323322
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
324323
}
325324
const auto& res = ev->Get()->Result;
326325

327-
auto& nodesInfo = nameServiceUpdateReq->Nodes;
328-
nodesInfo.reserve(res.nodes().size());
326+
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
327+
nodesInfo->reserve(res.nodes().size());
329328

330329
Peers.clear();
331330
std::set<ui32> nodeIds; // may be not unique
@@ -340,7 +339,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
340339
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});
341340

342341
if (node.interconnect_port()) {
343-
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
342+
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
344343
node.node_id(),
345344
node.node_address(),
346345
node.hostname(), // host
@@ -356,8 +355,9 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
356355
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
357356
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();
358357

359-
LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
360-
if (!nodesInfo.empty()) {
358+
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
359+
if (!nodesInfo->empty()) {
360+
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));
361361
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
362362
}
363363
} catch (yexception &e) {

ydb/core/health_check/health_check_ut.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,15 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
337337
}
338338

339339
// Test helper: swaps *ev for a freshly built TEvNodesInfo event handle.
// The new event carries a copy of the original node list in which every
// node's Host is overwritten with a 1,000,000-character string of 'a'.
// Only Recipient and Sender are taken from the original handle.
void SetLongHostValue(TEvInterconnect::TEvNodesInfo::TPtr* ev) {
    TString longHost(1000000, 'a');

    // The node list is copied first and the copy is patched; the original
    // event's list is only read.
    auto patchedNodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>((*ev)->Get()->Nodes);
    for (auto& node : *patchedNodes) {
        node.Host = longHost;
    }

    auto replacement = IEventHandle::Downcast<TEvInterconnect::TEvNodesInfo>(
        new IEventHandle((*ev)->Recipient, (*ev)->Sender, new TEvInterconnect::TEvNodesInfo(patchedNodes))
    );
    ev->Swap(replacement);
}
346350

347351
Ydb::Monitoring::SelfCheckResult RequestHc(size_t const groupNumber, size_t const vdiscPerGroupNumber, bool const isMergeRecords = false, bool const largeSizeVdisksIssues = false) {

ydb/core/mind/bscontroller/impl.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,9 +1416,9 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
14161416
TNodeId NodeId;
14171417
TNodeLocation Location;
14181418

1419-
THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
1419+
THostRecord(const TEvInterconnect::TNodeInfo& nodeInfo)
14201420
: NodeId(nodeInfo.NodeId)
1421-
, Location(std::move(nodeInfo.Location))
1421+
, Location(nodeInfo.Location)
14221422
{}
14231423

14241424
THostRecord(const NKikimrBlobStorage::TNodeIdentifier& node)
@@ -1432,11 +1432,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
14321432
THashMap<TNodeId, THostId> NodeIdToHostId;
14331433

14341434
public:
1435+
THostRecordMapImpl() = default;
1436+
14351437
// Fills both lookup tables from a node list message.
// For every node in msg->Nodes a THostId is formed from (Host, Port); the node id
// is mapped to that host id, and the host id is mapped to a record built from the
// node. The message is only read, never modified.
THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
    for (const auto& node : msg->Nodes) {
        const THostId hostId(node.Host, node.Port);
        NodeIdToHostId.emplace(node.NodeId, hostId);
        // emplace leaves an existing entry for the same hostId untouched
        HostIdToRecord.emplace(hostId, node);
    }
}
14421444

@@ -1824,8 +1826,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
18241826

18251827
// For test purposes, required for self heal actor
18261828
void CreateEmptyHostRecordsMap() {
1827-
TEvInterconnect::TEvNodesInfo nodes;
1828-
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
1829+
HostRecords = std::make_shared<THostRecordMapImpl>();
18291830
}
18301831

18311832
ui64 NextConfigTxSeqNo = 1;

ydb/core/mind/dynamic_nameserver.cpp

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
6767

6868
if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
6969
// Reset proxy if node expired.
70-
if (exists)
70+
if (exists) {
7171
ResetInterconnectProxyConfig(NodeId, ctx);
72+
ListNodesCache->Invalidate(); // node was erased
73+
}
7274
ReplyWithErrorAndDie(ctx);
7375
return;
7476
}
7577

7678
TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
79+
if (!exists || !oldNode.EqualExceptExpire(node)) {
80+
ListNodesCache->Invalidate();
81+
}
7782

7883
// If ID is re-used by another node then proxy has to be reset.
7984
if (exists && !oldNode.EqualExceptExpire(node))
@@ -146,6 +151,7 @@ void TDynamicNameserver::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
146151
auto newStaticConfig = BuildNameserverTable(config);
147152
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
148153
StaticConfig = std::move(newStaticConfig);
154+
ListNodesCache->Invalidate();
149155
for (const auto& subscriber : StaticNodeChangeSubscribers) {
150156
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
151157
}
@@ -221,31 +227,40 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
221227
reply->NodeId = nodeId;
222228
ctx.Send(ev->Sender, reply);
223229
} else {
224-
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
230+
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
231+
ListNodesCache, ev, deadline));
225232
}
226233
}
227234

228235
void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
229-
{
236+
{
230237
auto now = ctx.Now();
231-
for (auto &sender : ListNodesQueue) {
232-
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
238+
if (ListNodesCache->NeedUpdate(now)) {
239+
auto newNodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
240+
auto newExpire = now;
241+
233242
for (const auto &pr : StaticConfig->StaticNodeTable) {
234-
reply->Nodes.emplace_back(pr.first,
235-
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
236-
pr.second.Port, pr.second.Location, true);
243+
newNodes->emplace_back(pr.first,
244+
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
245+
pr.second.Port, pr.second.Location, true);
237246
}
238247

239248
for (auto &config : DynamicConfigs) {
240249
for (auto &pr : config->DynamicNodes) {
241-
if (pr.second.Expire > now)
242-
reply->Nodes.emplace_back(pr.first, pr.second.Address,
243-
pr.second.Host, pr.second.ResolveHost,
244-
pr.second.Port, pr.second.Location, false);
250+
if (pr.second.Expire > now) {
251+
newNodes->emplace_back(pr.first, pr.second.Address,
252+
pr.second.Host, pr.second.ResolveHost,
253+
pr.second.Port, pr.second.Location, false);
254+
newExpire = std::min(newExpire, pr.second.Expire);
255+
}
245256
}
246257
}
247258

248-
ctx.Send(sender, reply.Release());
259+
ListNodesCache->Update(newNodes, newExpire);
260+
}
261+
262+
for (auto &sender : ListNodesQueue) {
263+
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache->GetNodes()));
249264
}
250265
ListNodesQueue.clear();
251266
}
@@ -301,15 +316,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
301316
config->ExpiredNodes.emplace(node.GetNodeId(), info);
302317
}
303318

319+
ListNodesCache->Invalidate();
304320
config->Epoch = rec.GetEpoch();
305321
ctx.Schedule(config->Epoch.End - ctx.Now(),
306322
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
307323
} else {
308324
// Note: this update may be optimized to only include new nodes
309325
for (auto &node : rec.GetNodes()) {
310326
auto nodeId = node.GetNodeId();
311-
if (!config->DynamicNodes.contains(nodeId))
327+
if (!config->DynamicNodes.contains(nodeId)) {
312328
config->DynamicNodes.emplace(nodeId, node);
329+
ListNodesCache->Invalidate();
330+
}
313331
}
314332
config->Epoch = rec.GetEpoch();
315333
}
@@ -396,8 +414,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
396414
ctx.Send(ev->Sender, reply.Release());
397415
} else {
398416
const TInstant deadline = ev->Get()->Deadline;
399-
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
400-
deadline));
417+
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
418+
ListNodesCache, ev.Release(), deadline));
401419
}
402420
}
403421
}
@@ -456,6 +474,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
456474
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
457475
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
458476
StaticConfig = std::move(newStaticConfig);
477+
ListNodesCache->Invalidate();
459478
for (const auto& subscriber : StaticNodeChangeSubscribers) {
460479
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
461480
}
@@ -507,5 +526,29 @@ TIntrusivePtr<TTableNameserverSetup> BuildNameserverTable(const NKikimrBlobStora
507526
return table;
508527
}
509528

529+
// Creates an empty cache: no node list and a zero expiration instant,
// so NeedUpdate() reports true until Update() is called.
TListNodesCache::TListNodesCache() : Nodes(nullptr), Expire(TInstant::Zero()) {}
533+
534+
535+
// Replaces the cached node list and the instant it is valid until.
//   newNodes  - list to hand out from GetNodes()
//   newExpire - instant compared against `now` in NeedUpdate()
void TListNodesCache::Update(TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr newNodes, TInstant newExpire) {
    // newNodes is a by-value sink parameter: move it into the member instead of
    // copying, which avoids an extra reference-count increment/decrement pair.
    Nodes = std::move(newNodes);
    Expire = newExpire;
}
539+
540+
// Drops the cached list and resets the expiration to zero, returning the cache
// to its freshly constructed state; the next NeedUpdate() call returns true.
void TListNodesCache::Invalidate() {
    Expire = TInstant::Zero();
    Nodes = nullptr;
}
544+
545+
// Returns true when there is no cached list or when `now` has reached the
// cache expiration instant.
//
// The comparison is inclusive: TDynamicNameserver::SendNodesList lists a dynamic
// node only while its Expire is strictly greater than `now`, so at
// now == Expire the cached list already contains a node that a fresh build
// would leave out.
bool TListNodesCache::NeedUpdate(TInstant now) const {
    return Nodes == nullptr || now >= Expire;
}
548+
549+
// Returns the cached node list as a shared const pointer.
// The result is null after construction or Invalidate() until Update() is called.
TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr TListNodesCache::GetNodes() const {
    return Nodes;
}
552+
510553
} // NNodeBroker
511554
} // NKikimr

ydb/core/mind/dynamic_nameserver_impl.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ struct TDynamicConfig : public TThrRefBase {
6969

7070
using TDynamicConfigPtr = TIntrusivePtr<TDynamicConfig>;
7171

72+
// Ref-counted holder of the most recently built node list together with the
// instant it is valid until. One instance is created by TDynamicNameserver and
// handed to the node resolver actors it spawns, which invalidate it when they
// observe a node change.
// NOTE(review): the ref-count base is TSimpleRefCount; presumably this is fine
// because the resolvers are registered in the nameserver's mailbox - confirm.
class TListNodesCache : public TSimpleRefCount<TListNodesCache> {
    // Shared pointer to an immutable node list.
    using TNodesConstPtr = TIntrusiveVector<TEvInterconnect::TNodeInfo>::TConstPtr;

public:
    // Starts empty: no list, zero expiration.
    TListNodesCache();

    // Stores a new list and the instant it stays valid until.
    void Update(TNodesConstPtr newNodes, TInstant newExpire);

    // Drops the list and resets the expiration.
    void Invalidate();

    // True when the list is missing or expired relative to `now`.
    bool NeedUpdate(TInstant now) const;

    // Cached list; null when nothing is cached.
    TNodesConstPtr GetNodes() const;

private:
    TNodesConstPtr Nodes;
    TInstant Expire;
};
84+
7285
class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverBase> {
7386
public:
7487
using TBase = TActorBootstrapped<TDynamicNodeResolverBase>;
@@ -78,10 +91,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
7891
}
7992

8093
TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
94+
TIntrusivePtr<TListNodesCache> listNodesCache,
8195
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
8296
: Owner(owner)
8397
, NodeId(nodeId)
8498
, Config(config)
99+
, ListNodesCache(listNodesCache)
85100
, OrigRequest(origRequest)
86101
, Deadline(deadline)
87102
{
@@ -118,6 +133,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
118133
TActorId Owner;
119134
ui32 NodeId;
120135
TDynamicConfigPtr Config;
136+
TIntrusivePtr<TListNodesCache> ListNodesCache;
121137
TAutoPtr<IEventHandle> OrigRequest;
122138
const TInstant Deadline;
123139

@@ -128,8 +144,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
128144
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
129145
public:
130146
// Forwards every argument, including the shared ListNodes cache, unchanged to
// TDynamicNodeResolverBase; adds no state of its own.
TDynamicNodeResolver(TActorId owner,
                     ui32 nodeId,
                     TDynamicConfigPtr config,
                     TIntrusivePtr<TListNodesCache> listNodesCache,
                     TAutoPtr<IEventHandle> origRequest,
                     TInstant deadline)
    : TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{}
135152

@@ -140,8 +157,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
140157
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
141158
public:
142159
// Forwards every argument, including the shared ListNodes cache, unchanged to
// TDynamicNodeResolverBase; adds no state of its own.
TDynamicNodeSearcher(TActorId owner,
                     ui32 nodeId,
                     TDynamicConfigPtr config,
                     TIntrusivePtr<TListNodesCache> listNodesCache,
                     TAutoPtr<IEventHandle> origRequest,
                     TInstant deadline)
    : TDynamicNodeResolverBase(owner, nodeId, config, listNodesCache, origRequest, deadline)
{}
147165

@@ -180,6 +198,7 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
180198

181199
TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
182200
: StaticConfig(setup)
201+
, ListNodesCache(MakeIntrusive<TListNodesCache>())
183202
, ResolvePoolId(resolvePoolId)
184203
{
185204
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
@@ -258,6 +277,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
258277
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
259278
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
260279
TVector<TActorId> ListNodesQueue;
280+
TIntrusivePtr<TListNodesCache> ListNodesCache;
281+
261282
std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
262283
// When ListNodes requests are sent to NodeBroker tablets this
263284
// bitmap indicates domains which didn't answer yet.

0 commit comments

Comments
 (0)