Skip to content

Commit 912fdc1

Browse files
authored
NodeBroker: use deltas when returning recently added nodes (#9122)
1 parent 1afa3ba commit 912fdc1

File tree

5 files changed

+102
-3
lines changed

5 files changed

+102
-3
lines changed

ydb/core/mind/dynamic_nameserver.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
286286
ctx.Schedule(config->Epoch.End - ctx.Now(),
287287
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
288288
} else {
289+
// Note: this update may be optimized to only include new nodes
289290
for (auto &node : rec.GetNodes()) {
290291
auto nodeId = node.GetNodeId();
291292
if (!config->DynamicNodes.contains(nodeId))

ydb/core/mind/node_broker.cpp

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,14 +290,45 @@ void TNodeBroker::AddDelayedListNodesRequest(ui64 epoch,
290290

291291
void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev)
292292
{
293-
ui64 version = ev->Get()->Record.GetCachedVersion();
293+
auto *msg = ev->Get();
294294

295295
NKikimrNodeBroker::TNodesInfo info;
296296
Epoch.Serialize(*info.MutableEpoch());
297297
info.SetDomain(AppData()->DomainsInfo->GetDomain()->DomainUid);
298298
TAutoPtr<TEvNodeBroker::TEvNodesInfo> resp = new TEvNodeBroker::TEvNodesInfo(info);
299-
if (version != Epoch.Version)
299+
300+
bool optimized = false;
301+
302+
if (msg->Record.HasCachedVersion()) {
303+
if (msg->Record.GetCachedVersion() == Epoch.Version) {
304+
// Client has an up-to-date list already
305+
optimized = true;
306+
} else {
307+
// We may be able to only send added nodes in the same epoch when
308+
// all deltas are cached up to the current epoch inclusive.
309+
ui64 neededFirstVersion = msg->Record.GetCachedVersion() + 1;
310+
if (!EpochDeltasVersions.empty() &&
311+
EpochDeltasVersions.front() <= neededFirstVersion &&
312+
EpochDeltasVersions.back() == Epoch.Version &&
313+
neededFirstVersion <= Epoch.Version)
314+
{
315+
ui64 firstIndex = neededFirstVersion - EpochDeltasVersions.front();
316+
if (firstIndex > 0) {
317+
// Note: usually there is a small number of nodes added
318+
// between subsequent requests, so this substr should be
319+
// very cheap.
320+
resp->PreSerializedData = EpochDeltasCache.substr(EpochDeltasEndOffsets[firstIndex - 1]);
321+
} else {
322+
resp->PreSerializedData = EpochDeltasCache;
323+
}
324+
optimized = true;
325+
}
326+
}
327+
}
328+
329+
if (!optimized) {
300330
resp->PreSerializedData = EpochCache;
331+
}
301332

302333
TabletCounters->Percentile()[COUNTER_LIST_NODES_BYTES].IncrementFor(resp->GetCachedByteSize());
303334
LOG_TRACE_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
@@ -308,12 +339,16 @@ void TNodeBroker::ProcessListNodesRequest(TEvNodeBroker::TEvListNodes::TPtr &ev)
308339

309340
// Drains DelayedListNodesRequests from the front: entries are handled and
// erased one by one until the container is empty or the front entry's key
// is greater than Epoch.Id.
// NOTE(review): the key looks like the epoch id the request is waiting for
// (see AddDelayedListNodesRequest(ui64 epoch, ...)) — confirm against the
// container declaration, which is outside this view.
void TNodeBroker::ProcessDelayedListNodesRequests()
310341
{
342+
// Senders whose request has already been processed during this call.
THashSet<TActorId> processed;
311343
while (!DelayedListNodesRequests.empty()) {
312344
auto it = DelayedListNodesRequests.begin();
313345
// The front entry is keyed past the current epoch id: stop draining.
if (it->first > Epoch.Id)
314346
break;
315347

316-
ProcessListNodesRequest(it->second);
348+
// Avoid processing more than one request from the same sender
// (insert().second is true only the first time a sender is seen; later
// requests from that sender are erased below without being processed).
349+
if (processed.insert(it->second->Sender).second) {
350+
ProcessListNodesRequest(it->second);
351+
}
317352
// The entry is removed whether or not it was processed.
DelayedListNodesRequests.erase(it);
318353
}
319354
}
@@ -432,6 +467,11 @@ void TNodeBroker::PrepareEpochCache()
432467

433468
Y_PROTOBUF_SUPPRESS_NODISCARD info.SerializeToString(&EpochCache);
434469
TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.Size());
470+
471+
EpochDeltasCache.clear();
472+
EpochDeltasVersions.clear();
473+
EpochDeltasEndOffsets.clear();
474+
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
435475
}
436476

437477
void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)
@@ -447,6 +487,17 @@ void TNodeBroker::AddNodeToEpochCache(const TNodeInfo &node)
447487

448488
EpochCache += delta;
449489
TabletCounters->Simple()[COUNTER_EPOCH_SIZE_BYTES].Set(EpochCache.Size());
490+
491+
if (!EpochDeltasVersions.empty() && EpochDeltasVersions.back() + 1 != Epoch.Version) {
492+
EpochDeltasCache.clear();
493+
EpochDeltasVersions.clear();
494+
EpochDeltasEndOffsets.clear();
495+
}
496+
497+
EpochDeltasCache += delta;
498+
EpochDeltasVersions.push_back(Epoch.Version);
499+
EpochDeltasEndOffsets.push_back(EpochDeltasCache.size());
500+
TabletCounters->Simple()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set(EpochDeltasCache.size());
450501
}
451502

