Skip to content

Commit 1727b44

Browse files
authored
Add DS Proxy deadline tests (#11749)
1 parent e34944a commit 1727b44

File tree

4 files changed

+332
-128
lines changed

4 files changed

+332
-128
lines changed

ydb/core/blobstorage/ut_blobstorage/acceleration.cpp

Lines changed: 2 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/ut_blobstorage/lib/vdisk_delay_emulator.h>
23
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
34
#include <ydb/core/blobstorage/dsproxy/group_sessions.h>
45

@@ -10,133 +11,6 @@
1011
#define Ctest Cnull
1112

1213
Y_UNIT_TEST_SUITE(Acceleration) {
13-
using TFlowRecord = TIntrusivePtr<NBackpressure::TFlowRecord>;
14-
using TQueueId = NKikimrBlobStorage::EVDiskQueueId;
15-
16-
struct TDiskDelay {
17-
TWeightedRandom<TDuration> Delays;
18-
TDuration Max;
19-
TString Tag;
20-
21-
TDiskDelay(TDuration delay = TDuration::Zero(), TString tag = "")
22-
: Max(delay)
23-
, Tag(tag)
24-
{
25-
Delays.AddValue(delay, 1);
26-
}
27-
28-
TDiskDelay(TDuration min, ui64 minWeight, TDuration max, ui64 maxWeight, TString tag = "")
29-
: Max(max)
30-
, Tag(tag)
31-
{
32-
Delays.AddValue(min, minWeight);
33-
Delays.AddValue(max, maxWeight);
34-
}
35-
36-
TDiskDelay(const TDiskDelay&) = default;
37-
TDiskDelay(TDiskDelay&&) = default;
38-
TDiskDelay& operator=(const TDiskDelay&) = default;
39-
TDiskDelay& operator=(TDiskDelay&&) = default;
40-
41-
TDuration GetRandom() {
42-
return Delays.GetRandom();
43-
}
44-
};
45-
46-
struct TEvDelayedMessageWrapper : public TEventLocal<TEvDelayedMessageWrapper, TEvBlobStorage::EvDelayedMessageWrapper> {
47-
public:
48-
std::unique_ptr<IEventHandle> Event;
49-
50-
TEvDelayedMessageWrapper(std::unique_ptr<IEventHandle>& ev)
51-
: Event(ev.release())
52-
{}
53-
};
54-
55-
struct TVDiskDelayEmulator {
56-
TVDiskDelayEmulator(const std::shared_ptr<TEnvironmentSetup>& env)
57-
: Env(env)
58-
{}
59-
60-
using TFlowKey = std::pair<ui32, TQueueId>; // { nodeId, queueId }
61-
62-
std::shared_ptr<TEnvironmentSetup> Env;
63-
TActorId Edge;
64-
// assuming there is only one disk per node
65-
std::unordered_map<TFlowKey, TFlowRecord> FlowRecords;
66-
67-
std::unordered_map<ui32, TDiskDelay> DelayByNode;
68-
std::deque<TDiskDelay> DelayByResponseOrder;
69-
TDiskDelay DefaultDelay = TDuration::Seconds(1);
70-
bool LogUnwrap = false;
71-
72-
using TEventHandler = std::function<bool(std::unique_ptr<IEventHandle>&)>;
73-
74-
std::unordered_map<ui32, TEventHandler> EventHandlers;
75-
76-
void AddHandler(ui32 eventType, TEventHandler handler) {
77-
EventHandlers[eventType] = handler;
78-
}
79-
80-
bool Filter(ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) {
81-
if (ev->GetTypeRewrite() == TEvDelayedMessageWrapper::EventType) {
82-
std::unique_ptr<IEventHandle> delayedMsg(std::move(ev));
83-
ev.reset(delayedMsg->Get<TEvDelayedMessageWrapper>()->Event.release());
84-
if (LogUnwrap) {
85-
Ctest << TAppData::TimeProvider->Now() << " Unwrap " << ev->ToString() << Endl;
86-
}
87-
return true;
88-
}
89-
90-
ui32 type = ev->GetTypeRewrite();
91-
auto it = EventHandlers.find(type);
92-
if (it != EventHandlers.end() && it->second) {
93-
return (it->second)(ev);
94-
}
95-
return true;
96-
}
97-
98-
TDuration GetMsgDelay(ui32 vdiskNodeId) {
99-
TDiskDelay& delay = DefaultDelay;
100-
auto it = DelayByNode.find(vdiskNodeId);
101-
if (it == DelayByNode.end()) {
102-
if (!DelayByResponseOrder.empty()) {
103-
delay = DelayByResponseOrder.front();
104-
DelayByResponseOrder.pop_front();
105-
}
106-
DelayByNode[vdiskNodeId] = delay;
107-
} else {
108-
delay = it->second;
109-
}
110-
TDuration rand = delay.GetRandom();
111-
return rand;
112-
}
113-
114-
TDuration DelayMsg(std::unique_ptr<IEventHandle>& ev) {
115-
TDuration delay = GetMsgDelay(ev->Sender.NodeId());
116-
117-
Env->Runtime->WrapInActorContext(Edge, [&] {
118-
TActivationContext::Schedule(delay, new IEventHandle(
119-
ev->Sender,
120-
ev->Recipient,
121-
new TEvDelayedMessageWrapper(ev))
122-
);
123-
});
124-
return delay;
125-
}
126-
127-
void SetDelayByResponseOrder(const std::deque<TDiskDelay>& delays) {
128-
DelayByResponseOrder = delays;
129-
DelayByNode = {};
130-
}
131-
};
132-
133-
struct TDelayer {
134-
std::shared_ptr<TVDiskDelayEmulator> VDiskDelayEmulator;
135-
136-
bool operator()(ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
137-
return VDiskDelayEmulator->Filter(nodeId, ev);
138-
}
139-
};
14014

14115
struct TestCtx {
14216
TestCtx(const TBlobStorageGroupType& erasure, float slowDiskThreshold, float delayMultiplier,
@@ -196,7 +70,7 @@ Y_UNIT_TEST_SUITE(Acceleration) {
19670
});
19771
auto res = Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, TInstant::Max());
19872

199-
Env->Runtime->FilterFunction = TDelayer{ .VDiskDelayEmulator = VDiskDelayEmulator };
73+
Env->Runtime->FilterFunction = TDelayFilterFunctor{ .VDiskDelayEmulator = VDiskDelayEmulator };
20074

20175
for (const TQueueId& queueId : { TQueueId::PutTabletLog, TQueueId::GetFastRead, TQueueId::PutAsyncBlob,
20276
TQueueId::GetAsyncRead }) {
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/ut_blobstorage/lib/vdisk_delay_emulator.h>
3+
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
4+
5+
#include <util/stream/null.h>
6+
7+
#include "ut_helpers.h"
8+
9+
#define Ctest Cnull
10+
11+
Y_UNIT_TEST_SUITE(Deadlines) {
12+
13+
struct TestCtx {
14+
TestCtx(const TBlobStorageGroupType& erasure, TDuration vdiskDelay)
15+
: NodeCount(erasure.BlobSubgroupSize() + 1)
16+
, Erasure(erasure)
17+
, Env(new TEnvironmentSetup({
18+
.NodeCount = NodeCount,
19+
.Erasure = erasure,
20+
.LocationGenerator = [this](ui32 nodeId) { return LocationGenerator(nodeId); },
21+
}))
22+
, VDiskDelay(vdiskDelay)
23+
, VDiskDelayEmulator(new TVDiskDelayEmulator(Env))
24+
{}
25+
26+
TNodeLocation LocationGenerator(ui32 nodeId) {
27+
if (Erasure.BlobSubgroupSize() == 9) {
28+
if (nodeId == NodeCount) {
29+
return TNodeLocation{"4", "1", "1", "1"};
30+
}
31+
return TNodeLocation{
32+
std::to_string((nodeId - 1) / 3),
33+
"1",
34+
std::to_string((nodeId - 1) % 3),
35+
"0"
36+
};
37+
} else {
38+
if (nodeId == NodeCount) {
39+
return TNodeLocation{"2", "1", "1", "1"};
40+
}
41+
return TNodeLocation{"1", "1", std::to_string(nodeId), "0"};
42+
}
43+
}
44+
45+
void Initialize() {
46+
Env->CreateBoxAndPool(1, 1);
47+
Env->Sim(TDuration::Minutes(1));
48+
49+
NKikimrBlobStorage::TBaseConfig base = Env->FetchBaseConfig();
50+
UNIT_ASSERT_VALUES_EQUAL(base.GroupSize(), 1);
51+
const auto& group = base.GetGroup(0);
52+
GroupId = group.GetGroupId();
53+
54+
Edge = Env->Runtime->AllocateEdgeActor(NodeCount);
55+
VDiskDelayEmulator->Edge = Edge;
56+
57+
std::unordered_map<ui32, ui32> OrderNumberToNodeId;
58+
59+
for (ui32 orderNum = 0; orderNum < group.VSlotIdSize(); ++orderNum) {
60+
OrderNumberToNodeId[orderNum] = group.GetVSlotId(orderNum).GetNodeId();
61+
}
62+
63+
Env->Runtime->WrapInActorContext(Edge, [&] {
64+
SendToBSProxy(Edge, GroupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
65+
});
66+
auto res = Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, TInstant::Max());
67+
VDiskDelayEmulator->DefaultDelay = VDiskDelay;
68+
69+
Env->Runtime->FilterFunction = TDelayFilterFunctor{ .VDiskDelayEmulator = VDiskDelayEmulator };
70+
71+
}
72+
73+
~TestCtx() {
74+
Env->Runtime->FilterFunction = {};
75+
}
76+
77+
ui32 NodeCount;
78+
TBlobStorageGroupType Erasure;
79+
std::shared_ptr<TEnvironmentSetup> Env;
80+
81+
ui32 GroupId;
82+
TActorId Edge;
83+
TDuration VDiskDelay;
84+
std::shared_ptr<TVDiskDelayEmulator> VDiskDelayEmulator;
85+
};
86+
87+
void TestPut(const TBlobStorageGroupType& erasure, TDuration delay, TDuration timeout) {
88+
Y_ABORT_UNLESS(timeout < delay);
89+
90+
TestCtx ctx(erasure, delay);
91+
ctx.Initialize();
92+
93+
TInstant now = TAppData::TimeProvider->Now();
94+
95+
ctx.VDiskDelayEmulator->LogUnwrap = true;
96+
ctx.VDiskDelayEmulator->AddHandler(TEvBlobStorage::TEvVPutResult::EventType, [&](std::unique_ptr<IEventHandle>& ev) {
97+
ui32 nodeId = ev->Sender.NodeId();
98+
if (nodeId < ctx.NodeCount) {
99+
ctx.VDiskDelayEmulator->DelayMsg(ev);
100+
return false;
101+
}
102+
return true;
103+
});
104+
105+
ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] {
106+
TString data = MakeData(1000);
107+
TLogoBlobID blobId(1, 1, 1, 1, data.size(), 123);
108+
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, now + timeout);
109+
SendToBSProxy(ctx.Edge, ctx.GroupId, ev, NKikimrBlobStorage::TabletLog);
110+
});
111+
112+
TInstant t1 = now + timeout / 2; // Still waiting for vdisks
113+
TInstant t2 = now + (timeout + delay) / 2; // Should hit deadline already
114+
115+
{
116+
auto res = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
117+
ctx.Edge, false, t1);
118+
UNIT_ASSERT_C(!res, "Premature response, now# " << TAppData::TimeProvider->Now());
119+
}
120+
121+
{
122+
auto res = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
123+
ctx.Edge, false, t2);
124+
UNIT_ASSERT(res);
125+
UNIT_ASSERT(res->Get()->Status == NKikimrProto::DEADLINE);
126+
}
127+
}
128+
129+
void TestGet(const TBlobStorageGroupType& erasure, TDuration delay, TDuration timeout) {
130+
Y_ABORT_UNLESS(timeout < delay);
131+
132+
TestCtx ctx(erasure, delay);
133+
ctx.Initialize();
134+
135+
TInstant now = TAppData::TimeProvider->Now();
136+
137+
ctx.VDiskDelayEmulator->LogUnwrap = true;
138+
ctx.VDiskDelayEmulator->AddHandler(TEvBlobStorage::TEvVGetResult::EventType, [&](std::unique_ptr<IEventHandle>& ev) {
139+
ui32 nodeId = ev->Sender.NodeId();
140+
if (nodeId < ctx.NodeCount) {
141+
ctx.VDiskDelayEmulator->DelayMsg(ev);
142+
return false;
143+
}
144+
return true;
145+
});
146+
147+
{
148+
TString data = MakeData(1000);
149+
TLogoBlobID blobId(1, 1, 1, 1, data.size(), 123);
150+
ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] {
151+
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, now + timeout);
152+
SendToBSProxy(ctx.Edge, ctx.GroupId, ev, NKikimrBlobStorage::TabletLog);
153+
});
154+
auto putRes = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(ctx.Edge, false, TInstant::Max());
155+
UNIT_ASSERT_VALUES_EQUAL(putRes->Get()->Status, NKikimrProto::OK);
156+
157+
ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] {
158+
SendToBSProxy(ctx.Edge, ctx.GroupId, new TEvBlobStorage::TEvGet(blobId, 0, data.size(), now + timeout,
159+
NKikimrBlobStorage::FastRead));
160+
});
161+
}
162+
163+
TInstant t1 = now + timeout / 2; // Still waiting for vdisks
164+
TInstant t2 = now + (timeout + delay) / 2; // Should hit deadline already
165+
// TInstant t2 = now + delay + TDuration::Seconds(1);
166+
167+
{
168+
auto res = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(
169+
ctx.Edge, false, t1);
170+
UNIT_ASSERT_C(!res, "Premature response, now# " << TAppData::TimeProvider->Now());
171+
}
172+
173+
{
174+
auto res = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(
175+
ctx.Edge, false, t2);
176+
UNIT_ASSERT(res);
177+
UNIT_ASSERT(res->Get()->Status == NKikimrProto::DEADLINE);
178+
}
179+
}
180+
181+
#define TEST_DEADLINE(method, erasure) \
182+
Y_UNIT_TEST(Test##method##erasure) { \
183+
Test##method(TBlobStorageGroupType::Erasure##erasure, \
184+
TDuration::Seconds(50), TDuration::Seconds(40)); \
185+
}
186+
187+
TEST_DEADLINE(Put, Mirror3dc);
188+
TEST_DEADLINE(Put, 4Plus2Block);
189+
TEST_DEADLINE(Put, Mirror3of4);
190+
191+
TEST_DEADLINE(Get, Mirror3dc);
192+
TEST_DEADLINE(Get, 4Plus2Block);
193+
TEST_DEADLINE(Get, Mirror3of4);
194+
195+
#undef TEST_DEADLINE
196+
}

0 commit comments

Comments
 (0)