Skip to content

Commit fb39834

Browse files
committed
ensure boot queue progress (#16713)
1 parent 4b8e06e commit fb39834

File tree

7 files changed

+150
-29
lines changed

7 files changed

+150
-29
lines changed

ydb/core/mind/hive/boot_queue.cpp

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,54 @@ void TBootQueue::AddToBootQueue(TBootQueueRecord record) {
1717
}
1818

1919
TBootQueue::TBootQueueRecord TBootQueue::PopFromBootQueue() {
20-
TBootQueueRecord record = BootQueue.top();
21-
BootQueue.pop();
20+
TQueue& currentQueue = GetCurrentQueue();
21+
TBootQueueRecord record = currentQueue.top();
22+
currentQueue.pop();
23+
if (ProcessWaitQueue) {
24+
NextFromWaitQueue = !NextFromWaitQueue;
25+
}
2226
return record;
2327
}
2428

2529
void TBootQueue::AddToWaitQueue(TBootQueueRecord record) {
26-
WaitQueue.emplace_back(record);
30+
WaitQueue.push(record);
31+
}
32+
33+
void TBootQueue::IncludeWaitQueue() {
34+
ProcessWaitQueue = true;
2735
}
2836

29-
void TBootQueue::MoveFromWaitQueueToBootQueue() {
30-
for (TBootQueueRecord record : WaitQueue) {
31-
AddToBootQueue(record);
37+
void TBootQueue::ExcludeWaitQueue() {
38+
ProcessWaitQueue = false;
39+
}
40+
41+
bool TBootQueue::Empty() const {
42+
if (ProcessWaitQueue) {
43+
return BootQueue.empty() && WaitQueue.empty();
44+
} else {
45+
return BootQueue.empty();
46+
}
47+
}
48+
49+
size_t TBootQueue::Size() const {
50+
if (ProcessWaitQueue) {
51+
return BootQueue.size() + WaitQueue.size();
52+
} else {
53+
return BootQueue.size();
54+
}
55+
}
56+
57+
TBootQueue::TQueue& TBootQueue::GetCurrentQueue() {
58+
if (BootQueue.empty()) {
59+
return WaitQueue;
60+
}
61+
if (WaitQueue.empty()) {
62+
return BootQueue;
63+
}
64+
if (ProcessWaitQueue && NextFromWaitQueue) {
65+
return WaitQueue;
3266
}
33-
WaitQueue.clear();
67+
return BootQueue;
3468
}
3569

3670
}

ydb/core/mind/hive/boot_queue.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,30 @@ struct TBootQueue {
5252

5353
static_assert(sizeof(TBootQueueRecord) <= 24);
5454

55-
std::priority_queue<TBootQueueRecord, std::vector<TBootQueueRecord>> BootQueue;
56-
std::deque<TBootQueueRecord> WaitQueue; // tablets from BootQueue waiting for new nodes
55+
using TQueue = TPriorityQueue<TBootQueueRecord>;
5756

57+
TQueue BootQueue;
58+
TQueue WaitQueue; // tablets from BootQueue waiting for new nodes
59+
private:
60+
bool ProcessWaitQueue = false;
61+
bool NextFromWaitQueue = false;
62+
63+
public:
5864
void AddToBootQueue(TBootQueueRecord record);
5965
TBootQueueRecord PopFromBootQueue();
6066
void AddToWaitQueue(TBootQueueRecord record);
61-
void MoveFromWaitQueueToBootQueue();
67+
void IncludeWaitQueue();
68+
void ExcludeWaitQueue();
69+
bool Empty() const;
70+
size_t Size() const;
6271

6372
template<typename... Args>
6473
void EmplaceToBootQueue(Args&&... args) {
6574
BootQueue.emplace(args...);
6675
}
76+
77+
private:
78+
TQueue& GetCurrentQueue();
6779
};
6880

6981
}

ydb/core/mind/hive/hive_events.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ struct TEvPrivate {
5151
{}
5252
};
5353

54-
struct TEvProcessBootQueue : TEventLocal<TEvProcessBootQueue, EvProcessBootQueue> {};
54+
struct TEvProcessBootQueue : TEventLocal<TEvProcessBootQueue, EvProcessBootQueue> {
55+
bool ProcessWaitQueue = false; // Only for use in tests
56+
};
5557

5658
struct TEvPostponeProcessBootQueue : TEventLocal<TEvPostponeProcessBootQueue, EvPostponeProcessBootQueue> {};
5759

ydb/core/mind/hive/hive_impl.cpp

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,17 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects)
223223
THPTimer bootQueueProcessingTimer;
224224
if (ProcessWaitQueueScheduled) {
225225
BLOG_D("Handle ProcessWaitQueue (size: " << BootQueue.WaitQueue.size() << ")");
226-
BootQueue.MoveFromWaitQueueToBootQueue();
226+
BootQueue.IncludeWaitQueue();
227227
ProcessWaitQueueScheduled = false;
228228
}
229229
ProcessBootQueueScheduled = false;
230230
ui64 processedItems = 0;
231231
ui64 tabletsStarted = 0;
232232
TInstant postponedStart;
233233
TStackVec<TBootQueue::TBootQueueRecord> delayedTablets;
234-
while (!BootQueue.BootQueue.empty() && processedItems < GetMaxBootBatchSize()) {
234+
std::vector<TBootQueue::TBootQueueRecord> waitingTablets;
235+
waitingTablets.reserve(std::min<size_t>(BootQueue.Size(), GetMaxBootBatchSize()));
236+
while (!BootQueue.Empty() && processedItems < GetMaxBootBatchSize()) {
235237
TBootQueue::TBootQueueRecord record = BootQueue.PopFromBootQueue();
236238
BLOG_TRACE("Tablet " << record.TabletId << "." << record.FollowerId << " has priority " << record.Priority);
237239
++processedItems;
@@ -260,7 +262,7 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects)
260262
sideEffects.Send(actorToNotify, new TEvPrivate::TEvRestartComplete(tablet->GetFullTabletId(), "boot delay"));
261263
}
262264
tablet->ActorsToNotifyOnRestart.clear();
263-
BootQueue.AddToWaitQueue(record); // waiting for new node
265+
waitingTablets.push_back(record); // waiting for new node
264266
tablet->InWaitQueue = true;
265267
continue;
266268
}
@@ -280,10 +282,16 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects)
280282
delayedTablets.push_back(record);
281283
}
282284
}
285+
if (waitingTablets.size() == processedItems || BootQueue.WaitQueue.empty()) {
286+
BootQueue.ExcludeWaitQueue();
287+
}
283288
for (TBootQueue::TBootQueueRecord record : delayedTablets) {
284289
record.Priority -= 1;
285290
BootQueue.AddToBootQueue(record);
286291
}
292+
for (auto& record : waitingTablets) {
293+
BootQueue.AddToWaitQueue(record);
294+
}
287295
if (TabletCounters != nullptr) {
288296
UpdateCounterBootQueueSize(BootQueue.BootQueue.size());
289297
TabletCounters->Simple()[NHive::COUNTER_WAITQUEUE_SIZE].Set(BootQueue.WaitQueue.size());
@@ -301,7 +309,7 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects)
301309
BLOG_D("ProcessBootQueue - BootQueue throttling (size: " << BootQueue.BootQueue.size() << ")");
302310
return;
303311
}
304-
if (processedItems == GetMaxBootBatchSize()) {
312+
if (processedItems == GetMaxBootBatchSize() && !BootQueue.Empty()) {
305313
BLOG_D("ProcessBootQueue - rescheduling");
306314
ProcessBootQueue();
307315
} else if (postponedStart > now) {
@@ -316,8 +324,11 @@ void THive::HandleInit(TEvPrivate::TEvProcessBootQueue::TPtr&) {
316324
Schedule(TDuration::Seconds(1), new TEvPrivate::TEvProcessBootQueue());
317325
}
318326

319-
void THive::Handle(TEvPrivate::TEvProcessBootQueue::TPtr&) {
327+
void THive::Handle(TEvPrivate::TEvProcessBootQueue::TPtr& ev) {
320328
BLOG_TRACE("ProcessBootQueue - executing");
329+
if (ev->Get()->ProcessWaitQueue) {
330+
ProcessWaitQueue();
331+
}
321332
Execute(CreateProcessBootQueue());
322333
}
323334

@@ -2240,14 +2251,14 @@ TResourceRawValues THive::GetDefaultResourceInitialMaximumValues() {
22402251
}
22412252

22422253
void THive::ProcessTabletBalancer() {
2243-
if (!ProcessTabletBalancerScheduled && !ProcessTabletBalancerPostponed && BootQueue.BootQueue.empty()) {
2254+
if (!ProcessTabletBalancerScheduled && !ProcessTabletBalancerPostponed && BootQueue.Empty()) {
22442255
Schedule(GetBalancerCooldown(LastBalancerTrigger), new TEvPrivate::TEvProcessTabletBalancer());
22452256
ProcessTabletBalancerScheduled = true;
22462257
}
22472258
}
22482259

22492260
void THive::ProcessStorageBalancer() {
2250-
if (!ProcessStorageBalancerScheduled && BootQueue.BootQueue.empty()) {
2261+
if (!ProcessStorageBalancerScheduled && BootQueue.Empty()) {
22512262
Schedule(GetBalancerCooldown(EBalancerType::Storage), new TEvPrivate::TEvProcessStorageBalancer());
22522263
ProcessStorageBalancerScheduled = true;
22532264
}
@@ -2484,10 +2495,7 @@ void THive::UpdateTotalResourceValues(
24842495
TInstant now = TInstant::Now();
24852496

24862497
if (LastResourceChangeReaction + GetResourceChangeReactionPeriod() < now) {
2487-
// in case we had overloaded nodes
2488-
if (!BootQueue.WaitQueue.empty()) {
2489-
ProcessWaitQueue();
2490-
} else if (!BootQueue.BootQueue.empty()) {
2498+
if (!BootQueue.Empty()) {
24912499
ProcessBootQueue();
24922500
}
24932501
ProcessTabletBalancer();

ydb/core/mind/hive/hive_impl_ut.cpp

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,29 @@ Y_UNIT_TEST_SUITE(THiveImplTest) {
5858
timer.Reset();
5959

6060
double maxP = 100;
61+
std::vector<TBootQueue::TBootQueueRecord> records;
62+
records.reserve(NUM_TABLETS);
63+
unsigned i = 0;
6164

62-
while (!bootQueue.BootQueue.empty()) {
65+
while (!bootQueue.Empty()) {
6366
auto record = bootQueue.PopFromBootQueue();
6467
UNIT_ASSERT(record.Priority <= maxP);
6568
maxP = record.Priority;
66-
auto itTablet = tablets.find(record.TabletId);
67-
if (itTablet != tablets.end()) {
68-
bootQueue.AddToWaitQueue(itTablet->second);
69+
UNIT_ASSERT(tablets.contains(record.TabletId));
70+
records.push_back(record);
71+
if (++i == NUM_TABLETS / 2) {
72+
// to test both modes
73+
bootQueue.IncludeWaitQueue();
74+
}
75+
}
76+
bootQueue.ExcludeWaitQueue();
77+
78+
i = 0;
79+
for (auto& record : records) {
80+
if (++i % 3 == 0) {
81+
bootQueue.AddToBootQueue(record);
82+
} else {
83+
bootQueue.AddToWaitQueue(record);
6984
}
7085
}
7186

@@ -81,7 +96,11 @@ Y_UNIT_TEST_SUITE(THiveImplTest) {
8196

8297
timer.Reset();
8398

84-
bootQueue.MoveFromWaitQueueToBootQueue();
99+
bootQueue.IncludeWaitQueue();
100+
while (!bootQueue.Empty()) {
101+
bootQueue.PopFromBootQueue();
102+
}
103+
bootQueue.ExcludeWaitQueue();
85104

86105
passed = timer.Get().SecondsFloat();
87106
Ctest << "Move = " << passed << Endl;

ydb/core/mind/hive/hive_ut.cpp

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,14 @@ Y_UNIT_TEST_SUITE(THiveTest) {
634634
runtime.SendToPipe(hiveTablet, senderB, new TEvHive::TEvReassignTabletSpace(tabletId, channels), 0, GetPipeConfigWithRetries());
635635
}
636636

637+
TActorId GetHiveActor(TTestActorRuntime& runtime, ui64 hiveTablet) {
638+
TActorId senderB = runtime.AllocateEdgeActor(0);
639+
runtime.SendToPipe(hiveTablet, senderB, new TEvHive::TEvTabletMetrics, 0, GetPipeConfigWithRetries());
640+
TAutoPtr<IEventHandle> handle;
641+
runtime.GrabEdgeEventRethrow<TEvLocal::TEvTabletMetricsAck>(handle);
642+
return handle->Sender;
643+
}
644+
637645
void MakeSureTabletIsDown(TTestActorRuntime &runtime, ui64 tabletId, ui32 nodeIndex) {
638646
TActorId sender = runtime.AllocateEdgeActor(nodeIndex);
639647
runtime.ConnectToPipe(tabletId, sender, nodeIndex, NTabletPipe::TClientConfig());
@@ -3475,7 +3483,8 @@ Y_UNIT_TEST_SUITE(THiveTest) {
34753483
TVector<ui64> tabletIds;
34763484
const ui64 hiveTablet = MakeDefaultHiveID();
34773485
const ui64 testerTablet = MakeTabletID(false, 1);
3478-
CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive);
3486+
const TActorId bootstrapper = CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive);
3487+
runtime.EnableScheduleForActor(bootstrapper);
34793488
{
34803489
TDispatchOptions options;
34813490
options.FinalEvents.emplace_back(TEvLocal::EvSyncTablets, runtime.GetNodeCount());
@@ -7039,6 +7048,43 @@ Y_UNIT_TEST_SUITE(THiveTest) {
70397048
}
70407049
}
70417050

7051+
Y_UNIT_TEST(TestBootProgress) {
7052+
TTestBasicRuntime runtime(1, false);
7053+
Setup(runtime, true, 3, [](TAppPrepare& app) {
7054+
app.HiveConfig.SetMaxBootBatchSize(1);
7055+
app.HiveConfig.SetResourceChangeReactionPeriod(0);
7056+
});
7057+
const ui64 hiveTablet = MakeDefaultHiveID();
7058+
const ui64 testerTablet = MakeTabletID(false, 1);
7059+
const TActorId bootstrapper = CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive);
7060+
runtime.EnableScheduleForActor(bootstrapper);
7061+
{
7062+
TDispatchOptions options;
7063+
options.FinalEvents.emplace_back(TEvLocal::EvSyncTablets, runtime.GetNodeCount());
7064+
runtime.DispatchEvents(options, TDuration::Zero());
7065+
}
7066+
for (int i = 0; i < 5; ++i) {
7067+
THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, TTabletTypes::Hive, BINDED_CHANNELS));
7068+
ev->Record.AddAllowedDomains();
7069+
ev->Record.MutableAllowedDomains(0)->SetSchemeShard(52); // garbage domain id - these tablets will never boot
7070+
ev->Record.MutableAllowedDomains(0)->SetPathId(42);
7071+
SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, false);
7072+
}
7073+
TActorId hiveActor = GetHiveActor(runtime, hiveTablet);
7074+
// Simulate a situation when wait queue is constantly processed
7075+
// this could happen e. g. when nodes are often restarting
7076+
// (previously it would happen all the time because of metric updates)
7077+
auto handler = runtime.AddObserver<NHive::TEvPrivate::TEvProcessBootQueue>([=](auto&& ev) {
7078+
if (ev->Recipient == hiveActor) {
7079+
ev->Get()->ProcessWaitQueue = true;
7080+
}
7081+
});
7082+
THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100505, TTabletTypes::Dummy, BINDED_CHANNELS));
7083+
auto tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, false);
7084+
MakeSureTabletIsUp(runtime, tabletId, 0);
7085+
7086+
}
7087+
70427088
Y_UNIT_TEST(TestStopTenant) {
70437089
TTestBasicRuntime runtime(2, false);
70447090
Setup(runtime, true);

ydb/core/mind/hive/monitoring.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ class TTxMonEvent_MemStateTablets : public TTransactionBase<THive> {
245245
}
246246
if (WaitingOnly) {
247247
tabletIdIndex.reserve(Self->BootQueue.WaitQueue.size());
248-
for (const TBootQueue::TBootQueueRecord& rec : Self->BootQueue.WaitQueue) {
248+
for (const TBootQueue::TBootQueueRecord& rec : Self->BootQueue.WaitQueue.Container()) {
249249
TTabletInfo* tablet = Self->FindTablet(rec.TabletId, rec.FollowerId);
250250
if (tablet != nullptr) {
251251
tabletIdIndex.push_back({tabletIndexFunction(*tablet), tablet});

0 commit comments

Comments
 (0)