Skip to content

Commit d518adf

Browse files
authored
Support correct PDisk stopping in context of metadata handling (#7104)
1 parent bbafb67 commit d518adf

7 files changed

+79
-31
lines changed

ydb/core/blobstorage/nodewarden/node_warden_pdisk.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -452,21 +452,23 @@ namespace NKikimr::NStorage {
452452
std::unique_ptr<IEventHandle> OriginalEv;
453453
std::unique_ptr<IEventBase> ConvertedEv;
454454
TActorId ParentId;
455+
const char* const EventType;
455456

456457
public:
457458
TPDiskMetadataInteractionActor(TPDiskKey pdiskKey, TAutoPtr<IEventHandle> originalEv,
458-
std::unique_ptr<IEventBase> convertedEv)
459+
std::unique_ptr<IEventBase> convertedEv, const char *eventType)
459460
: PDiskKey(pdiskKey)
460461
, OriginalEv(originalEv.Release())
461462
, ConvertedEv(std::move(convertedEv))
463+
, EventType(eventType)
462464
{}
463465

464466
void Bootstrap(TActorId parentId) {
465467
ParentId = parentId;
466468
Y_ABORT_UNLESS(PDiskKey.NodeId == SelfId().NodeId());
467469
Send(MakeBlobStoragePDiskID(PDiskKey.NodeId, PDiskKey.PDiskId), ConvertedEv.release(),
468470
IEventHandle::FlagTrackDelivery);
469-
Become(&TThis::StateFunc);
471+
Become(&TThis::StateFunc, TDuration::Seconds(10), new TEvents::TEvWakeup);
470472
}
471473

472474
void Handle(TEvents::TEvUndelivered::TPtr ev) {
@@ -506,18 +508,24 @@ namespace NKikimr::NStorage {
506508
TActorBootstrapped::PassAway();
507509
}
508510

511+
void HandleWakeup() {
512+
Y_DEBUG_ABORT("Event# %s took too long to process", EventType);
513+
STLOG(PRI_CRIT, BS_NODE, NW61, "TPDiskMetadataInteractionActor::Wakeup", (EventType, EventType));
514+
}
515+
509516
STRICT_STFUNC(StateFunc,
510517
hFunc(TEvents::TEvUndelivered, Handle);
511518
hFunc(NPDisk::TEvReadMetadataResult, Handle);
512519
hFunc(NPDisk::TEvWriteMetadataResult, Handle);
520+
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
513521
)
514522
};
515523

516524
void TNodeWarden::Handle(TEvNodeWardenReadMetadata::TPtr ev) {
517525
const TString& path = ev->Get()->Path;
518526
STLOG(PRI_DEBUG, BS_NODE, NW56, "TEvNodeWardenReadMetadata", (Path, path));
519527
Register(new TPDiskMetadataInteractionActor(GetPDiskForMetadata(path), ev.Release(),
520-
std::make_unique<NPDisk::TEvReadMetadata>()));
528+
std::make_unique<NPDisk::TEvReadMetadata>(), "TEvNodeWardenReadMetadata"));
521529
}
522530

523531
void TNodeWarden::Handle(TEvNodeWardenWriteMetadata::TPtr ev) {
@@ -528,7 +536,7 @@ namespace NKikimr::NStorage {
528536
const TString& path = msg->Path;
529537
STLOG(PRI_DEBUG, BS_NODE, NW57, "TEvNodeWardenWriteMetadata", (Path, path), (Metadata.size, data.size()));
530538
Register(new TPDiskMetadataInteractionActor(GetPDiskForMetadata(path), ev.Release(),
531-
std::make_unique<NPDisk::TEvWriteMetadata>(TRcBuf(std::move(data)))));
539+
std::make_unique<NPDisk::TEvWriteMetadata>(TRcBuf(std::move(data))), "TEvNodeWardenWriteMetadata"));
532540
}
533541

534542
TPDiskKey TNodeWarden::GetPDiskForMetadata(const TString& path) {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,20 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
928928
} MetadataHandlingState = EMetadataHandlingState::WAITING_FOR_STARTUP;
929929
bool NeedToStopOnPoison = false;
930930

