Skip to content

Commit 09373f6

Browse files
authored
Introduce PhantomBlobs unit test, fix WhatsNext VERIFY fail (#20016)
1 parent 4aea290 commit 09373f6

File tree

7 files changed

+342
-60
lines changed

7 files changed

+342
-60
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include "ut_helpers.h"
3+
4+
using namespace NKikimr;
5+
6+
#define Ctest Cerr
7+
8+
Y_UNIT_TEST_SUITE(PhantomBlobs) {
9+
10+
// What RunTest does to a node between the initial write and the final check.
enum class ENodeState {
11+
// Node is left running for the whole scenario.
Alive = 0,
12+
// Node is stopped after the initial sync and started again only in the
// "Enable nodes" phase, after the unsynced batches have been written.
Dead,
13+
// Node is stopped, one simulated minute passes, then it is started again.
Restart,
14+
};
15+
16+
// Test context for the PhantomBlobs suite: extends TTestCtxBase with the
// single scenario driver RunTest().
struct TTestCtx : public TTestCtxBase {
17+
// Forwards the environment settings to the base context.
TTestCtx(TEnvironmentSetup::TSettings settings)
18+
: TTestCtxBase(std::move(settings)) {
19+
}
20+
21+
// Runs the phantom-blob scenario:
//   1. write `initialBlobs` blobs and send a collect-garbage request with Keep flags for them;
//   2. stop / restart nodes according to `nodeStates`;
//   3. send DoNotKeep flags for the initial blobs, then write further batches
//      while the Dead nodes are still stopped;
//   4. start the Dead nodes again and send one more collect-garbage request;
//   5. query every initial blob on each checked VDisk and assert NODATA with no ingress.
//
// initialBlobs  - number of blobs written in step 1.
// unsyncedBlobs - lower bound on blobs written in step 3; the loop writes whole
//                 batches of unsyncedBatchSize, so the real count is rounded up
//                 to a multiple of the batch size.
// nodeStates    - one entry per node, indexed by nodeId - 1; size must equal NodeCount.
void RunTest(ui32 initialBlobs, ui32 unsyncedBlobs, std::vector<ENodeState> nodeStates) {
22+
Y_VERIFY(nodeStates.size() == NodeCount);
23+
const ui64 blobSize = 10;
24+
const ui32 unsyncedBatchSize = 10000;
25+
Initialize();
26+
27+
ui64 tabletId = 5000;
28+
ui32 channel = 1;
29+
ui32 generation = 1;
30+
ui32 step = 1;
31+
ui32 perGenCtr = 1;
32+
33+
Ctest << "Write blobs" << Endl;
34+
std::vector<TLogoBlobID> blobs = WriteCompressedData(TDataProfile{
35+
.GroupId = GroupId,
36+
.TotalBlobs = initialBlobs,
37+
.BlobSize = blobSize,
38+
.TabletId = tabletId,
39+
.Channel = channel,
40+
.Generation = generation,
41+
.Step = step,
42+
});
43+
44+
// Sends TEvCollectGarbage using the current `generation` / `step` and a
// freshly incremented perGenCtr, then blocks until the matching
// TEvCollectGarbageResult arrives on the edge actor.
// NOTE(review): the result event is discarded without checking its status.
// NOTE(review): callers pass vectors allocated with `new`; presumably the
// event takes ownership of them - confirm.
auto collectEverything = [&](TVector<TLogoBlobID>* keepFlags, TVector<TLogoBlobID>* doNotKeepFlags) {
45+
Env->Runtime->WrapInActorContext(Edge, [&] {
46+
// NOTE(review): `data` is never used inside this lambda.
TString data;
47+
SendToBSProxy(Edge, GroupId, new TEvBlobStorage::TEvCollectGarbage(
48+
tabletId, generation, ++perGenCtr, channel, true, generation, step,
49+
keepFlags, doNotKeepFlags, TInstant::Max(), true, false));
50+
});
51+
Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvCollectGarbageResult>(
52+
Edge, false, TInstant::Max());
53+
};
54+
55+
Ctest << "Set Keep flags" << Endl;
56+
collectEverything(new TVector<TLogoBlobID>(blobs.begin(), blobs.end()), nullptr);
57+
58+
Ctest << "Wait for sync" << Endl;
59+
Env->Sim(TDuration::Minutes(30));
60+
61+
Ctest << "Shutdown and restart nodes" << Endl;
62+
// Node ids are 1-based; nodeStates is 0-based.
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
63+
switch (nodeStates[nodeId - 1]) {
64+
case ENodeState::Alive:
65+
break;
66+
case ENodeState::Dead:
67+
Env->StopNode(nodeId);
68+
break;
69+
case ENodeState::Restart:
70+
Env->StopNode(nodeId);
71+
Env->Sim(TDuration::Minutes(1));
72+
Env->StartNode(nodeId);
73+
break;
74+
}
75+
// One simulated minute passes after every node, whatever its state.
Env->Sim(TDuration::Minutes(1));
76+
}
77+
78+
Ctest << "Wait for sync" << Endl;
79+
Env->Sim(TDuration::Minutes(30));
80+
81+
AllocateEdgeActor(); // reallocate actor, in case it lived on a restarted or dead node
82+
83+
Ctest << "Set DoNotKeepFlags" << Endl;
84+
collectEverything(nullptr, new TVector<TLogoBlobID>(blobs.begin(), blobs.end()));
85+
86+
// Each iteration writes a full batch of unsyncedBatchSize blobs under a
// generation bumped by 10, then sends a collect-garbage request without flags.
for (ui32 i = 0; i < unsyncedBlobs; i += unsyncedBatchSize) {
87+
Ctest << "Write batch, blobs written# " << i << Endl;
88+
generation += 10;
89+
std::vector<TLogoBlobID> batch = WriteCompressedData(TDataProfile{
90+
.GroupId = GroupId,
91+
.TotalBlobs = unsyncedBatchSize,
92+
.BlobSize = blobSize,
93+
.BatchSize = 1000,
94+
.TabletId = tabletId,
95+
.Channel = channel,
96+
.Generation = generation,
97+
.Step = step,
98+
});
99+
// collectEverything(new TVector<TLogoBlobID>(batch.begin(), batch.end()), nullptr);
100+
// collectEverything(nullptr, new TVector<TLogoBlobID>(batch.begin(), batch.end()));
101+
collectEverything(nullptr, nullptr);
102+
}
103+
104+
Ctest << "Wait for sync" << Endl;
105+
Env->Sim(TDuration::Minutes(30));
106+
107+
Ctest << "Enable nodes" << Endl;
108+
// Only nodes marked Dead are still stopped at this point; Restart nodes
// were already started in the shutdown loop above.
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
109+
if (nodeStates[nodeId - 1] == ENodeState::Dead) {
110+
Env->StartNode(nodeId);
111+
}
112+
}
113+
114+
Ctest << "Wait for sync" << Endl;
115+
Env->Sim(TDuration::Minutes(30));
116+
117+
++generation;
118+
Ctest << "Move soft barrier" << Endl;
119+
collectEverything(nullptr, nullptr);
120+
121+
Env->Sim(TDuration::Minutes(30));
122+
123+
// Env->Runtime->FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) {
124+
// Cerr << " --- " << ev->Sender << "->" << ev->Recipient << ev->ToString() << Endl;
125+
// return true;
126+
// };
127+
128+
auto status = GetGroupStatus(GroupId);
129+
Ctest << "Group status# " << status->ToString() << Endl;
130+
131+
Ctest << "Get group configuration" << Endl;
132+
133+
TIntrusivePtr<TBlobStorageGroupInfo> group = Env->GetGroupInfo(GroupId);
134+
135+
Ctest << "Check blobs" << Endl;
136+
// Visits order numbers 0 .. BlobSubgroupSize() - 1.
// NOTE(review): this covers every VDisk only if the group has exactly
// BlobSubgroupSize() disks - confirm.
for (ui32 orderNumber = 0; orderNumber < Erasure.BlobSubgroupSize(); ++orderNumber) {
137+
Ctest << "Check orderNumber# " << orderNumber << Endl;
138+
TVDiskID vdiskId = group->GetVDiskId(orderNumber);
139+
NKikimrBlobStorage::EVDiskQueueId queue = NKikimrBlobStorage::EVDiskQueueId::GetFastRead;
140+
Env->WithQueueId(vdiskId, queue, [&](TActorId queueId) {
141+
for (const TLogoBlobID& blob : blobs) {
142+
// NOTE(review): partIdx is not used in the loop body, so the same
// AddExtremeQuery(blob, 0, 0) request is sent BlobSubgroupSize() times
// per blob - confirm whether per-part queries were intended.
for (ui32 partIdx = 1; partIdx <= Erasure.BlobSubgroupSize(); ++partIdx) {
143+
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery(vdiskId, TInstant::Max(),
144+
NKikimrBlobStorage::EGetHandleClass::FastRead);
145+
ev->AddExtremeQuery(blob, 0, 0);
146+
Env->Runtime->Send(new IEventHandle(queueId, Edge, ev.release()), Edge.NodeId());
147+
auto res = Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(Edge, false, TInstant::Max());
148+
// NOTE(review): `auto` takes Record by value here, i.e. a copy per query.
auto record = res->Get()->Record;
149+
// The request itself must succeed, return exactly one result, and that
// result must be NODATA with no ingress field set.
UNIT_ASSERT_VALUES_EQUAL_C(record.GetStatus(), NKikimrProto::OK, record.GetErrorReason());
150+
UNIT_ASSERT_C(record.ResultSize() == 1, res->ToString());
151+
UNIT_ASSERT_C(record.GetResult(0).GetStatus() == NKikimrProto::NODATA, res->ToString());
152+
UNIT_ASSERT_C(!record.GetResult(0).HasIngress(), res->ToString());
153+
}
154+
}
155+
// NOTE(review): `from` and `to` are constructed but never used.
TLogoBlobID from(tabletId, 0, 0, channel, 0, 0, 1);
156+
TLogoBlobID to(tabletId, generation + 100, 9000, channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId);
157+
});
158+
}
159+
}
160+
};
161+
162+
// Returns BlobSubgroupSize() entries, all ENodeState::Alive.
std::vector<ENodeState> GetStatesAllAlive(TBlobStorageGroupType erasure) {
162+
return std::vector<ENodeState>(erasure.BlobSubgroupSize(), ENodeState::Alive);
163+
}
165+
166+
// Returns BlobSubgroupSize() entries: index 0 is Dead, the rest are Alive.
std::vector<ENodeState> GetStatesOneDead(TBlobStorageGroupType erasure) {
166+
std::vector<ENodeState> states(erasure.BlobSubgroupSize(), ENodeState::Alive);
167+
states[0] = ENodeState::Dead;
168+
return states;
169+
}
171+
172+
// Returns BlobSubgroupSize() entries: indices 0 and 4 are Dead, the rest are Alive.
// NOTE(review): index 4 is hard-coded, so this assumes BlobSubgroupSize() > 4;
// there is no bounds check.
std::vector<ENodeState> GetStatesTwoDead(TBlobStorageGroupType erasure) {
172+
std::vector<ENodeState> states(erasure.BlobSubgroupSize(), ENodeState::Alive);
173+
states[0] = ENodeState::Dead;
174+
states[4] = ENodeState::Dead;
175+
return states;
176+
}
178+
179+
// Returns BlobSubgroupSize() entries: index 0 is Dead, every other node is Restart.
std::vector<ENodeState> GetStatesOneDeadAllRestart(TBlobStorageGroupType erasure) {
180+
std::vector<ENodeState> states(erasure.BlobSubgroupSize(), ENodeState::Restart);
181+
states[0] = ENodeState::Dead;
182+
return states;
183+
}
184+
185+
// Builds a TTestCtx with one node per blob-subgroup slot and runs
// RunTest(1000, 3'000'000, nodeStates) on it.
// The controller node id is the 1-based index of the first entry in
// `nodeStates` that is not Dead; at least one such entry is required.
// NOTE: the body is currently disabled by the unconditional `return` below,
// so every test generated from this function passes without doing any work.
void Test(TBlobStorageGroupType erasure, std::vector<ENodeState> nodeStates) {
186+
return; // disabled: requires the PhantomFlagStorage implementation
187+
auto it = std::find_if(nodeStates.begin(), nodeStates.end(),
188+
[&](const ENodeState& state) { return state != ENodeState::Dead; } );
189+
Y_VERIFY(it != nodeStates.end());
190+
// Convert the 0-based vector position into a 1-based node id.
ui32 controllerNodeId = it - nodeStates.begin() + 1;
191+
TTestCtx ctx({
192+
.NodeCount = erasure.BlobSubgroupSize(),
193+
.Erasure = erasure,
194+
.ControllerNodeId = controllerNodeId,
195+
.PDiskChunkSize = 32_MB,
196+
});
197+
ctx.RunTest(1000, 3'000'000, nodeStates);
198+
}
199+
200+
201+
// Declares a unit test named Test<name><erasure> that calls Test() with
// TBlobStorageGroupType::Erasure<erasure> and the node states produced by
// GetStates<name>() for that erasure.
#define TEST_PHANTOM_BLOBS(name, erasure) \
202+
Y_UNIT_TEST(Test##name##erasure) { \
203+
auto e = TBlobStorageGroupType::Erasure##erasure; \
204+
Test(e, GetStates##name(e)); \
205+
}
206+
207+
// Test instantiations. Only OneDead and TwoDead with Mirror3dc are enabled;
// the remaining scenario/erasure combinations are commented out.
// TEST_PHANTOM_BLOBS(AllAlive, Mirror3dc);
208+
// TEST_PHANTOM_BLOBS(AllAlive, Mirror3of4);
209+
// TEST_PHANTOM_BLOBS(AllAlive, 4Plus2Block);
210+
211+
TEST_PHANTOM_BLOBS(OneDead, Mirror3dc);
212+
// TEST_PHANTOM_BLOBS(OneDead, Mirror3of4);
213+
// TEST_PHANTOM_BLOBS(OneDead, 4Plus2Block);
214+
215+
216+
TEST_PHANTOM_BLOBS(TwoDead, Mirror3dc);
217+
218+
// TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3dc);
219+
// TEST_PHANTOM_BLOBS(OneDeadAllRestart, Mirror3of4);
220+
// TEST_PHANTOM_BLOBS(OneDeadAllRestart, 4Plus2Block);
221+
222+
}

