Skip to content

Commit 39f6824

Browse files
authored
add pile drain (#20109)
1 parent 272ffc2 commit 39f6824

File tree

12 files changed

+230
-60
lines changed

12 files changed

+230
-60
lines changed

ydb/core/mind/hive/bridge_pile_info.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ struct TBridgePileInfo {
77
NKikimrBridge::TClusterState::EPileState State = NKikimrBridge::TClusterState::DISCONNECTED;
88
bool IsPrimary = false;
99
bool IsPromoted = false;
10+
bool Drain = false;
11+
TVector<TActorId> DrainInitiators; // Currently always empty
1012

1113
std::unordered_set<TNodeId> Nodes;
1214

@@ -26,5 +28,9 @@ struct TBridgePileInfo {
2628
&& IsPrimary == other.IsPrimary
2729
&& IsPromoted == other.IsPromoted;
2830
}
31+
32+
ui32 GetId() const {
33+
return Id.GetRawId();
34+
}
2935
};
3036
} // namespace NKikimr::NHive

ydb/core/mind/hive/drain.cpp

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <ydb/library/actors/core/actor_bootstrapped.h>
2+
#include "drain.h"
23
#include "hive_impl.h"
34
#include "hive_log.h"
45
#include "node_info.h"
@@ -14,8 +15,8 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
1415
TVector<TFullTabletId>::iterator NextKick;
1516
ui32 KickInFlight;
1617
ui32 Movements;
17-
TNodeId NodeId;
18-
bool DownBefore = false;
18+
TDrainTarget Target;
19+
std::unordered_map<TNodeId, bool> DownBefore;
1920
TActorId DomainHivePipeClient;
2021
TTabletId DomainHiveId = 0;
2122
ui32 DomainMovements = 0;
@@ -31,7 +32,9 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
3132
NTabletPipe::CloseClient(SelfId(), DomainHivePipeClient);
3233
}
3334
Hive->RemoveSubActor(this);
34-
Hive->BalancerNodes.erase(NodeId);
35+
for (TGetNodes getNodes{Hive}; auto nodeId : std::visit(getNodes, Target)) {
36+
Hive->BalancerNodes.erase(nodeId);
37+
}
3538
return IActor::PassAway();
3639
}
3740

@@ -40,7 +43,7 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
4043
}
4144

4245
TString GetDescription() const override {
43-
return TStringBuilder() << "Drain(" << NodeId << ")";
46+
return TStringBuilder() << "Drain(" << Target << ")";
4447
}
4548