931+
void DropMetadata() {
932+
for (auto& ev : std::exchange(PendingMetadata, {})) {
933+
switch (ev->GetTypeRewrite()) {
934+
case TEvReadMetadata::EventType:
935+
Send(ev->Sender, new TEvReadMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
936+
break;
937+
938+
case TEvWriteMetadata::EventType:
939+
Send(ev->Sender, new TEvWriteMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
940+
break;
941+
}
942+
}
943+
}
944+
931945
void StartHandlingMetadata(bool error) {
932946
MetadataHandlingState = error
933947
? EMetadataHandlingState::ERROR
@@ -973,6 +987,11 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
973987
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
974988
// All states
975989

990+
void PassAway() override {
991+
DropMetadata();
992+
TActorBootstrapped::PassAway();
993+
}
994+
976995
void HandlePoison() {
977996
if (NeedToStopOnPoison && PDisk) {
978997
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TStopDevice>());

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,9 @@ void TPDisk::Stop() {
371371
while (InputQueue.GetWaitingSize() > 0) {
372372
TRequestBase::AbortDelete(InputQueue.Pop(), ActorSystem);
373373
}
374+
375+
DropAllMetadataRequests();
376+
374377
if (InitialTailBuffer) {
375378
InitialTailBuffer->Exec(ActorSystem);
376379
InitialTailBuffer = nullptr;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,8 @@ class TPDisk : public IPDisk {
377377
void HandleNextWriteMetadata();
378378
void ProcessWriteMetadataResult(TWriteMetadataResult& request);
379379

380+
void DropAllMetadataRequests();
381+
380382
TRcBuf CreateMetadataPayload(TRcBuf& metadata, size_t offset, size_t payloadSize, ui32 sectorSize, bool encryption,
381383
const TKey& key, ui64 sequenceNumber, ui32 recordIndex, ui32 totalRecords);
382384
bool WriteMetadataSync(TRcBuf&& metadata);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_metadata.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,13 @@ namespace NKikimr::NPDisk {
7777

7878
class TCompletionWriteMetadata : public TCompletionAction {
7979
TPDisk* const PDisk;
80+
const TActorId Sender;
8081
std::deque<std::tuple<NMeta::TSlotKey, TRcBuf>> WriteQueue;
8182

8283
public:
83-
TCompletionWriteMetadata(TPDisk *pdisk)
84+
TCompletionWriteMetadata(TPDisk *pdisk, TActorId sender)
8485
: PDisk(pdisk)
86+
, Sender(sender)
8587
{}
8688

8789
bool CanHandleResult() const override { return true; }
@@ -90,11 +92,11 @@ namespace NKikimr::NPDisk {
9092
WriteQueue.emplace_back(key, std::move(buffer));
9193
}
9294

93-
void IssueQuery() {
95+
void IssueQuery(TActorSystem *actorSystem) {
9496
Y_ABORT_UNLESS(!WriteQueue.empty());
9597
auto& [key, buffer] = WriteQueue.front();
9698
const ui64 writeOffset = PDisk->Format.Offset(key.ChunkIdx, key.OffsetInSectors);
97-
LOG_DEBUG_S(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
99+
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
98100
<< " TCompletionWriteMetadata::IssueQuery"
99101
<< " Buffer.size# " << buffer.size()
100102
<< " WriteOffset# " << writeOffset
@@ -105,20 +107,21 @@ namespace NKikimr::NPDisk {
105107
}
106108

107109
void Exec(TActorSystem *actorSystem) override {
108-
LOG_DEBUG_S(*PDisk->ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
110+
LOG_DEBUG_S(*actorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDisk->PDiskId
109111
<< " TCompletionWriteMetadata::Exec"
110112
<< " Result# " << Result);
111113
if (Result != EIoResult::Ok) {
112-
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(false));
114+
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(false, Sender));
113115
} else if (WriteQueue.empty()) {
114-
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(true));
116+
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(true, Sender));
115117
} else {
116-
return IssueQuery();
118+
return IssueQuery(actorSystem);
117119
}
118-
Release(actorSystem);
120+
delete this;
119121
}
120122

121-
void Release(TActorSystem* /*actorSystem*/) override {
123+
void Release(TActorSystem *actorSystem) override {
124+
actorSystem->Send(Sender, new TEvWriteMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
122125
delete this;
123126
}
124127
};
@@ -188,16 +191,18 @@ namespace NKikimr::NPDisk {
188191

189192
class TCompletionWriteUnformattedMetadata : public TCompletionAction {
190193
TPDisk* const PDisk;
194+
const TActorId Sender;
191195
const TMetadataFormatSector Format;
192196
TRcBuf Payload;
193197
const TMainKey MainKey;
194198
int FormatIndex = -1; // -1 for payload
195199
ui32 BadSectors = 0;
196200

197201
public:
198-
TCompletionWriteUnformattedMetadata(TPDisk *pdisk, const TMetadataFormatSector& format, TRcBuf&& payload,
199-
const TMainKey& mainKey)
202+
TCompletionWriteUnformattedMetadata(TPDisk *pdisk, TActorId sender, const TMetadataFormatSector& format,
203+
TRcBuf&& payload, const TMainKey& mainKey)
200204
: PDisk(pdisk)
205+
, Sender(sender)
201206
, Format(format)
202207
, Payload(std::move(payload))
203208
, MainKey(mainKey)
@@ -246,12 +251,13 @@ namespace NKikimr::NPDisk {
246251
}
247252
}
248253

249-
void Release(TActorSystem* /*actorSystem*/) override {
254+
void Release(TActorSystem *actorSystem) override {
255+
actorSystem->Send(Sender, new TEvWriteMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
250256
delete this;
251257
}
252258

253259
void Finish(bool success) {
254-
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(success));
260+
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TWriteMetadataResult>(success, Sender));
255261
delete this;
256262
}
257263
};
@@ -585,7 +591,7 @@ namespace NKikimr::NPDisk {
585591
}
586592

587593
// generate write queue
588-
auto completion = std::make_unique<TCompletionWriteMetadata>(this);
594+
auto completion = std::make_unique<TCompletionWriteMetadata>(this, write.Sender);
589595
size_t offset = 0;
590596
for (ui32 i = 0; i < numSlotsRequired; ++i, offset += slotSize) {
591597
const NMeta::TSlotKey key = freeSlotKeys[i];
@@ -603,7 +609,7 @@ namespace NKikimr::NPDisk {
603609
it->second = NMeta::ESlotState::BEING_WRITTEN;
604610
}
605611

606-
completion.release()->IssueQuery();
612+
completion.release()->IssueQuery(ActorSystem);
607613
},
608614
[&](NMeta::TUnformatted& unformatted) {
609615
TMetadataFormatSector& fmt = unformatted.FormatInFlight;
@@ -641,7 +647,8 @@ namespace NKikimr::NPDisk {
641647
fmt.Length = bytesToWrite;
642648
fmt.SequenceNumber = Meta.NextSequenceNumber;
643649

644-
auto *completion = new TCompletionWriteUnformattedMetadata(this, fmt, std::move(payload), write.MainKey);
650+
auto *completion = new TCompletionWriteUnformattedMetadata(this, write.Sender, fmt, std::move(payload),
651+
write.MainKey);
645652
completion->IssueQuery();
646653
},
647654
}, Meta.State);
@@ -698,6 +705,12 @@ namespace NKikimr::NPDisk {
698705
ProcessMetadataRequestQueue();
699706
}
700707

708+
void TPDisk::DropAllMetadataRequests() {
709+
for (auto& item : std::exchange(Meta.Requests, {})) {
710+
TRequestBase::AbortDelete(item.release(), ActorSystem);
711+
}
712+
}
713+
701714
TRcBuf TPDisk::CreateMetadataPayload(TRcBuf& metadata, size_t offset, size_t payloadSize, ui32 sectorSize,
702715
bool encryption, const TKey& key, ui64 sequenceNumber, ui32 recordIndex, ui32 totalRecords) {
703716
Y_ABORT_UNLESS(offset + payloadSize <= metadata.size());

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_metadata.h

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,6 @@
55
#include "blobstorage_pdisk_defs.h"
66
#include "blobstorage_pdisk_data.h"
77

8-
// TODO:
9-
// 1. Нужно сделать механизм для чтения и записи метаданных в двух режимах:
10-
// * когда PDisk отформатирован и работает
11-
// * когда PDisk без форматирования
12-
// 2. Нужно сделать перенос метаданных при форматировании, причём так, чтобы при случайном перебое в процессе не вышло,
13-
// что метаданные потерялись и их больше нет (NO_METADATA), хотя ERROR допустимо. Но нежелательно.
14-
// 3. Obliterate должен удалять метаданные тоже, даже если они были на неформатированном диске.
15-
// 4. Старый PDisk может не запускаться на новом PDisk'е с метаданными и без форматирования.
16-
178
namespace NKikimr::NPDisk {
189

1910
class TRequestBase;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,10 @@ class TReadMetadata : public TRequestBase {
10181018
ERequestType GetType() const override {
10191019
return ERequestType::RequestReadMetadata;
10201020
}
1021+
1022+
void Abort(TActorSystem *actorSystem) override {
1023+
actorSystem->Send(Sender, new TEvReadMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
1024+
}
10211025
};
10221026

10231027
class TInitialReadMetadataResult : public TRequestBase {
@@ -1051,20 +1055,28 @@ class TWriteMetadata : public TRequestBase {
10511055
ERequestType GetType() const override {
10521056
return ERequestType::RequestWriteMetadata;
10531057
}
1058+
1059+
void Abort(TActorSystem *actorSystem) override {
1060+
actorSystem->Send(Sender, new TEvWriteMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
1061+
}
10541062
};
10551063

10561064
class TWriteMetadataResult : public TRequestBase {
10571065
public:
10581066
const bool Success;
10591067

1060-
TWriteMetadataResult(bool success, TAtomicBase reqIdx)
1061-
: TRequestBase({}, TReqId(TReqId::WriteMetadataResult, reqIdx), OwnerSystem, 0, NPriInternal::Other)
1068+
TWriteMetadataResult(bool success, TActorId sender, TAtomicBase reqIdx)
1069+
: TRequestBase(sender, TReqId(TReqId::WriteMetadataResult, reqIdx), OwnerSystem, 0, NPriInternal::Other)
10621070
, Success(success)
10631071
{}
10641072

10651073
ERequestType GetType() const override {
10661074
return ERequestType::RequestWriteMetadataResult;
10671075
}
1076+
1077+
void Abort(TActorSystem *actorSystem) override {
1078+
actorSystem->Send(Sender, new TEvWriteMetadataResult(EPDiskMetadataOutcome::ERROR, std::nullopt));
1079+
}
10681080
};
10691081

10701082
class TPushUnformattedMetadataSector : public TRequestBase {

0 commit comments

Comments
 (0)