ydb/core/blobstorage/ut_blobstorage/replication.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ Y_UNIT_TEST_SUITE(ReplicationSpace) {
430430
.GroupId = groupId,
431431
.TotalSize = dataSize,
432432
.BlobSize = blobSize,
433-
.DelayBetweenPuts = TDuration::Seconds(1),
433+
.DelayBetweenBatches = TDuration::Seconds(1),
434434
.Erasure = erasure,
435435
.CookieStrategy = TTestCtxBase::TDataProfile::ECookieStrategy::WithSamePlacement,
436436
});

ydb/core/blobstorage/ut_blobstorage/ut_helpers.h

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,13 @@ struct TTestCtxBase {
318318

319319
public:
320320
ui32 GroupId;
321-
ui64 TotalSize;
321+
std::optional<ui64> TotalSize;
322+
std::optional<ui32> TotalBlobs;
322323
ui64 BlobSize;
324+
ui32 BatchSize = 1;
323325
EContentType ContentType = EContentType::Zeros;
324326

325-
TDuration DelayBetweenPuts = TDuration::Zero();
327+
TDuration DelayBetweenBatches = TDuration::Zero();
326328

327329
// must be specified when using ECookieStrategy::WithSamePlacement
328330
std::optional<TBlobStorageGroupType> Erasure = std::nullopt;
@@ -354,13 +356,23 @@ struct TTestCtxBase {
354356
};
355357

356358
std::vector<TLogoBlobID> WriteCompressedData(TDataProfile profile) {
359+
Y_VERIFY(profile.TotalSize || profile.TotalBlobs);
357360
std::vector<TLogoBlobID> blobs;
358361

359362
static ui64 cookie = 0;
363+
std::vector<TLogoBlobID> batch;
360364

361-
for (ui64 size = 0; size < profile.TotalSize; size += profile.BlobSize) {
365+
ui32 blobsToWrite;
366+
367+
if (profile.TotalBlobs) {
368+
blobsToWrite = *profile.TotalBlobs;
369+
} else {
370+
blobsToWrite = (*profile.TotalSize / profile.BlobSize) + !!(*profile.TotalSize % profile.BlobSize);
371+
}
372+
373+
for (ui64 i = 0; i < blobsToWrite; ++i) {
362374
cookie = profile.NextCookie(cookie);
363-
blobs.emplace_back(profile.TabletId, profile.Generation, profile.Step, profile.Channel,
375+
batch.emplace_back(profile.TabletId, profile.Generation, profile.Step, profile.Channel,
364376
profile.BlobSize, cookie);
365377

366378
Env->Runtime->WrapInActorContext(Edge, [&] {
@@ -377,17 +389,24 @@ struct TTestCtxBase {
377389
Y_FAIL();
378390
}
379391

380-
SendToBSProxy(Edge, profile.GroupId, new TEvBlobStorage::TEvPut(blobs.back(), data, TInstant::Max()),
392+
SendToBSProxy(Edge, profile.GroupId, new TEvBlobStorage::TEvPut(batch.back(), data, TInstant::Max()),
381393
NKikimrBlobStorage::TabletLog);
382394
});
383395

384-
auto res = Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
385-
Edge, false, TInstant::Max());
386-
// Cerr << "Write data " << size << " " << res->Get()->ToString()<< Endl;
387-
UNIT_ASSERT(res->Get()->Status == NKikimrProto::OK);
388396

389-
if (profile.DelayBetweenPuts != TDuration::Zero()) {
390-
Env->Sim(profile.DelayBetweenPuts);
397+
if (batch.size() == profile.BatchSize || i == blobsToWrite - 1) {
398+
for (ui32 i = 0; i < batch.size(); ++i) {
399+
auto res = Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
400+
Edge, false, TInstant::Max());
401+
UNIT_ASSERT(res->Get()->Status == NKikimrProto::OK);
402+
}
403+
404+
if (profile.DelayBetweenBatches != TDuration::Zero()) {
405+
Env->Sim(profile.DelayBetweenBatches);
406+
}
407+
408+
blobs.insert(blobs.end(), batch.begin(), batch.end());
409+
batch.clear();
391410
}
392411
}
393412
return blobs;

ydb/core/blobstorage/ut_blobstorage/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ SRCS(
3939
monitoring.cpp
4040
multiget.cpp
4141
patch.cpp
42+
phantom_blobs.cpp
4243
recovery.cpp
4344
sanitize_groups.cpp
4445
scrub_fast.cpp

0 commit comments

Comments
 (0)