4649
TSubActorId GetId() const override {
@@ -49,13 +52,15 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
4952

5053
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
5154
BLOG_I("Drain " << SelfId() << " finished with " << Movements << " movements made");
52-
TNodeInfo* nodeInfo = Hive->FindNode(NodeId);
53-
if (nodeInfo != nullptr) {
54-
if (!DownBefore) {
55-
nodeInfo->SetDown(false);
55+
for (auto [nodeId, downBefore] : DownBefore) {
56+
TNodeInfo* nodeInfo = Hive->FindNode(nodeId);
57+
if (nodeInfo != nullptr) {
58+
if (!downBefore) {
59+
nodeInfo->SetDown(false);
60+
}
5661
}
5762
}
58-
Hive->Execute(Hive->CreateSwitchDrainOff(NodeId, std::move(Settings), status, Movements + DomainMovements));
63+
Hive->Execute(Hive->CreateSwitchDrainOff(Target, std::move(Settings), status, Movements + DomainMovements));
5964
PassAway();
6065
}
6166

@@ -68,7 +73,7 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
6873
while (CanKickNextTablet()) {
6974
TFullTabletId tabletId = *NextKick;
7075
TTabletInfo* tablet = Hive->FindTablet(tabletId);
71-
if (tablet != nullptr && tablet->IsAlive() && tablet->NodeId == NodeId) {
76+
if (tablet != nullptr && tablet->IsAlive() && std::visit(TGetNodes(Hive), Target).contains(tablet->NodeId)) {
7277
THive::TBestNodeResult result = Hive->FindBestNode(*tablet);
7378
if (std::holds_alternative<TNodeInfo*>(result)) {
7479
TNodeInfo* node = std::get<TNodeInfo*>(result);
@@ -118,7 +123,7 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
118123

119124
void Handle(TEvHive::TEvDrainNodeResult::TPtr& ev) {
120125
BLOG_D("Drain " << SelfId() << " received status from domain hive " << ev->Get()->Record.ShortDebugString());
121-
BLOG_I("Drain " << SelfId() << " continued for node " << NodeId << " with " << Tablets.size() << " tablets");
126+
BLOG_I("Drain " << SelfId() << " continued for " << Target << " with " << Tablets.size() << " tablets");
122127
DomainDrainCompleted(ev->Get()->Record.GetMovements());
123128
}
124129

@@ -135,25 +140,26 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
135140
if (DomainHivePipeClient) {
136141
NTabletPipe::CloseClient(SelfId(), DomainHivePipeClient);
137142
}
138-
RequestDrainFromDomainHive();
143+
Y_ABORT_UNLESS(std::holds_alternative<TNodeId>(Target));
144+
RequestDrainFromDomainHive(std::get<TNodeId>(Target));
139145
}
140146
}
141147

142148
void Timeout() {
143149
ReplyAndDie(NKikimrProto::TIMEOUT);
144150
}
145151

146-
void RequestDrainFromDomainHive() {
152+
void RequestDrainFromDomainHive(TNodeId nodeId) {
147153
NTabletPipe::TClientConfig pipeConfig;
148154
pipeConfig.RetryPolicy = {.RetryLimitCount = 13};
149155
DomainHivePipeClient = Register(NTabletPipe::CreateClient(SelfId(), DomainHiveId, pipeConfig));
150-
THolder<TEvHive::TEvDrainNode> event = MakeHolder<TEvHive::TEvDrainNode>(NodeId);
156+
THolder<TEvHive::TEvDrainNode> event = MakeHolder<TEvHive::TEvDrainNode>(nodeId);
151157
event->Record.SetDownPolicy(Settings.DownPolicy);
152158
event->Record.SetPersist(Settings.Persist);
153159
event->Record.SetDrainInFlight(Settings.DrainInFlight);
154160
event->Record.SetSeqNo(SeqNo);
155161
NTabletPipe::SendData(SelfId(), DomainHivePipeClient, event.Release());
156-
BLOG_I("Drain " << SelfId() << " forwarded for node " << NodeId << " to hive " << DomainHiveId);
162+
BLOG_I("Drain " << SelfId() << " forwarded for node " << nodeId << " to hive " << DomainHiveId);
157163
}
158164

159165

@@ -162,53 +168,56 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
162168
return NKikimrServices::TActivity::HIVE_BALANCER_ACTOR;
163169
}
164170

165-
THiveDrain(THive* hive, TNodeId nodeId, TDrainSettings settings)
171+
THiveDrain(THive* hive, TDrainTarget target, TDrainSettings settings)
166172
: Hive(hive)
167173
, NextKick(Tablets.end())
168174
, KickInFlight(0)
169175
, Movements(0)
170-
, NodeId(nodeId)
176+
, Target(target)
171177
, Settings(std::move(settings))
172178
{}
173179

174180
void Bootstrap() {
175-
TNodeInfo* nodeInfo = Hive->FindNode(NodeId);
176-
if (nodeInfo != nullptr) {
177-
{
178-
Tablets.reserve(nodeInfo->GetTabletsRunning());
181+
for (TGetNodes getNodes{Hive}; auto nodeId : std::visit(getNodes, Target)) {
182+
TNodeInfo* nodeInfo = Hive->FindNode(nodeId);
183+
if (nodeInfo != nullptr) {
184+
Tablets.reserve(Tablets.size() + nodeInfo->GetTabletsRunning());
179185
for (const auto& [object, tablets] : nodeInfo->TabletsOfObject) {
180186
for (TTabletInfo* tabletInfo : tablets) {
181-
if (tabletInfo->GetVolatileState() == TTabletInfo::EVolatileState::TABLET_VOLATILE_STATE_RUNNING) {
187+
if (tabletInfo->CanBeAlive()) {
182188
Tablets.push_back(tabletInfo->GetFullTabletId());
183189
}
184190
}
185191
}
186-
}
187-
NextKick = Tablets.begin();
188-
DownBefore = nodeInfo->Down;
189-
if (!DownBefore) {
192+
DownBefore[nodeId] = nodeInfo->Down;
190193
nodeInfo->SetDown(true);
194+
} else {
195+
return ReplyAndDie(NKikimrProto::ERROR);
191196
}
197+
}
198+
NextKick = Tablets.begin();
199+
if (Settings.Forward) {
200+
Y_ABORT_UNLESS(std::holds_alternative<TNodeId>(Target));
201+
TNodeId nodeId = std::get<TNodeId>(Target);
202+
TNodeInfo* nodeInfo = Hive->FindNode(nodeId);
192203
SeqNo = nodeInfo->DrainSeqNo;
193204

194205
if (nodeInfo->ServicedDomains.size() == 1) {
195206
TDomainInfo* domainInfo = Hive->FindDomain(nodeInfo->ServicedDomains.front());
196207
if (domainInfo != nullptr) {
197208
if (domainInfo->HiveId != 0 && domainInfo->HiveId != Hive->TabletID()) {
198209
DomainHiveId = domainInfo->HiveId;
199-
RequestDrainFromDomainHive();
210+
RequestDrainFromDomainHive(nodeId);
200211
Become(&THiveDrain::StateWork, TDuration::MilliSeconds(TIMEOUT), new TEvents::TEvWakeup());
201212
return;
202213
}
203214
}
204215
}
205-
206-
Become(&THiveDrain::StateWork, TDuration::MilliSeconds(TIMEOUT), new TEvents::TEvWakeup());
207-
BLOG_I("Drain " << SelfId() << " started for node " << NodeId << " with " << Tablets.size() << " tablets");
208-
KickNextTablet();
209-
} else {
210-
ReplyAndDie(NKikimrProto::ERROR);
211216
}
217+
218+
Become(&THiveDrain::StateWork, TDuration::MilliSeconds(TIMEOUT), new TEvents::TEvWakeup());
219+
BLOG_I("Drain " << SelfId() << " started for " << Target << " with " << Tablets.size() << " tablets");
220+
KickNextTablet();
212221
}
213222

214223
STATEFN(StateWork) {
@@ -224,13 +233,19 @@ class THiveDrain : public NActors::TActorBootstrapped<THiveDrain>, public ISubAc
224233
}
225234
};
226235

227-
void THive::StartHiveDrain(TNodeId nodeId, TDrainSettings settings) {
228-
if (BalancerNodes.emplace(nodeId).second) {
229-
auto* balancer = new THiveDrain(this, nodeId, std::move(settings));
236+
void THive::StartHiveDrain(TDrainTarget target, TDrainSettings settings) {
237+
bool shouldStart = false;
238+
for (TGetNodes getNodes{this}; auto nodeId : std::visit(getNodes, target)) {
239+
if (BalancerNodes.emplace(nodeId).second) {
240+
shouldStart = true;
241+
}
242+
}
243+
if (shouldStart) {
244+
auto* balancer = new THiveDrain(this, target, std::move(settings));
230245
SubActors.emplace_back(balancer);
231246
RegisterWithSameMailbox(balancer);
232247
} else {
233-
BLOG_W("It's not possible to start drain on node " << nodeId << ", the node is already busy");
248+
BLOG_W("It's not possible to start drain on " << target << ", it is already busy");
234249
}
235250
}
236251

ydb/core/mind/hive/drain.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
3+
#include <util/generic/overloaded.h>
4+
5+
#include "hive_impl.h"
6+
7+
namespace NKikimr::NHive {
8+
9+
struct TGetNodes {
10+
private:
11+
THive* const Hive;
12+
std::unordered_set<TNodeId> Node;
13+
14+
public:
15+
explicit TGetNodes(THive* Hive)
16+
: Hive(Hive)
17+
{}
18+
19+
const std::unordered_set<TNodeId>& operator()(TNodeId nodeId) {
20+
Node = {nodeId};
21+
return Node;
22+
}
23+
24+
const std::unordered_set<TNodeId>& operator()(TBridgePileId pileId) {
25+
return Hive->GetPile(pileId).Nodes;
26+
}
27+
};
28+
29+
} // namespace NKikimr::NHive
30+
31+
Y_DECLARE_OUT_SPEC(inline, NKikimr::NHive::TDrainTarget, o, drainTarget) {
32+
std::visit(TOverloaded{
33+
[&](NKikimr::NHive::TNodeId nodeId) { o << "node " << nodeId; },
34+
[&](NKikimr::TBridgePileId pileId) { o << "pile " << pileId; }
35+
}, drainTarget);
36+
}

ydb/core/mind/hive/hive.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,13 @@ struct THiveSharedSettings {
286286
}
287287
};
288288

289+
using TDrainTarget = std::variant<TNodeId, TBridgePileId>;
290+
289291
struct TDrainSettings {
290292
bool Persist = true;
291293
NKikimrHive::EDrainDownPolicy DownPolicy = NKikimrHive::EDrainDownPolicy::DRAIN_POLICY_KEEP_DOWN_UNTIL_RESTART;
292294
ui32 DrainInFlight = 0;
295+
bool Forward = true;
293296
};
294297

295298
struct TBalancerSettings {

ydb/core/mind/hive/hive_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
253253
bool IsSafeOperation(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
254254
bool IsItPossibleToStartBalancer(EBalancerType balancerType);
255255
void StartHiveBalancer(TBalancerSettings&& settings);
256-
void StartHiveDrain(TNodeId nodeId, TDrainSettings settings);
256+
void StartHiveDrain(TDrainTarget target, TDrainSettings settings);
257257
void StartHiveFill(TNodeId nodeId, const TActorId& initiator);
258258
void StartHiveStorageBalancer(TStorageBalancerSettings settings);
259259
void CreateEvMonitoring(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
@@ -304,7 +304,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
304304
ITransaction* CreateReleaseTabletsReply(TEvHive::TEvReleaseTabletsReply::TPtr event);
305305
ITransaction* CreateConfigureSubdomain(TEvHive::TEvConfigureHive::TPtr event);
306306
ITransaction* CreateSwitchDrainOn(TNodeId nodeId, TDrainSettings settings, const TActorId& initiator, ui64 seqNo = 0);
307-
ITransaction* CreateSwitchDrainOff(TNodeId nodeId, TDrainSettings settings, NKikimrProto::EReplyStatus status, ui32 movements);
307+
ITransaction* CreateSwitchDrainOff(TDrainTarget target, TDrainSettings settings, NKikimrProto::EReplyStatus status, ui32 movements);
308308
ITransaction* CreateTabletOwnersReply(TEvHive::TEvTabletOwnersReply::TPtr event);
309309
ITransaction* CreateRequestTabletOwners(TEvHive::TEvRequestTabletOwners::TPtr event);
310310
ITransaction* CreateUpdateTabletsObject(TEvHive::TEvUpdateTabletsObject::TPtr event);

ydb/core/mind/hive/hive_schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,9 +326,10 @@ struct Schema : NIceDb::Schema {
326326
struct State : Column<2, NScheme::NTypeIds::Uint32> { using Type = NKikimrBridge::TClusterState::EPileState; };
327327
struct IsPrimary : Column<3, NScheme::NTypeIds::Bool> {};
328328
struct IsPromoted : Column<4, NScheme::NTypeIds::Bool> {};
329+
struct Drain : Column<5, NScheme::NTypeIds::Bool> { static constexpr bool Default = false; };
329330

330331
using TKey = TableKey<Id>;
331-
using TColumns = TableColumns<Id, State, IsPrimary, IsPromoted>;
332+
using TColumns = TableColumns<Id, State, IsPrimary, IsPromoted, Drain>;
332333
};
333334

334335
using TTables = SchemaTables<

ydb/core/mind/hive/hive_ut.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7949,6 +7949,78 @@ Y_UNIT_TEST_SUITE(THiveTest) {
79497949
TestBridgeDisconnect(runtime, bridge, NUM_TABLETS, activeZone);
79507950
});
79517951
}
7952+
7953+
Y_UNIT_TEST(TestBridgeDemotion) {
7954+
static constexpr ui32 NUM_NODES = 4;
7955+
static constexpr ui32 NUM_TABLETS = 4;
7956+
TTestBasicRuntime runtime(NUM_NODES, false);
7957+
TDummyBridge bridge(runtime);
7958+
Setup(runtime, true, 2, [](TAppPrepare& app) {
7959+
app.HiveConfig.SetDrainInflight(2);
7960+
});
7961+
const ui64 hiveTablet = MakeDefaultHiveID();
7962+
const ui64 testerTablet = MakeTabletID(false, 1);
7963+
const TActorId senderA = runtime.AllocateEdgeActor(0);
7964+
const int nodeBase = runtime.GetNodeId(0);
7965+
const ui32 numNodes = runtime.GetNodeCount();
7966+
CreateTestBootstrapper(runtime, CreateTestTabletInfo(hiveTablet, TTabletTypes::Hive), &CreateDefaultHive);
7967+
MakeSureTabletIsUp(runtime, hiveTablet, 0);
7968+
bridge.Subscribe(GetHiveActor(runtime, hiveTablet));
7969+
TTabletTypes::EType tabletType = TTabletTypes::Dummy;
7970+
7971+
std::unordered_set<ui64> tablets;
7972+
for (ui32 i = 0; i < NUM_TABLETS; ++i) {
7973+
THolder<TEvHive::TEvCreateTablet> ev(new TEvHive::TEvCreateTablet(testerTablet, 100500 + i, tabletType, BINDED_CHANNELS));
7974+
ui64 tabletId = SendCreateTestTablet(runtime, hiveTablet, testerTablet, std::move(ev), 0, true);
7975+
tablets.insert(tabletId);
7976+
MakeSureTabletIsUp(runtime, tabletId, 0);
7977+
}
7978+
7979+
using TDistribution = std::vector<std::vector<ui64>>;
7980+
auto getDistribution = [hiveTablet, nodeBase, senderA, numNodes, &runtime]() -> TDistribution {
7981+
TDistribution nodeTablets(numNodes);
7982+
{
7983+
runtime.SendToPipe(hiveTablet, senderA, new TEvHive::TEvRequestHiveInfo());
7984+
TAutoPtr<IEventHandle> handle;
7985+
TEvHive::TEvResponseHiveInfo* response = runtime.GrabEdgeEventRethrow<TEvHive::TEvResponseHiveInfo>(handle);
7986+
for (const NKikimrHive::TTabletInfo& tablet : response->Record.GetTablets()) {
7987+
if (tablet.GetNodeID() == 0) {
7988+
continue;
7989+
}
7990+
UNIT_ASSERT_C(((int)tablet.GetNodeID() - nodeBase >= 0) && (tablet.GetNodeID() - nodeBase < numNodes),
7991+
"nodeId# " << tablet.GetNodeID() << " nodeBase# " << nodeBase);
7992+
nodeTablets[tablet.GetNodeID() - nodeBase].push_back(tablet.GetTabletID());
7993+
}
7994+
}
7995+
return nodeTablets;
7996+
};
7997+
7998+
auto tabletsInPile = [](const TDistribution& distribution, ui32 pile) -> size_t {
7999+
size_t sum = 0;
8000+
for (ui32 i = pile; i < distribution.size(); i += 2) {
8001+
sum += distribution[i].size();
8002+
}
8003+
return sum;
8004+
};
8005+
8006+
UNIT_ASSERT_VALUES_EQUAL(tabletsInPile(getDistribution(), 0), NUM_TABLETS);
8007+
8008+
TBlockEvents<TEvLocal::TEvBootTablet> blockBoot(runtime);
8009+
8010+
bridge.Promote(1);
8011+
8012+
while (blockBoot.empty()) {
8013+
runtime.DispatchEvents({}, TDuration::MilliSeconds(20));
8014+
}
8015+
8016+
// make sure tablets are moved with correct in-flight
8017+
UNIT_ASSERT_VALUES_EQUAL(tabletsInPile(getDistribution(), 0), NUM_TABLETS - 2);
8018+
8019+
blockBoot.Stop().Unblock();
8020+
runtime.DispatchEvents({}, TDuration::MilliSeconds(100));
8021+
8022+
UNIT_ASSERT_VALUES_EQUAL(tabletsInPile(getDistribution(), 0), 0);
8023+
}
79528024
}
79538025

79548026
Y_UNIT_TEST_SUITE(THeavyPerfTest) {

ydb/core/mind/hive/node_info.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ struct TNodeInfo {
164164
return VolatileState == EVolatileState::Connecting || VolatileState == EVolatileState::Connected;
165165
}
166166

167+
TNodeId GetId() const {
168+
return Id;
169+
}
170+
167171
bool MatchesFilter(const TNodeFilter& filter, TTabletDebugState* debugState = nullptr) const;
168172
bool IsAllowedToRunTablet(TTabletDebugState* debugState = nullptr) const;
169173
bool IsAllowedToRunTablet(const TTabletInfo& tablet, TTabletDebugState* debugState = nullptr) const;

0 commit comments

Comments
 (0)