Skip to content

Commit 665f332

Browse files
authored
25-1: Add transaction pipelining to NodeBroker (#16045)
2 parents f9f2c5c + efe7565 commit 665f332

11 files changed

+832
-325
lines changed

ydb/core/mind/node_broker.cpp

Lines changed: 124 additions & 128 deletions
Large diffs are not rendered by default.

ydb/core/mind/node_broker__extend_lease.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,28 +46,29 @@ class TNodeBroker::TTxExtendLease : public TTransactionBase<TNodeBroker> {
4646
Response = new TEvNodeBroker::TEvExtendLeaseResponse;
4747
Response->Record.SetNodeId(nodeId);
4848

49-
auto it = Self->Nodes.find(nodeId);
50-
if (it == Self->Nodes.end()) {
51-
if (Self->ExpiredNodes.contains(nodeId))
49+
auto it = Self->Dirty.Nodes.find(nodeId);
50+
if (it == Self->Dirty.Nodes.end()) {
51+
if (Self->Dirty.ExpiredNodes.contains(nodeId))
5252
return Error(TStatus::WRONG_REQUEST, "Node has expired", ctx);
5353
else
5454
return Error(TStatus::WRONG_REQUEST, "Unknown node", ctx);
5555
}
5656

57-
if (Self->IsBannedId(nodeId))
57+
if (Self->Dirty.IsBannedId(nodeId))
5858
return Error(TStatus::WRONG_REQUEST, "Node ID is banned", ctx);
5959

6060
auto &node = it->second;
6161
if (!node.IsFixed()) {
62-
Self->DbUpdateNodeLease(node, txc);
63-
Response->Record.SetExpire(Self->Epoch.NextEnd.GetValue());
62+
Self->Dirty.DbUpdateNodeLease(node, txc);
63+
Self->Dirty.ExtendLease(node);
64+
Response->Record.SetExpire(Self->Dirty.Epoch.NextEnd.GetValue());
6465
Update = true;
6566
} else {
6667
Response->Record.SetExpire(TInstant::Max().GetValue());
6768
}
6869

6970
Response->Record.MutableStatus()->SetCode(TStatus::OK);
70-
Self->Epoch.Serialize(*Response->Record.MutableEpoch());
71+
Self->Dirty.Epoch.Serialize(*Response->Record.MutableEpoch());
7172

7273
return true;
7374
}
@@ -82,9 +83,7 @@ class TNodeBroker::TTxExtendLease : public TTransactionBase<TNodeBroker> {
8283
ctx.Send(Event->Sender, Response.Release());
8384

8485
if (Update)
85-
Self->ExtendLease(Self->Nodes.at(Event->Get()->Record.GetNodeId()));
86-
87-
Self->TxCompleted(Event->Get()->Record.GetNodeId(), this, ctx);
86+
Self->Committed.ExtendLease(Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId()));
8887
}
8988

9089
private:

ydb/core/mind/node_broker__graceful_shutdown.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ class TNodeBroker::TTxGracefulShutdown : public TTransactionBase<TNodeBroker> {
1111
TTxGracefulShutdown(TNodeBroker *self, TEvNodeBroker::TEvGracefulShutdownRequest::TPtr &ev)
1212
: TBase(self)
1313
, Event(ev)
14+
, Update(false)
1415
{
1516
}
1617

@@ -25,16 +26,16 @@ class TNodeBroker::TTxGracefulShutdown : public TTransactionBase<TNodeBroker> {
2526
"TTxGracefulShutdown Execute. Graceful Shutdown request from " << nodeId << " ");
2627

2728
Response = MakeHolder<TEvNodeBroker::TEvGracefulShutdownResponse>();
28-
const auto it = Self->Nodes.find(nodeId);
29+
const auto it = Self->Dirty.Nodes.find(nodeId);
2930

30-
if (it != Self->Nodes.end()) {
31+
if (it != Self->Dirty.Nodes.end()) {
3132
auto& node = it->second;
32-
Self->SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
33-
Self->DbReleaseSlotIndex(node, txc);
34-
node.SlotIndex.reset();
33+
Self->Dirty.DbReleaseSlotIndex(node, txc);
34+
Self->Dirty.ReleaseSlotIndex(node);
3535

3636
Response->Record.MutableStatus()->SetCode(TStatus::OK);
3737

38+
Update = true;
3839
return true;
3940
}
4041

@@ -47,13 +48,16 @@ class TNodeBroker::TTxGracefulShutdown : public TTransactionBase<TNodeBroker> {
4748
void Complete(const TActorContext &ctx) override
4849
{
4950
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxGracefulShutdown Complete");
51+
if (Update) {
52+
Self->Committed.ReleaseSlotIndex(Self->Committed.Nodes.at(Event->Get()->Record.GetNodeId()));
53+
}
5054
ctx.Send(Event->Sender, Response.Release());
51-
Self->TxCompleted(this, ctx);
5255
}
5356

5457
private:
5558
TEvNodeBroker::TEvGracefulShutdownRequest::TPtr Event;
5659
THolder<TEvNodeBroker::TEvGracefulShutdownResponse> Response;
60+
bool Update;
5761
};
5862

5963
ITransaction *TNodeBroker::CreateTxGracefulShutdown(TEvNodeBroker::TEvGracefulShutdownRequest::TPtr &ev)

ydb/core/mind/node_broker__init_scheme.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ class TNodeBroker::TTxInitScheme : public TTransactionBase<TNodeBroker> {
2828
{
2929
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxInitScheme Complete");
3030

31-
Self->ProcessTx(Self->CreateTxLoadState(), ctx);
32-
Self->TxCompleted(this, ctx);
31+
Self->Execute(Self->CreateTxLoadState(), ctx);
3332
}
3433
};
3534

ydb/core/mind/node_broker__load_state.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@ class TNodeBroker::TTxLoadState : public TTransactionBase<TNodeBroker> {
2020
{
2121
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Execute");
2222

23-
if (!Self->DbLoadState(txc, ctx))
23+
if (!Self->Dirty.DbLoadState(txc, ctx))
2424
return false;
2525

2626
// Move epoch if required.
2727
auto now = ctx.Now();
28-
while (now > Self->Epoch.End) {
28+
while (now > Self->Dirty.Epoch.End) {
2929
TStateDiff diff;
30-
Self->ComputeNextEpochDiff(diff);
31-
Self->DbApplyStateDiff(diff, txc);
32-
Self->ApplyStateDiff(diff);
30+
Self->Dirty.ComputeNextEpochDiff(diff);
31+
Self->Dirty.DbApplyStateDiff(diff, txc);
32+
Self->Dirty.ApplyStateDiff(diff);
3333
}
3434

3535
return true;
@@ -39,12 +39,12 @@ class TNodeBroker::TTxLoadState : public TTransactionBase<TNodeBroker> {
3939
{
4040
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxLoadState Complete");
4141

42+
Self->Committed = Self->Dirty;
4243
Self->Become(&TNodeBroker::StateWork);
4344
Self->SubscribeForConfigUpdates(ctx);
4445
Self->ScheduleEpochUpdate(ctx);
4546
Self->PrepareEpochCache();
4647
Self->SignalTabletActive(ctx);
47-
Self->TxCompleted(this, ctx);
4848
}
4949

5050
private:

ydb/core/mind/node_broker__register_node.cpp

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
1919
, NodeId(0)
2020
, ExtendLease(false)
2121
, FixNodeId(false)
22+
, SetLocation(false)
23+
, UpdateNodeAuthorizedByCertificate(false)
24+
, AllocateSlotIndex(false)
25+
, SlotIndexSubdomainChanged(false)
2226
{
2327
}
2428

@@ -46,7 +50,7 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
4650
auto host = rec.GetHost();
4751
ui16 port = (ui16)rec.GetPort();
4852
TString addr = rec.GetAddress();
49-
auto expire = rec.GetFixedNodeId() ? TInstant::Max() : Self->Epoch.NextEnd;
53+
auto expire = rec.GetFixedNodeId() ? TInstant::Max() : Self->Dirty.Epoch.NextEnd;
5054

5155
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxRegisterNode Execute");
5256
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
@@ -71,9 +75,9 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
7175
}
7276

7377
// Already registered?
74-
auto it = Self->Hosts.find(std::make_tuple(host, addr, port));
75-
if (it != Self->Hosts.end()) {
76-
auto &node = Self->Nodes.find(it->second)->second;
78+
auto it = Self->Dirty.Hosts.find(std::make_tuple(host, addr, port));
79+
if (it != Self->Dirty.Hosts.end()) {
80+
auto &node = Self->Dirty.Nodes.find(it->second)->second;
7781
NodeId = node.NodeId;
7882

7983
if (node.Address != rec.GetAddress()
@@ -90,46 +94,49 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
9094
ctx);
9195
} else if (node.Location != loc) {
9296
node.Location = loc;
93-
Self->DbUpdateNodeLocation(node, txc);
97+
Self->Dirty.DbUpdateNodeLocation(node, txc);
98+
SetLocation = true;
9499
}
95100

96101
if (!node.IsFixed() && rec.GetFixedNodeId()) {
97-
Self->DbFixNodeId(node, txc);
102+
Self->Dirty.DbFixNodeId(node, txc);
103+
Self->Dirty.FixNodeId(node);
98104
FixNodeId = true;
99105
} else if (!node.IsFixed() && node.Expire < expire) {
100-
Self->DbUpdateNodeLease(node, txc);
106+
Self->Dirty.DbUpdateNodeLease(node, txc);
107+
Self->Dirty.ExtendLease(node);
101108
ExtendLease = true;
102109
}
103110
if (node.AuthorizedByCertificate != rec.GetAuthorizedByCertificate()) {
104111
node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
105-
Self->DbUpdateNodeAuthorizedByCertificate(node, txc);
112+
Self->Dirty.DbUpdateNodeAuthorizedByCertificate(node, txc);
113+
UpdateNodeAuthorizedByCertificate = true;
106114
}
107115

108116
if (Self->EnableStableNodeNames) {
109117
if (ServicedSubDomain != node.ServicedSubDomain) {
110118
if (node.SlotIndex.has_value()) {
111-
Self->SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
119+
Self->Dirty.SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
112120
}
113121
node.ServicedSubDomain = ServicedSubDomain;
114-
node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
115-
Self->DbAddNode(node, txc);
122+
node.SlotIndex = Self->Dirty.SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
123+
Self->Dirty.DbAddNode(node, txc);
124+
SlotIndexSubdomainChanged = true;
116125
} else if (!node.SlotIndex.has_value()) {
117-
node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
118-
Self->DbAddNode(node, txc);
126+
node.SlotIndex = Self->Dirty.SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
127+
Self->Dirty.DbAddNode(node, txc);
128+
AllocateSlotIndex = true;
119129
}
120130
}
121131

122132
Response->Record.MutableStatus()->SetCode(TStatus::OK);
123-
Self->FillNodeInfo(node, *Response->Record.MutableNode());
124-
125133
return true;
126134
}
127135

128-
if (Self->FreeIds.Empty())
136+
if (Self->Dirty.FreeIds.Empty())
129137
return Error(TStatus::ERROR_TEMP, "No free node IDs", ctx);
130138

131-
NodeId = Self->FreeIds.FirstNonZeroBit();
132-
Self->FreeIds.Reset(NodeId);
139+
NodeId = Self->Dirty.FreeIds.FirstNonZeroBit();
133140

134141
Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress(), host, rec.GetResolveHost(), port, loc);
135142
Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
@@ -138,13 +145,15 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
138145

139146
if (Self->EnableStableNodeNames) {
140147
Node->ServicedSubDomain = ServicedSubDomain;
141-
Node->SlotIndex = Self->SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex();
148+
Node->SlotIndex = Self->Dirty.SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex();
142149
}
143150

144151
Response->Record.MutableStatus()->SetCode(TStatus::OK);
145152

146-
Self->DbAddNode(*Node, txc);
147-
Self->DbUpdateEpochVersion(Self->Epoch.Version + 1, txc);
153+
Self->Dirty.DbAddNode(*Node, txc);
154+
Self->Dirty.AddNode(*Node);
155+
Self->Dirty.DbUpdateEpochVersion(Self->Dirty.Epoch.Version + 1, txc);
156+
Self->Dirty.UpdateEpochVersion();
148157

149158
return true;
150159
}
@@ -154,18 +163,37 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
154163
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxRegisterNode Complete");
155164

156165
if (Node) {
157-
Self->AddNode(*Node);
158-
Self->UpdateEpochVersion();
166+
Self->Committed.AddNode(*Node);
167+
Self->Committed.UpdateEpochVersion();
159168
Self->AddNodeToEpochCache(*Node);
160169
} else if (ExtendLease)
161-
Self->ExtendLease(Self->Nodes.at(NodeId));
170+
Self->Committed.ExtendLease(Self->Committed.Nodes.at(NodeId));
162171
else if (FixNodeId)
163-
Self->FixNodeId(Self->Nodes.at(NodeId));
172+
Self->Committed.FixNodeId(Self->Committed.Nodes.at(NodeId));
173+
174+
if (SetLocation) {
175+
Self->Committed.Nodes.at(NodeId).Location = TNodeLocation(Event->Get()->Record.GetLocation());
176+
}
177+
178+
if (UpdateNodeAuthorizedByCertificate) {
179+
Self->Committed.Nodes.at(NodeId).AuthorizedByCertificate = Event->Get()->Record.GetAuthorizedByCertificate();
180+
}
181+
182+
if (AllocateSlotIndex) {
183+
Self->Committed.Nodes.at(NodeId).SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex();
184+
} else if (SlotIndexSubdomainChanged) {
185+
auto& node = Self->Committed.Nodes.at(NodeId);
186+
if (node.SlotIndex.has_value()) {
187+
Self->Committed.SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
188+
}
189+
node.ServicedSubDomain = ServicedSubDomain;
190+
node.SlotIndex = Self->Committed.SlotIndexesPools[ServicedSubDomain].AcquireLowestFreeIndex();
191+
}
164192

165193
Y_ABORT_UNLESS(Response);
166194
// With all modifications applied we may fill node info.
167195
if (Response->Record.GetStatus().GetCode() == TStatus::OK)
168-
Self->FillNodeInfo(Self->Nodes.at(NodeId), *Response->Record.MutableNode());
196+
Self->FillNodeInfo(Self->Committed.Nodes.at(NodeId), *Response->Record.MutableNode());
169197
LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER,
170198
"TTxRegisterNode reply with: " << Response->Record.ShortDebugString());
171199

@@ -176,8 +204,6 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
176204
}
177205

178206
ctx.Send(Event->Sender, Response.Release());
179-
180-
Self->TxCompleted(this, ctx);
181207
}
182208

183209
private:
@@ -189,6 +215,10 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
189215
ui32 NodeId;
190216
bool ExtendLease;
191217
bool FixNodeId;
218+
bool SetLocation;
219+
bool UpdateNodeAuthorizedByCertificate;
220+
bool AllocateSlotIndex;
221+
bool SlotIndexSubdomainChanged;
192222
};
193223