452503
void TNodeBroker::SubscribeForConfigUpdates(const TActorContext &ctx)

ydb/core/mind/node_broker_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,10 @@ class TNodeBroker : public TActor<TNodeBroker>
341341
TSchedulerCookieHolder EpochTimerCookieHolder;
342342
TString EpochCache;
343343

344+
TString EpochDeltasCache;
345+
TVector<ui64> EpochDeltasVersions;
346+
TVector<ui64> EpochDeltasEndOffsets;
347+
344348
TTabletCountersBase* TabletCounters;
345349
TAutoPtr<TTabletCountersBase> TabletCountersPtr;
346350

ydb/core/mind/node_broker_ut.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,48 @@ Y_UNIT_TEST_SUITE(TNodeBrokerTest) {
858858
UNIT_ASSERT_VALUES_EQUAL(epoch1.GetId(), epoch.GetId() + 5);
859859
}
860860

861+
// Exercises ListNodes replies for a client that reports a cached version:
// within one epoch only the nodes added since that version are expected,
// while missing history or an epoch change is expected to yield the full list.
// NOTE(review): CheckFilteredNodesList and CheckRegistration are defined
// outside this view. From their use here, the first list passed to
// CheckFilteredNodesList looks like the node ids expected in the reply and
// the last argument like the client's cached version — confirm against the
// helpers.
Y_UNIT_TEST(TestListNodesEpochDeltas)
862+
{
863+
TTestBasicRuntime runtime(8, false);
864+
Setup(runtime, 10);
865+
TActorId sender = runtime.AllocateEdgeActor();
866+
867+
WaitForEpochUpdate(runtime, sender);
868+
WaitForEpochUpdate(runtime, sender);
869+
870+
// Register three nodes one at a time. After each registration a request
// carrying the previously observed version expects only the new node.
auto epoch0 = GetEpoch(runtime, sender);
871+
CheckRegistration(runtime, sender, "host1", 1001, "host1.yandex.net", "1.2.3.4",
872+
1, 2, 3, 4, TStatus::OK, NODE1, epoch0.GetNextEnd());
873+
auto epoch1 = CheckFilteredNodesList(runtime, sender, {NODE1}, {}, 0, epoch0.GetVersion());
874+
CheckRegistration(runtime, sender, "host2", 1001, "host2.yandex.net", "1.2.3.5",
875+
1, 2, 3, 5, TStatus::OK, NODE2, epoch1.GetNextEnd());
876+
auto epoch2 = CheckFilteredNodesList(runtime, sender, {NODE2}, {}, 0, epoch1.GetVersion());
877+
CheckRegistration(runtime, sender, "host3", 1001, "host3.yandex.net", "1.2.3.6",
878+
1, 2, 3, 6, TStatus::OK, NODE3, epoch2.GetNextEnd());
879+
auto epoch3 = CheckFilteredNodesList(runtime, sender, {NODE3}, {}, 0, epoch2.GetVersion());
880+
881+
// Older versions from the same epoch expect every node added since then;
// the latest version expects an empty list.
CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3}, {}, 0, epoch0.GetVersion());
882+
CheckFilteredNodesList(runtime, sender, {NODE2, NODE3}, {}, 0, epoch1.GetVersion());
883+
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch3.GetVersion());
884+
885+
// Restart the tablet; a client holding the latest version still expects
// an empty list.
RebootTablet(runtime, MakeNodeBrokerID(), sender);
886+
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch3.GetVersion());
887+
888+
// A node registered after the restart is expected alone when the request
// carries the version seen just before it.
CheckRegistration(runtime, sender, "host4", 1001, "host4.yandex.net", "1.2.3.7",
889+
1, 2, 3, 7, TStatus::OK, NODE4, epoch3.GetNextEnd());
890+
auto epoch4 = CheckFilteredNodesList(runtime, sender, {NODE4}, {}, 0, epoch3.GetVersion());
891+
892+
// NodeBroker doesn't have enough history in memory and replies with the full node list
893+
CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch2.GetVersion());
894+
895+
// Move to the next epoch; a client holding its version expects an empty list.
WaitForEpochUpdate(runtime, sender);
896+
auto epoch5 = GetEpoch(runtime, sender);
897+
CheckFilteredNodesList(runtime, sender, {}, {}, 0, epoch5.GetVersion());
898+
899+
// New epoch may remove nodes, so deltas are not returned on epoch change
900+
CheckFilteredNodesList(runtime, sender, {NODE1, NODE2, NODE3, NODE4}, {}, 0, epoch3.GetVersion());
901+
}
902+
861903
Y_UNIT_TEST(TestRandomActions)
862904
{
863905
TTestBasicRuntime runtime(8, false);

ydb/core/protos/counters_node_broker.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ option (NKikimr.TabletTypeName) = "NodeBroker"; // Used as prefix for all counte
88

99
// Simple counters of the NodeBroker tablet; the C++ side indexes them via
// TabletCounters->Simple()[...].
enum ESimpleCounters {
1010
// Set to the size of EpochCache in node_broker.cpp.
COUNTER_EPOCH_SIZE_BYTES = 0 [(CounterOpts) = {Name: "EpochSizeBytes"}];
11+
// Set to the size of EpochDeltasCache in node_broker.cpp.
COUNTER_EPOCH_DELTAS_SIZE_BYTES = 1 [(CounterOpts) = {Name: "EpochDeltasSizeBytes"}];
1112
}
1213

1314
enum ECumulativeCounters {

0 commit comments

Comments
 (0)