Skip to content

Commit c09a984

Browse files
authored
Balancing: add new limits, timeouts and sensors, fix blob on main by ingress, but not realy (#9239)
1 parent cf894d2 commit c09a984

File tree

7 files changed

+123
-24
lines changed

7 files changed

+123
-24
lines changed

ydb/core/base/blobstorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ struct TEvBlobStorage {
737737
EvHugePreCompact,
738738
EvHugePreCompactResult,
739739
EvPDiskMetadataLoaded,
740+
EvBalancingSendPartsOnMain,
740741

741742
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
742743
EvLogResult,

ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ namespace NBalancing {
5151
} else if (ev->Sender == DeleterId) {
5252
IsDeleteCompleted = true;
5353
} else {
54-
STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected id", (Id, ev->Sender));
54+
STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB05, "Unexpected actor id", (Id, ev->Sender));
5555
}
5656
}
5757

@@ -76,9 +76,12 @@ namespace NBalancing {
7676

7777
TBatchedQueue<TPartInfo> SendOnMainParts;
7878
TBatchedQueue<TLogoBlobID> TryDeleteParts;
79+
// Full part data of the delete candidates, keyed by blob id.
// If a part is on main according to ingress but actually is not, it cannot be deleted,
// so it has to be sent to main instead; this map keeps the data needed for that.
std::unordered_map<TLogoBlobID, TVector<TPartInfo>> TryDeletePartsFullData;
7980

8081
TBatchManager BatchManager;
8182

83+
// Initialized in the constructor from TlsActivationContext->Now(); the TEvCompleted handler
// compares it against EPOCH_TIMEOUT to detect an epoch that has been running for too long.
TInstant StartTime;
84+
8285
///////////////////////////////////////////////////////////////////////////////////////////
8386
// Main logic
8487
///////////////////////////////////////////////////////////////////////////////////////////
@@ -98,8 +101,12 @@ namespace NBalancing {
98101
}
99102

100103
void ScheduleJobQuant() {
104+
Ctx->MonGroup.ReplTokenAquired()++;
105+
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
106+
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
107+
101108
// once repl token received, start balancing - waking up sender and deleter
102-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
109+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
103110
(SendPartsLeft, SendOnMainParts.Size()), (DeletePartsLeft, TryDeleteParts.Size()),
104111
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));
105112

@@ -125,7 +132,7 @@ namespace NBalancing {
125132
for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) {
126133
if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) {
127134
// actor should not block the thread for a long time, so we should yield
128-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
135+
// STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
129136
Send(SelfId(), new NActors::TEvents::TEvWakeup());
130137
return;
131138
}
@@ -138,7 +145,7 @@ namespace NBalancing {
138145

139146
auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key);
140147

141-
if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) {
148+
if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty() && SendOnMainParts.Size() < MAX_TO_SEND_PER_EPOCH) {
142149
// collect parts to send on main
143150
for (const auto& [parts, data]: merger.Parts) {
144151
if (!(partsToSend & parts).Empty()) {
@@ -151,35 +158,71 @@ namespace NBalancing {
151158
}
152159
}
153160

154-
if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) {
161+
if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty() && TryDeleteParts.Size() < MAX_TO_DELETE_PER_EPOCH) {
155162
// collect parts to delete
163+
auto key = It.GetCurKey().LogoBlobID();
156164
for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
157-
TryDeleteParts.Data.emplace_back(TLogoBlobID(It.GetCurKey().LogoBlobID(), partIdx + 1));
165+
TryDeleteParts.Data.emplace_back(TLogoBlobID(key, partIdx + 1));
158166
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString()));
159167
}
168+
169+
for (const auto& [parts, data]: merger.Parts) {
170+
if (!(partsToDelete & parts).Empty()) {
171+
TryDeletePartsFullData[key].emplace_back(TPartInfo{
172+
.Key=key, .PartsMask=parts, .PartData=data
173+
});
174+
}
175+
}
160176
}
161177

