Skip to content

Commit 8d8468f

Browse files
authored
add fast bs queue to donor for online read (#16699) (#17325)
1 parent b11d285 commit 8d8468f

File tree

4 files changed

+200
-16
lines changed

4 files changed

+200
-16
lines changed

ydb/core/blobstorage/ut_blobstorage/donor.cpp

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,4 +320,161 @@ Y_UNIT_TEST_SUITE(Donor) {
320320
}
321321
// env.Sim(TDuration::Seconds(10));
322322
}
323+
324+
// Asserts that the base config contains exactly one VSlot with donors, that this slot has exactly
// one donor, and that the donor matches the expected VDisk: its slot id must resolve to
// vdiskActorId and its VDiskID must be equal to vdiskId.
void CheckHasDonor(TEnvironmentSetup& env, const TActorId& vdiskActorId, const TVDiskID& vdiskId) {
    auto config = env.FetchBaseConfig();
    bool donorSeen = false;

    for (const auto& vslot : config.GetVSlot()) {
        if (!vslot.DonorsSize()) {
            continue; // slots without donors are of no interest here
        }

        // only a single slot in the whole config is allowed to have donors
        UNIT_ASSERT(!donorSeen);
        UNIT_ASSERT_VALUES_EQUAL(vslot.DonorsSize(), 1);

        const auto& donorDisk = vslot.GetDonors(0);
        const auto& donorSlotId = donorDisk.GetVSlotId();
        const auto donorActorId = MakeBlobStorageVDiskID(donorSlotId.GetNodeId(), donorSlotId.GetPDiskId(),
            donorSlotId.GetVSlotId());

        UNIT_ASSERT_VALUES_EQUAL(vdiskActorId, donorActorId);
        UNIT_ASSERT_VALUES_EQUAL(VDiskIDFromVDiskID(donorDisk.GetVDiskId()), vdiskId);
        donorSeen = true;
    }

    UNIT_ASSERT(donorSeen);
}
340+
341+
// Collects, across all VSlots of the current base config, every donor record whose VDiskID equals
// vdiskId. Records are returned by value in the order they appear in the config.
TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> GetDonors(TEnvironmentSetup& env, const TVDiskID& vdiskId) {
    const auto& config = env.FetchBaseConfig();

    TVector<NKikimrBlobStorage::TBaseConfig_TVSlot_TDonorDisk> matched;
    for (const auto& vslot : config.GetVSlot()) {
        const size_t donorCount = vslot.DonorsSize();
        for (size_t index = 0; index < donorCount; ++index) {
            const auto& candidate = vslot.GetDonors(index);
            const bool sameDisk = VDiskIDFromVDiskID(candidate.GetVDiskId()) == vdiskId;
            if (sameDisk) {
                matched.push_back(candidate);
            }
        }
    }
    return matched;
}
354+
355+
// Scenario:
//  1. Put a blob and remember the first VDisk that receives part 1 of it.
//  2. Settle that VDisk's PDisk so the old slot becomes a donor for the new one.
//  3. Read the blob with FastRead: expect a TEvEnrichNotYet for the blob and a single-query VGet
//     with FastRead handle class delivered to the donor.
//  4. Commence replication: expect a single-query VGet with AsyncRead handle class delivered to
//     the same donor.
Y_UNIT_TEST(CheckOnlineReadRequestToDonor) {
    TEnvironmentSetup env{TEnvironmentSetup::TSettings{
        .NodeCount = 8,
        .VDiskReplPausedAtStart = true,
        .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
    }};
    auto& runtime = env.Runtime;

    env.EnableDonorMode();
    env.CreateBoxAndPool(2, 1);
    env.CommenceReplication();
    env.Sim(TDuration::Seconds(30));

    const ui32 groupId = env.GetGroups().front();

    const TActorId edge = runtime->AllocateEdgeActor(1, __FILE__, __LINE__);
    const TString buffer = TString(2_MB, 'b');
    TLogoBlobID logoBlobId(1, 1, 0, 0, buffer.size(), 0);
    TVDiskID vdiskId;
    bool vdiskIdWithBlobSet = false;
    TLogoBlobID vdiskLogoBlobId;

    // Put blob and find vdisk with it and partId = 1
    {
        // Pass-through filter: only records the first VPut carrying part 1 of our blob.
        runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
            Y_UNUSED(nodeId);
            if (ev->GetTypeRewrite() == TEvBlobStorage::EvVPut) {
                auto* msg = ev->Get<TEvBlobStorage::TEvVPut>();
                const auto& blobId = LogoBlobIDFromLogoBlobID(msg->Record.GetBlobID());
                if (blobId.IsSameBlob(logoBlobId) && blobId.PartId() == 1 && !vdiskIdWithBlobSet) {
                    vdiskId = VDiskIDFromVDiskID(msg->Record.GetVDiskID());
                    vdiskLogoBlobId = blobId;
                    vdiskIdWithBlobSet = true;
                }
            }
            return true;
        };

        runtime->WrapInActorContext(edge, [&] {
            SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(logoBlobId, buffer, TInstant::Max()));
        });
        auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge, false);
        UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
        UNIT_ASSERT(vdiskIdWithBlobSet);
    }

    auto info = env.GetGroupInfo(groupId);
    const TActorId& vdiskActorId = info->GetActorId(vdiskId);

    // Move slot out from disk and find donor
    env.SettlePDisk(vdiskActorId);
    CheckHasDonor(env, vdiskActorId, vdiskId);
    const auto& donors = GetDonors(env, vdiskId);
    UNIT_ASSERT_VALUES_EQUAL(donors.size(), 1);
    const auto& donor = donors.front();

    // The donor's service actor id does not change during the test, so compute it once instead of
    // on every filtered event.
    const auto& donorSlotId = donor.GetVSlotId();
    const auto donorActorId = MakeBlobStorageVDiskID(donorSlotId.GetNodeId(), donorSlotId.GetPDiskId(),
        donorSlotId.GetVSlotId());

    bool requestVdiskNotYet = false;
    bool fastRequestToDonor = false;
    bool asyncRequestToDonor = false;

    // Sets requestExist when ev is a single-query VGet for our blob part that is addressed to the
    // donor and carries the given handle class; asserts that such a request is seen at most once.
    const auto checkRequestToDonor = [&](std::unique_ptr<IEventHandle>& ev,
            NKikimrBlobStorage::EGetHandleClass handleClass, bool& requestExist) {
        auto* msg = ev->Get<TEvBlobStorage::TEvVGet>();
        if (msg->Record.ExtremeQueriesSize() != 1) {
            return;
        }
        const auto& query = msg->Record.GetExtremeQueries(0);
        const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId());

        if (blobId == vdiskLogoBlobId &&
                ev->Recipient == donorActorId &&
                msg->Record.GetHandleClass() == handleClass) {
            UNIT_ASSERT(!requestExist);
            requestExist = true;
        }
    };

    // Check that the disk answers with TEvEnrichNotYet and requests FastRead from the donor for online read
    runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
        Y_UNUSED(nodeId);
        if (ev->GetTypeRewrite() == TEvBlobStorage::EvEnrichNotYet) {
            UNIT_ASSERT(!requestVdiskNotYet);
            auto msg = ev->Get<TEvBlobStorage::TEvEnrichNotYet>()->Query.Get()->Get();
            UNIT_ASSERT_VALUES_EQUAL(msg->Record.ExtremeQueriesSize(), 1);
            const auto& query = msg->Record.GetExtremeQueries(0);
            const auto& vdid = VDiskIDFromVDiskID(msg->Record.GetVDiskID());
            const auto& blobId = LogoBlobIDFromLogoBlobID(query.GetId());
            // the query must target the same VDisk position, but with group generation 2
            UNIT_ASSERT(vdid.SameExceptGeneration(vdiskId));
            UNIT_ASSERT_VALUES_EQUAL(vdid.GroupGeneration, 2);
            UNIT_ASSERT_VALUES_EQUAL(blobId, vdiskLogoBlobId);
            requestVdiskNotYet = true;
        }

        if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) {
            checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::FastRead, fastRequestToDonor);
        }
        return true;
    };

    // Get blob
    {
        auto ev = new TEvBlobStorage::TEvGet(logoBlobId, 0, 0, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead);
        runtime->WrapInActorContext(edge, [&] {
            SendToBSProxy(edge, groupId, ev);
        });
        auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(edge, false);
        UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
        UNIT_ASSERT(requestVdiskNotYet);
        UNIT_ASSERT(fastRequestToDonor);
    }

    // Check that the disk requests AsyncRead from the donor for replication
    runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
        Y_UNUSED(nodeId);
        if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGet) {
            checkRequestToDonor(ev, NKikimrBlobStorage::EGetHandleClass::AsyncRead, asyncRequestToDonor);
        }
        return true;
    };

    // Start replication
    env.CommenceReplication();
    UNIT_ASSERT(asyncRequestToDonor);
}
323480
}

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ namespace NKikimr {
149149

150150
struct TDonorQueueItem {
151151
TVDiskID VDiskId;
152-
TActorId QueueActorId;
152+
TDonorQueueActors QueueActors;
153153
ui32 NodeId;
154154
ui32 PDiskId;
155155
ui32 VSlotId;
@@ -170,7 +170,7 @@ namespace NKikimr {
170170
TMilestoneQueue MilestoneQueue;
171171
TActorId ReplJobActorId;
172172
std::list<std::optional<TDonorQueueItem>> DonorQueue;
173-
std::deque<std::pair<TVDiskID, TActorId>> Donors;
173+
std::deque<std::pair<TVDiskID, TDonorQueueActors>> Donors;
174174
std::set<TVDiskID> ConnectedPeerDisks, ConnectedDonorDisks;
175175
TEvResumeForce *ResumeForceToken = nullptr;
176176
TInstant ReplicationEndTime;
@@ -215,21 +215,30 @@ namespace NKikimr {
215215
for (const auto& [vdiskId, vdiskActorId] : ReplCtx->VDiskCfg->BaseInfo.DonorDiskIds) {
216216
TIntrusivePtr<NBackpressure::TFlowRecord> flowRecord(new NBackpressure::TFlowRecord);
217217
auto info = MakeIntrusive<TBlobStorageGroupInfo>(ReplCtx->GInfo, vdiskId, vdiskActorId);
218-
const TActorId queueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
218+
const TActorId asyncReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
219219
NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx,
220-
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "Donor",
220+
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "ReplicationDonor",
221+
ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(),
222+
TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private));
223+
224+
const TActorId fastReadQueueActorId = Register(CreateVDiskBackpressureClient(info, vdiskId,
225+
NKikimrBlobStorage::EVDiskQueueId::GetFastRead, ReplCtx->MonGroup.GetGroup(), ReplCtx->VCtx,
226+
NBackpressure::TQueueClientId(NBackpressure::EQueueClientType::ReplJob, 0), "OnlineReadDonor",
221227
ReplCtx->VDiskCfg->ReplInterconnectChannel, vdiskActorId.NodeId() == SelfId().NodeId(),
222228
TDuration::Minutes(1), flowRecord, NMonitoring::TCountableBase::EVisibility::Private));
223229
ui32 nodeId, pdiskId, vslotId;
224230
std::tie(nodeId, pdiskId, vslotId) = DecomposeVDiskServiceId(vdiskActorId);
225231
DonorQueue.emplace_back(TDonorQueueItem{
226232
.VDiskId = vdiskId,
227-
.QueueActorId = queueActorId,
233+
.QueueActors = TDonorQueueActors{
234+
.AsyncReadQueueActorId = asyncReadQueueActorId,
235+
.FastReadQueueActorId = fastReadQueueActorId
236+
},
228237
.NodeId = nodeId,
229238
.PDiskId = pdiskId,
230239
.VSlotId = vslotId
231240
});
232-
Donors.emplace_back(vdiskId, queueActorId);
241+
Donors.emplace_back(vdiskId, TDonorQueueActors(asyncReadQueueActorId, fastReadQueueActorId));
233242
}
234243
DonorQueue.emplace_back(std::nullopt); // disks from group
235244

@@ -348,8 +357,10 @@ namespace NKikimr {
348357
}
349358

350359
void DropDonor(const TDonorQueueItem& donor) {
351-
Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId, donor.QueueActorId)));
352-
Send(donor.QueueActorId, new TEvents::TEvPoison); // kill the queue actor
360+
Donors.erase(std::find(Donors.begin(), Donors.end(), std::make_pair(donor.VDiskId,
361+
TDonorQueueActors(donor.QueueActors.AsyncReadQueueActorId, donor.QueueActors.FastReadQueueActorId))));
362+
Send(donor.QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor
363+
Send(donor.QueueActors.FastReadQueueActorId, new TEvents::TEvPoison); // kill the queue actor
353364
Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvBlobStorage::TEvDropDonor(donor.NodeId,
354365
donor.PDiskId, donor.VSlotId, donor.VDiskId));
355366
}
@@ -470,7 +481,7 @@ namespace NKikimr {
470481
donor->NodeId << ":" << donor->PDiskId << ":" << donor->VSlotId << "}") : "generic"));
471482
ReplJobActorId = Register(CreateReplJobActor(ReplCtx, SelfId(), from, QueueActorMapPtr,
472483
BlobsToReplicatePtr, UnreplicatedBlobsPtr, donor ? std::make_optional(std::make_pair(
473-
donor->VDiskId, donor->QueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords),
484+
donor->VDiskId, donor->QueueActors.AsyncReadQueueActorId)) : std::nullopt, std::move(UnreplicatedBlobRecords),
474485
std::move(MilestoneQueue)));
475486
}
476487

