@@ -73,6 +73,7 @@ namespace NBalancing {
73
73
TLogoBlobsSnapshot::TForwardIterator It;
74
74
TQueueActorMapPtr QueueActorMapPtr;
75
75
THashSet<TVDiskID> ConnectedVDisks;
76
+ THashSet<TVDiskID> GoodStatusVDisks;
76
77
77
78
TBatchedQueue<TPartInfo> SendOnMainParts;
78
79
TBatchedQueue<TLogoBlobID> TryDeleteParts;
@@ -83,46 +84,46 @@ namespace NBalancing {
83
84
TInstant StartTime;
84
85
85
86
// /////////////////////////////////////////////////////////////////////////////////////////
86
- // Main logic
87
+ // Init logic
87
88
// /////////////////////////////////////////////////////////////////////////////////////////
88
89
89
- void ContinueBalancing () {
90
- Ctx->MonGroup .PlannedToSendOnMain () = SendOnMainParts.Data .size ();
91
- Ctx->MonGroup .CandidatesToDelete () = TryDeleteParts.Data .size ();
90
+ void SendCheckVDisksStatusRequests () {
91
+ for (ui32 i = 0 ; i < GInfo->GetTotalVDisksNum (); ++i) {
92
+ const auto vdiskId = GInfo->GetVDiskId (i);
93
+ const auto actorId = GInfo->GetActorId (i);
94
+ if (TVDiskIdShort (vdiskId) != Ctx->VCtx ->ShortSelfVDisk ) {
95
+ Send (actorId, new TEvBlobStorage::TEvVStatus (vdiskId));
96
+ }
97
+ }
98
+ }
92
99
93
- if (SendOnMainParts.Empty () && TryDeleteParts.Empty ()) {
94
- // no more parts to send or delete
95
- STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP (Ctx->VCtx , " Balancing completed" ));
96
- PassAway ();
100
+ void Handle (TEvBlobStorage::TEvVStatusResult::TPtr ev) {
101
+ auto msg = ev->Get ();
102
+ auto vdiskId = VDiskIDFromVDiskID (msg->Record .GetVDiskID ());
103
+ auto status = msg->Record .GetStatus ();
104
+ bool replicated = msg->Record .GetReplicated ();
105
+ bool isReadOnly = msg->Record .GetIsReadOnly ();
106
+
107
+ if (status != NKikimrProto::EReplyStatus::OK || !replicated || isReadOnly) {
108
+ STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP (Ctx->VCtx , " VDisk is not ready. Stop balancing" ),
109
+ (VDiskId, vdiskId), (Status, NKikimrProto::EReplyStatus_Name (status)), (Replicated, replicated), (ReadOnly, isReadOnly));
110
+ Stop (TDuration::Seconds (10 ));
97
111
return ;
98
112
}
99
113
100
- // ask for repl token to continue balancing
101
- STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB01, VDISKP (Ctx->VCtx , " Ask repl token to continue balancing" ), (SelfId, SelfId ()), (PDiskId, Ctx->VDiskCfg ->BaseInfo .PDiskId ));
102
- Send (MakeBlobStorageReplBrokerID (), new TEvQueryReplToken (Ctx->VDiskCfg ->BaseInfo .PDiskId ), NActors::IEventHandle::FlagTrackDelivery);
114
+ GoodStatusVDisks.insert (vdiskId);
103
115
}
104
116
105
- void ScheduleJobQuant () {
106
- Ctx->MonGroup .ReplTokenAquired ()++;
107
-
108
- // once repl token received, start balancing - waking up sender and deleter
109
- STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP (Ctx->VCtx , " Schedule job quant" ),
110
- (SendPartsLeft, SendOnMainParts.Size ()), (DeletePartsLeft, TryDeleteParts.Size ()),
111
- (ConnectedVDisks, ConnectedVDisks.size ()), (TotalVDisks, GInfo->GetTotalVDisksNum ()));
112
-
113
- // register sender and deleter actors
114
- BatchManager = TBatchManager (
115
- CreateSenderActor (SelfId (), SendOnMainParts.GetNextBatch (), QueueActorMapPtr, Ctx),
116
- CreateDeleterActor (SelfId (), TryDeleteParts.GetNextBatch (), QueueActorMapPtr, Ctx)
117
- );
117
+ bool ReadyToBalance () const {
118
+ return (GoodStatusVDisks.size () + 1 == GInfo->GetTotalVDisksNum ()) && (ConnectedVDisks.size () + 1 == GInfo->GetTotalVDisksNum ());
118
119
}
119
120
120
121
void CollectKeys () {
121
- if (ConnectedVDisks. size () + 1 != GInfo-> GetTotalVDisksNum ()) {
122
+ if (! ReadyToBalance ()) {
122
123
// not all vdisks are connected
123
124
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB11, VDISKP (Ctx->VCtx , " Not all vdisks are connected, balancing should work only for full groups" ),
124
- (ConnectedVDisks, ConnectedVDisks.size ()), (TotalVDisksInGroup, GInfo->GetTotalVDisksNum ()));
125
- PassAway ( );
125
+ (ConnectedVDisks, ConnectedVDisks.size ()), (GoodStatusVDisks, GoodStatusVDisks. size ()), ( TotalVDisksInGroup, GInfo->GetTotalVDisksNum ()));
126
+ Stop ( TDuration::Seconds ( 10 ) );
126
127
return ;
127
128
}
128
129
@@ -197,6 +198,42 @@ namespace NBalancing {
197
198
ContinueBalancing ();
198
199
}
199
200
201
+ // /////////////////////////////////////////////////////////////////////////////////////////
202
+ // Main logic
203
+ // /////////////////////////////////////////////////////////////////////////////////////////
204
+
205
+ void ContinueBalancing () {
206
+ Ctx->MonGroup .PlannedToSendOnMain () = SendOnMainParts.Data .size ();
207
+ Ctx->MonGroup .CandidatesToDelete () = TryDeleteParts.Data .size ();
208
+
209
+ if (SendOnMainParts.Empty () && TryDeleteParts.Empty ()) {
210
+ // no more parts to send or delete
211
+ STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP (Ctx->VCtx , " Balancing completed" ));
212
+ bool hasSomeWorkForNextEpoch = SendOnMainParts.Data .size () >= Ctx->Cfg .MaxToSendPerEpoch || TryDeleteParts.Data .size () >= Ctx->Cfg .MaxToDeletePerEpoch ;
213
+ Stop (hasSomeWorkForNextEpoch ? TDuration::Seconds (0 ) : Ctx->Cfg .TimeToSleepIfNothingToDo );
214
+ return ;
215
+ }
216
+
217
+ // ask for repl token to continue balancing
218
+ STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB01, VDISKP (Ctx->VCtx , " Ask repl token to continue balancing" ), (SelfId, SelfId ()), (PDiskId, Ctx->VDiskCfg ->BaseInfo .PDiskId ));
219
+ Send (MakeBlobStorageReplBrokerID (), new TEvQueryReplToken (Ctx->VDiskCfg ->BaseInfo .PDiskId ), NActors::IEventHandle::FlagTrackDelivery);
220
+ }
221
+
222
+ void ScheduleJobQuant () {
223
+ Ctx->MonGroup .ReplTokenAquired ()++;
224
+
225
+ // once repl token received, start balancing - waking up sender and deleter
226
+ STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP (Ctx->VCtx , " Schedule job quant" ),
227
+ (SendPartsLeft, SendOnMainParts.Size ()), (DeletePartsLeft, TryDeleteParts.Size ()),
228
+ (ConnectedVDisks, ConnectedVDisks.size ()), (TotalVDisks, GInfo->GetTotalVDisksNum ()));
229
+
230
+ // register sender and deleter actors
231
+ BatchManager = TBatchManager (
232
+ CreateSenderActor (SelfId (), SendOnMainParts.GetNextBatch (), QueueActorMapPtr, Ctx),
233
+ CreateDeleterActor (SelfId (), TryDeleteParts.GetNextBatch (), QueueActorMapPtr, Ctx)
234
+ );
235
+ }
236
+
200
237
void Handle (NActors::TEvents::TEvCompleted::TPtr ev) {
201
238
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " TEvCompleted" ), (Type, ev->Type ));
202
239
BatchManager.Handle (ev);
@@ -205,7 +242,7 @@ namespace NBalancing {
205
242
Ctx->MonGroup .EpochTimeouts ()++;
206
243
Send (MakeBlobStorageReplBrokerID (), new TEvReleaseReplToken);
207
244
STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP (Ctx->VCtx , " Epoch timeout" ));
208
- PassAway ( );
245
+ Stop ( TDuration::Seconds ( 0 ) );
209
246
return ;
210
247
}
211
248
@@ -280,19 +317,24 @@ namespace NBalancing {
280
317
Send (BatchManager.DeleterId , msg->Clone ());
281
318
}
282
319
283
- void PassAway () override {
320
+ void Stop (TDuration timeoutBeforeNextLaunch) {
321
+ STLOG (PRI_INFO, BS_VDISK_BALANCING, BSVB12, VDISKP (Ctx->VCtx , " Stop balancing" ), (SendOnMainParts, SendOnMainParts.Data .size ()), (TryDeleteParts, TryDeleteParts.Data .size ()), (SecondsBeforeNextLaunch, timeoutBeforeNextLaunch.Seconds ()));
322
+
284
323
Send (BatchManager.SenderId , new NActors::TEvents::TEvPoison);
285
324
Send (BatchManager.DeleterId , new NActors::TEvents::TEvPoison);
286
325
for (const auto & kv : *QueueActorMapPtr) {
287
326
Send (kv.second , new TEvents::TEvPoison);
288
327
}
289
- Send ( Ctx->SkeletonId , new TEvStartBalancing ());
290
- TActorBootstrapped:: PassAway ();
328
+ TlsActivationContext-> Schedule (timeoutBeforeNextLaunch, new IEventHandle ( Ctx->SkeletonId , SelfId (), new TEvStartBalancing () ));
329
+ PassAway ();
291
330
}
292
331
293
332
STRICT_STFUNC (StateFunc,
294
- // Logic events
333
+ // Init events
295
334
cFunc (NActors::TEvents::TEvWakeup::EventType, CollectKeys)
335
+ hFunc(TEvBlobStorage::TEvVStatusResult, Handle)
336
+
337
+ // Main logic events
296
338
cFunc(TEvReplToken::EventType, ScheduleJobQuant)
297
339
hFunc(NActors::TEvents::TEvCompleted, Handle)
298
340
hFunc(TEvBalancingSendPartsOnMain, Handle)
@@ -323,6 +365,7 @@ namespace NBalancing {
323
365
CreateVDisksQueues ();
324
366
It.SeekToFirst ();
325
367
++Ctx->MonGroup .BalancingIterations ();
368
+ SendCheckVDisksStatusRequests ();
326
369
Schedule (TDuration::Seconds (10 ), new NActors::TEvents::TEvWakeup ());
327
370
}
328
371
};
0 commit comments