162178
merger.Clear();
179+
180+
if (SendOnMainParts.Size() >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size() >= MAX_TO_DELETE_PER_EPOCH) {
181+
// reached the limit of parts to send and delete
182+
break;
183+
}
163184
}
164185

165186
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB08, VDISKP(Ctx->VCtx, "Keys collected"),
166187
(SendOnMainParts, SendOnMainParts.Data.size()), (TryDeleteParts, TryDeleteParts.Data.size()));
167-
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
168-
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
169188

170189
// start balancing
171190
ContinueBalancing();
172191
}
173192

174193
// Handles a TEvCompleted notification for the current batch.
//
// The completion is first recorded in BatchManager. If that finishes the batch, the repl token
// is released. If the epoch has been running longer than EPOCH_TIMEOUT, the timeout is counted,
// the actor passes away and the handler returns immediately, so balancing is never continued
// on an actor that is already shutting down. Otherwise a finished batch continues balancing.
void Handle(NActors::TEvents::TEvCompleted::TPtr ev) {
    BatchManager.Handle(ev);

    const bool batchCompleted = BatchManager.IsBatchCompleted();
    if (batchCompleted) {
        // the batch is over either way, so the token is given back even on epoch timeout
        Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
    }

    if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now()) {
        Ctx->MonGroup.EpochTimeouts()++;
        STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Epoch timeout"));
        // NOTE(review): when the timeout fires while the batch is still incomplete, the repl
        // token is not released here (same as before this change) - confirm it is released elsewhere.
        PassAway();
        return; // must not fall through to ContinueBalancing() after PassAway()
    }

    if (batchCompleted) {
        ContinueBalancing();
    }
}
182208

209+
// Handles the deleter's list of part ids that have to be sent to main.
//
// Bumps the OnMainByIngressButNotRealy counter by the number of ids. For every id, the data saved
// in TryDeletePartsFullData is looked up by the blob id (part id 0) and the first saved entry whose
// parts mask contains the reported part is appended to the send-on-main queue.
void Handle(TEvBalancingSendPartsOnMain::TPtr ev) {
    const auto& reportedIds = ev->Get()->Ids;

    Ctx->MonGroup.OnMainByIngressButNotRealy() += reportedIds.size();
    STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB05, VDISKP(Ctx->VCtx, "Received from deleter TEvBalancingSendPartsOnMain"), (Parts, reportedIds.size()));

    for (const auto& partId: reportedIds) {
        const auto saved = TryDeletePartsFullData.find(TLogoBlobID(partId, 0));
        if (saved == TryDeletePartsFullData.end()) {
            Y_DEBUG_ABORT_S("Part not found in TryDeletePartsFullData");
            continue;
        }

        // only the first entry holding this part is queued
        for (const auto& candidate: saved->second) {
            if (candidate.PartsMask.Get(partId.PartId() - 1)) {
                SendOnMainParts.Data.push_back(candidate);
                break;
            }
        }
    }
}
225+
183226
///////////////////////////////////////////////////////////////////////////////////////////
184227
// Helper functions
185228
///////////////////////////////////////////////////////////////////////////////////////////
@@ -241,6 +284,7 @@ namespace NBalancing {
241284
cFunc(NActors::TEvents::TEvWakeup::EventType, CollectKeys)
242285
cFunc(TEvReplToken::EventType, ScheduleJobQuant)
243286
hFunc(NActors::TEvents::TEvCompleted, Handle)
287+
hFunc(TEvBalancingSendPartsOnMain, Handle)
244288

245289
// System events
246290
hFunc(NActors::TEvents::TEvUndelivered, Handle)
@@ -259,6 +303,7 @@ namespace NBalancing {
259303
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
260304
, SendOnMainParts(BATCH_SIZE)
261305
, TryDeleteParts(BATCH_SIZE)
306+
, StartTime(TlsActivationContext->Now())
262307
{
263308
}
264309

ydb/core/blobstorage/vdisk/balance/defs.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,18 @@ namespace NBalancing {
7272

7373
static constexpr ui32 BATCH_SIZE = 32;
7474

75+
// While keys are collected, parts are added to the send-on-main queue only while the queue
// holds fewer entries than this limit.
static constexpr ui32 MAX_TO_SEND_PER_EPOCH = 1000;
76+
// While keys are collected, delete candidates are added only while the try-delete queue
// holds fewer entries than this limit.
static constexpr ui32 MAX_TO_DELETE_PER_EPOCH = 1000;
77+
// Once this much time has passed since the balancing actor's StartTime, its TEvCompleted handler
// counts an epoch timeout and the actor passes away.
static constexpr TDuration EPOCH_TIMEOUT = TDuration::Minutes(1);
78+
79+
80+
// Local event carrying ids of parts that have to be sent to main.
// The deleter sends it to its NotifyId with the keys of checked parts that are not on main;
// the balancing actor handles it by queuing the matching parts for sending.
struct TEvBalancingSendPartsOnMain
    : TEventLocal<TEvBalancingSendPartsOnMain, TEvBlobStorage::EvBalancingSendPartsOnMain>
{
    // Ids of the parts to send; copied from the constructor argument.
    TVector<TLogoBlobID> Ids;

    TEvBalancingSendPartsOnMain(const TVector<TLogoBlobID>& ids)
        : Ids(ids)
    {
    }
};
87+
7588
} // NBalancing
7689
} // NKikimr

ydb/core/blobstorage/vdisk/balance/deleter.cpp

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ namespace {
153153
++Responses;
154154
++Ctx->MonGroup.MarkedReadyToDeleteResponse();
155155
Ctx->MonGroup.MarkedReadyToDeleteWithResponseBytes() += GInfo->GetTopology().GType.PartSize(ev->Get()->Id);
156-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id));
156+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB21, VDISKP(Ctx->VCtx, "Deleted local"), (LogoBlobID, ev->Get()->Id));
157157
}
158158