@@ -644,7 +655,8 @@ namespace NKikimr {
644655
}
645656
for (const auto& donor : DonorQueue) {
646657
if (donor) {
647-
Send(donor->QueueActorId, new TEvents::TEvPoison);
658+
Send(donor->QueueActors.AsyncReadQueueActorId, new TEvents::TEvPoison);
659+
Send(donor->QueueActors.FastReadQueueActorId, new TEvents::TEvPoison);
648660
}
649661
}
650662
for (const TActorId& actorId : DonorQueryActors) {

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,15 @@ namespace NKikimr {
248248

249249
struct TEvReplCheckProgress : TEventLocal<TEvReplCheckProgress, TEvBlobStorage::EvReplCheckProgress> {};
250250

251+
struct TDonorQueueActors {
252+
TActorId AsyncReadQueueActorId;
253+
TActorId FastReadQueueActorId;
254+
255+
bool operator==(const TDonorQueueActors &other) const {
256+
return AsyncReadQueueActorId == other.AsyncReadQueueActorId && FastReadQueueActorId == other.FastReadQueueActorId;
257+
}
258+
};
259+
251260
////////////////////////////////////////////////////////////////////////////
252261
// REPL ACTOR CREATOR
253262
////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/vdisk/repl/query_donor.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ namespace NKikimr {
1010
const ui64 Cookie;
1111
std::unique_ptr<TEvBlobStorage::TEvVGetResult> Result;
1212
TActorId ParentId;
13-
std::deque<std::pair<TVDiskID, TActorId>> Donors;
13+
std::deque<std::pair<TVDiskID, TDonorQueueActors>> Donors;
1414
TDynBitMap UnresolvedItems;
1515
TIntrusivePtr<TVDiskContext> VCtx;
1616

1717
public:
18-
TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque<std::pair<TVDiskID, TActorId>> donors, const TIntrusivePtr<TVDiskContext>& vCtx)
18+
TDonorQueryActor(TEvBlobStorage::TEvEnrichNotYet& msg, std::deque<std::pair<TVDiskID, TDonorQueueActors>> donors, const TIntrusivePtr<TVDiskContext>& vCtx)
1919
: Query(msg.Query->Release().Release())
2020
, Sender(msg.Query->Sender)
2121
, Cookie(msg.Query->Cookie)
@@ -45,7 +45,7 @@ namespace NKikimr {
4545
return PassAway();
4646
}
4747

48-
auto [vdiskId, actorId] = Donors.back();
48+
auto [vdiskId, actors] = Donors.back();
4949
Donors.pop_back();
5050

5151
// we use AsyncRead priority as we are going to use the replication queue for the VDisk; also this doesn't
@@ -57,7 +57,13 @@ namespace NKikimr {
5757
const auto flags = record.GetShowInternals()
5858
? TEvBlobStorage::TEvVGet::EFlags::ShowInternals
5959
: TEvBlobStorage::TEvVGet::EFlags::None;
60-
auto query = fun(vdiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, flags, {}, {}, std::nullopt);
60+
const auto handleClass = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead
61+
? NKikimrBlobStorage::EGetHandleClass::FastRead
62+
: NKikimrBlobStorage::EGetHandleClass::AsyncRead;
63+
const auto queueActorId = record.GetHandleClass() == NKikimrBlobStorage::EGetHandleClass::FastRead
64+
? actors.FastReadQueueActorId
65+
: actors.AsyncReadQueueActorId;
66+
auto query = fun(vdiskId, TInstant::Max(), handleClass, flags, {}, {}, std::nullopt);
6167

6268
bool action = false;
6369
Y_FOR_EACH_BIT(i, UnresolvedItems) {
@@ -69,8 +75,8 @@ namespace NKikimr {
6975

7076
if (action) {
7177
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " sending " << query->ToString()
72-
<< " to " << actorId);
73-
Send(actorId, query.release(), IEventHandle::FlagTrackDelivery);
78+
<< " to " << queueActorId);
79+
Send(queueActorId, query.release(), IEventHandle::FlagTrackDelivery);
7480
} else {
7581
PassAway();
7682
}

0 commit comments

Comments
 (0)