194224
ITransaction *TNodeBroker::CreateTxRegisterNode(TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev)

ydb/core/mind/node_broker__update_config.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TNodeBroker::TTxUpdateConfig : public TTransactionBase<TNodeBroker> {
3535
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
3636
"TTxUpdateConfig Execute " << rec.ShortDebugString());
3737

38-
if (!google::protobuf::util::MessageDifferencer::Equals(Config, Self->Config))
38+
if (!google::protobuf::util::MessageDifferencer::Equals(Config, Self->Dirty.Config))
3939
Modify = true;
4040

4141
auto resp = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(rec);
@@ -52,7 +52,7 @@ class TNodeBroker::TTxUpdateConfig : public TTransactionBase<TNodeBroker> {
5252
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
5353
"TTxUpdateConfig Execute " << rec.ShortDebugString());
5454

55-
if (!google::protobuf::util::MessageDifferencer::Equals(Config, Self->Config))
55+
if (!google::protobuf::util::MessageDifferencer::Equals(Config, Self->Dirty.Config))
5656
Modify = true;
5757

5858
auto resp = MakeHolder<TEvNodeBroker::TEvSetConfigResponse>();
@@ -72,8 +72,10 @@ class TNodeBroker::TTxUpdateConfig : public TTransactionBase<TNodeBroker> {
7272
if (Request && !ProcessRequest(ctx))
7373
return true;
7474

75-
if (Modify)
76-
Self->DbUpdateConfig(Config, txc);
75+
if (Modify) {
76+
Self->Dirty.DbUpdateConfig(Config, txc);
77+
Self->Dirty.LoadConfigFromProto(Config);
78+
}
7779

7880
return true;
7981
}
@@ -83,15 +85,13 @@ class TNodeBroker::TTxUpdateConfig : public TTransactionBase<TNodeBroker> {
8385
LOG_DEBUG(ctx, NKikimrServices::NODE_BROKER, "TTxUpdateConfig Complete");
8486

8587
if (Modify)
86-
Self->LoadConfigFromProto(Config);
88+
Self->Committed.LoadConfigFromProto(Config);
8789

8890
if (Response) {
8991
LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER,
9092
"TTxUpdateConfig reply with: " << Response->ToString());
9193
ctx.Send(Response);
9294
}
93-
94-
Self->TxCompleted(this, ctx);
9595
}
9696

9797
private:

ydb/core/mind/node_broker__update_config_subscription.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class TNodeBroker::TTxUpdateConfigSubscription : public TTransactionBase<TNodeBr
2727
Y_ABORT_UNLESS(rec.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
2828

2929
SubscriptionId = rec.GetSubscriptionId();
30-
Self->DbUpdateConfigSubscription(SubscriptionId, txc);
30+
Self->Dirty.DbUpdateConfigSubscription(SubscriptionId, txc);
31+
Self->Dirty.ConfigSubscriptionId = SubscriptionId;
3132

3233
return true;
3334
}
@@ -39,9 +40,7 @@ class TNodeBroker::TTxUpdateConfigSubscription : public TTransactionBase<TNodeBr
3940
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
4041
"Using new subscription id=" << SubscriptionId);
4142

42-
Self->ConfigSubscriptionId = SubscriptionId;
43-
44-
Self->TxCompleted(0, this, ctx);
43+
Self->Committed.ConfigSubscriptionId = SubscriptionId;
4544
}
4645

4746
private:

0 commit comments

Comments
 (0)