159159
bool IsDone() const {
@@ -200,7 +200,8 @@ namespace {
200200
if (ev->Get()->Tag != REQUEST_TIMEOUT_TAG) {
201201
return;
202202
}
203-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "SendRequestsToCheckPartsOnMain timeout"));
203+
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB24, VDISKP(Ctx->VCtx, "CandidatesToDeleteAskFromMainBatchTimeout"));
204+
Ctx->MonGroup.CandidatesToDeleteAskFromMainBatchTimeout()++;
204205
DeleteLocalParts();
205206
}
206207

@@ -217,23 +218,34 @@ namespace {
217218
// DeleteState
218219
///////////////////////////////////////////////////////////////////////////////////////////
219220

221+
// Splits the results of PartsRequester into parts that are on main and parts that are not.
//
// Keys of the parts that are not on main are sent to NotifyId in a TEvBalancingSendPartsOnMain
// event (nothing is sent when there are none).
//
// Returns the number of checked parts that are on main.
ui32 CheckPartsOnMain() {
    const auto& checkedParts = PartsRequester.GetResult();

    TVector<TLogoBlobID> partsNotOnMain;
    for (const auto& part: checkedParts) {
        if (!part.HasOnMain) {
            partsNotOnMain.push_back(part.Key);
        }
    }

    // Computed before the vector is handed over to the event: a moved-from vector must not be
    // read, and its size would be wrong as soon as the event really takes ownership of the data.
    const ui32 partsOnMain = checkedParts.size() - partsNotOnMain.size();

    if (!partsNotOnMain.empty()) {
        STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "Send TEvBalancingSendPartsOnMain"));
        Send(NotifyId, new TEvBalancingSendPartsOnMain(std::move(partsNotOnMain)));
    }

    STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB30, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, checkedParts.size()), (PartsOnMain, partsOnMain));

    return partsOnMain;
}
239+
220240
void DeleteLocalParts() {
221241
Become(&TThis::DeleteState);
222242

223-
if (PartsRequester.GetResult().empty()) {
243+
if (CheckPartsOnMain() == 0) {
224244
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB25, VDISKP(Ctx->VCtx, "Nothing to delete. PassAway"));
225245
PassAway();
226246
return;
227247
}
228248

229-
{
230-
ui32 partsOnMain = 0;
231-
for (const auto& part: PartsRequester.GetResult()) {
232-
partsOnMain += part.HasOnMain;
233-
}
234-
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB26, VDISKP(Ctx->VCtx, "DeleteLocalParts"), (Parts, PartsRequester.GetResult().size()), (PartsOnMain, partsOnMain));
235-
}
236-
237249
PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult());
238250

