Skip to content

Commit 9040c7c

Browse files
alexvru authored and uzhastik committed
Fix VDisk replication token handling, add some extra checks and log points (merge from main #10371) (#11225)
1 parent e54af03 commit 9040c7c

File tree

2 files changed: 41 additions and 6 deletions.

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp

Lines changed: 24 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -174,6 +174,8 @@ namespace NKikimr {
174174
TEvResumeForce *ResumeForceToken = nullptr;
175175
TInstant ReplicationEndTime;
176176
bool UnrecoveredNonphantomBlobs = false;
177+
bool RequestedReplicationToken = false;
178+
bool HoldingReplicationToken = false;
177179

178180
TWatchdogTimer<TEvReplCheckProgress> ReplProgressWatchdog;
179181

@@ -287,6 +289,12 @@ namespace NKikimr {
287289
case Plan:
288290
// this is a first quantum of replication, so we have to register it in the broker
289291
State = AwaitToken;
292+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
293+
if (RequestedReplicationToken) {
294+
STLOG(PRI_CRIT, BS_REPL, BSVR38, ReplCtx->VCtx->VDiskLogPrefix << "excessive replication token requested");
295+
break;
296+
}
297+
RequestedReplicationToken = true;
290298
if (!Send(MakeBlobStorageReplBrokerID(), new TEvQueryReplToken(ReplCtx->VDiskCfg->BaseInfo.PDiskId))) {
291299
HandleReplToken();
292300
}
@@ -303,6 +311,10 @@ namespace NKikimr {
303311
}
304312

305313
void HandleReplToken() {
314+
Y_ABORT_UNLESS(RequestedReplicationToken);
315+
RequestedReplicationToken = false;
316+
HoldingReplicationToken = true;
317+
306318
// switch to replication state
307319
Transition(AwaitToken, Replication);
308320
if (!ResumeIfReady()) {
@@ -408,6 +420,9 @@ namespace NKikimr {
408420
if (State == WaitQueues || State == Replication) {
409421
// release token as we have finished replicating
410422
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
423+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken);
424+
Y_DEBUG_ABORT_UNLESS(HoldingReplicationToken);
425+
HoldingReplicationToken = false;
411426
}
412427
ResetReplProgressTimer(true);
413428

@@ -635,7 +650,15 @@ namespace NKikimr {
635650

636651
// return replication token if we have one
637652
if (State == AwaitToken || State == WaitQueues || State == Replication) {
638-
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
653+
Y_DEBUG_ABORT_UNLESS(RequestedReplicationToken || HoldingReplicationToken);
654+
if (RequestedReplicationToken || HoldingReplicationToken) {
655+
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
656+
}
657+
} else {
658+
Y_DEBUG_ABORT_UNLESS(!RequestedReplicationToken && !HoldingReplicationToken);
659+
if (RequestedReplicationToken || HoldingReplicationToken) {
660+
STLOG(PRI_CRIT, BS_REPL, BSVR37, ReplCtx->VCtx->VDiskLogPrefix << "stuck replication token");
661+
}
639662
}
640663

641664
if (ReplJobActorId) {

ydb/core/blobstorage/vdisk/repl/blobstorage_replproxy.cpp

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -129,6 +129,7 @@ namespace NKikimr {
129129
ui64 NextReceiveCookie;
130130
TResultQueue ResultQueue;
131131
std::shared_ptr<TMessageRelevanceTracker> Tracker = std::make_shared<TMessageRelevanceTracker>();
132+
bool Terminated = false;
132133

133134
TQueue<std::unique_ptr<TEvBlobStorage::TEvVGet>> SchedulerRequestQ;
134135
THashMap<ui64, TReplMemTokenId> RequestTokens;
@@ -227,9 +228,7 @@ namespace NKikimr {
227228
PrefetchDataSize = 0;
228229
RequestFromVDiskProxyPending = false;
229230
if (Finished) {
230-
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
231-
RequestTokens.clear();
232-
return PassAway(); // TODO(alexvru): check correctness of invocations
231+
return PassAway();
233232
}
234233
}
235234
// send request(s) if prefetch queue is not full
@@ -297,6 +296,9 @@ namespace NKikimr {
297296
if (msg->Record.GetCookie() == NextReceiveCookie) {
298297
ui64 cookie = NextReceiveCookie;
299298
ProcessResult(msg);
299+
if (Terminated) {
300+
return;
301+
}
300302
ReleaseMemToken(cookie);
301303
while (!ResultQueue.empty()) {
302304
const TQueueItem& top = ResultQueue.top();
@@ -305,6 +307,9 @@ namespace NKikimr {
305307
}
306308
ui64 cookie = NextReceiveCookie;
307309
ProcessResult(top.get());
310+
if (Terminated) {
311+
return;
312+
}
308313
ReleaseMemToken(cookie);
309314
ResultQueue.pop();
310315
}
@@ -314,6 +319,7 @@ namespace NKikimr {
314319
}
315320

316321
void ReleaseMemToken(ui64 cookie) {
322+
Y_ABORT_UNLESS(!Terminated);
317323
if (RequestTokens) {
318324
auto it = RequestTokens.find(cookie);
319325
Y_ABORT_UNLESS(it != RequestTokens.end());
@@ -428,6 +434,13 @@ namespace NKikimr {
428434
}
429435
}
430436

437+
void PassAway() override {
438+
Y_ABORT_UNLESS(!Terminated);
439+
Terminated = true;
440+
Send(MakeBlobStorageReplBrokerID(), new TEvPruneQueue);
441+
TActorBootstrapped::PassAway();
442+
}
443+
431444
STRICT_STFUNC(StateFunc,
432445
hFunc(TEvReplProxyNext, Handle)
433446
hFunc(TEvReplMemToken, Handle)
@@ -446,8 +459,7 @@ namespace NKikimr {
446459
TTrackableVector<TVDiskProxy::TScheduledBlob>&& ids,
447460
const TVDiskID& vdiskId,
448461
const TActorId& serviceId)
449-
: TActorBootstrapped<TVDiskProxyActor>()
450-
, ReplCtx(std::move(replCtx))
462+
: ReplCtx(std::move(replCtx))
451463
, GType(ReplCtx->VCtx->Top->GType)
452464
, Ids(std::move(ids))
453465
, VDiskId(vdiskId)

0 commit comments

Comments
 (0)