239251
Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout
@@ -251,12 +263,14 @@ namespace {
251263
if (ev->Get()->Tag != DELETE_TIMEOUT_TAG) {
252264
return;
253265
}
254-
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "DeleteLocalParts timeout"));
266+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB31, VDISKP(Ctx->VCtx, "MarkReadyBatchTimeout"));
267+
Ctx->MonGroup.MarkReadyBatchTimeout()++;
255268
PassAway();
256269
}
257270

258271
void PassAway() override {
259272
Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID));
273+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB32, VDISKP(Ctx->VCtx, "TDeleter::PassAway"));
260274
TActorBootstrapped::PassAway();
261275
}
262276

ydb/core/blobstorage/vdisk/balance/sender.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ namespace {
190190
} else {
191191
++Ctx->MonGroup.SentOnMain();
192192
Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID()));
193-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
193+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB14, VDISKP(Ctx->VCtx, "Put done"), (Msg, ev->Get()->ToString()));
194194
}
195195
}
196196

@@ -204,7 +204,7 @@ namespace {
204204
}
205205
++Ctx->MonGroup.SentOnMain();
206206
Ctx->MonGroup.SentOnMainWithResponseBytes() += GInfo->GetTopology().GType.PartSize(LogoBlobIDFromLogoBlobID(item.GetBlobID()));
207-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
207+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB16, VDISKP(Ctx->VCtx, "MultiPut done"), (Key, LogoBlobIDFromLogoBlobID(item.GetBlobID()).ToString()));
208208
}
209209
}
210210

@@ -229,6 +229,8 @@ namespace {
229229
void ReadPartsFromDisk() {
230230
Become(&TThis::StateRead);
231231

232+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "ReadPartsFromDisk"), (Parts, Reader.GetPartsSize()));
233+
232234
if (Reader.GetPartsSize() == 0) {
233235
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Nothing to read. PassAway"));
234236
PassAway();
@@ -255,7 +257,8 @@ namespace {
255257
if (ev->Get()->Tag != READ_TIMEOUT_TAG) {
256258
return;
257259
}
258-
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "TimeoutRead"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses()));
260+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB17, VDISKP(Ctx->VCtx, "ReadFromHandoffBatchTimeout"), (Requests, Reader.GetPartsSize()), (Responses, Reader.GetResponses()));
261+
Ctx->MonGroup.ReadFromHandoffBatchTimeout()++;
259262
SendPartsOnMain();
260263
}
261264

@@ -274,6 +277,8 @@ namespace {
274277
void SendPartsOnMain() {
275278
Become(&TThis::StateSend);
276279

280+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB29, VDISKP(Ctx->VCtx, "SendPartsOnMain"), (Parts, Reader.GetResult().size()));
281+
277282
if (Reader.GetResult().empty()) {
278283
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB18, VDISKP(Ctx->VCtx, "Nothing to send. PassAway"));
279284
PassAway();
@@ -297,12 +302,14 @@ namespace {
297302
if (ev->Get()->Tag != SEND_TIMEOUT_TAG) {
298303
return;
299304
}
300-
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "TimeoutSend"));
305+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB19, VDISKP(Ctx->VCtx, "SendOnMainBatchTimeout"));
306+
Ctx->MonGroup.SendOnMainBatchTimeout()++;
301307
PassAway();
302308
}
303309

304310
void PassAway() override {
305311
Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID));
312+
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "TSender::PassAway"));
306313
TActorBootstrapped::PassAway();
307314
}
308315

ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,38 +593,53 @@ public:
593593
GROUP_CONSTRUCTOR(TBalancingGroup)
594594
{
595595
COUNTER_INIT(BalancingIterations, true);
596+
COUNTER_INIT(EpochTimeouts, true);
597+
COUNTER_INIT(ReplTokenAquired, true);
598+
COUNTER_INIT(OnMainByIngressButNotRealy, true);
596599

597600
COUNTER_INIT(PlannedToSendOnMain, false);
598601
COUNTER_INIT(CandidatesToDelete, false);
599602

600603
COUNTER_INIT(ReadFromHandoffBytes, true);
601604
COUNTER_INIT(ReadFromHandoffResponseBytes, true);
605+
COUNTER_INIT(ReadFromHandoffBatchTimeout, true);
602606
COUNTER_INIT(SentOnMain, true);
603607
COUNTER_INIT(SentOnMainBytes, true);
604608
COUNTER_INIT(SentOnMainWithResponseBytes, true);
609+
COUNTER_INIT(SendOnMainBatchTimeout, true);
605610

606611
COUNTER_INIT(CandidatesToDeleteAskedFromMain, true);
607612
COUNTER_INIT(CandidatesToDeleteAskedFromMainResponse, true);
613+
COUNTER_INIT(CandidatesToDeleteAskFromMainBatchTimeout, true);
608614
COUNTER_INIT(MarkedReadyToDelete, true);
609615
COUNTER_INIT(MarkedReadyToDeleteBytes, true);
610616
COUNTER_INIT(MarkedReadyToDeleteResponse, true);
611617
COUNTER_INIT(MarkedReadyToDeleteWithResponseBytes, true);
618+
COUNTER_INIT(MarkReadyBatchTimeout, true);
612619
}
613620

614621
COUNTER_DEF(BalancingIterations);
622+
COUNTER_DEF(EpochTimeouts);
623+
COUNTER_DEF(ReplTokenAquired);
624+
COUNTER_DEF(OnMainByIngressButNotRealy);
625+
615626
COUNTER_DEF(PlannedToSendOnMain);
616627
COUNTER_DEF(ReadFromHandoffBytes);
617628
COUNTER_DEF(ReadFromHandoffResponseBytes);
629+
COUNTER_DEF(ReadFromHandoffBatchTimeout);
618630
COUNTER_DEF(SentOnMain);
619631
COUNTER_DEF(SentOnMainBytes);
620632
COUNTER_DEF(SentOnMainWithResponseBytes);
633+
COUNTER_DEF(SendOnMainBatchTimeout);
621634
COUNTER_DEF(CandidatesToDelete);
622635
COUNTER_DEF(CandidatesToDeleteAskedFromMain);
623636
COUNTER_DEF(CandidatesToDeleteAskedFromMainResponse);
637+
COUNTER_DEF(CandidatesToDeleteAskFromMainBatchTimeout);
624638
COUNTER_DEF(MarkedReadyToDelete);
625639
COUNTER_DEF(MarkedReadyToDeleteBytes);
626640
COUNTER_DEF(MarkedReadyToDeleteResponse);
627641
COUNTER_DEF(MarkedReadyToDeleteWithResponseBytes);
642+
COUNTER_DEF(MarkReadyBatchTimeout);
628643
};
629644

630645
///////////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2552,6 +2552,10 @@ namespace NKikimr {
25522552
}
25532553

25542554
void RunBalancing(const TActorContext &ctx) {
2555+
if (VCtx->GroupId.GetRawId() == 0) {
2556+
// don't run balancing for the static group
2557+
return;
2558+
}
25552559
if (!Config->FeatureFlags.GetUseVDisksBalancing() || VCtx->Top->GType.GetErasure() == TErasureType::ErasureMirror3of4) {
25562560
return;
25572561
}

0 commit comments

Comments
 (0)