@@ -19,6 +19,10 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
19
19
, NodeId(0 )
20
20
, ExtendLease(false )
21
21
, FixNodeId(false )
22
+ , SetLocation(false )
23
+ , UpdateNodeAuthorizedByCertificate(false )
24
+ , AllocateSlotIndex(false )
25
+ , SlotIndexSubdomainChanged(false )
22
26
{
23
27
}
24
28
@@ -46,7 +50,7 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
46
50
auto host = rec.GetHost ();
47
51
ui16 port = (ui16)rec.GetPort ();
48
52
TString addr = rec.GetAddress ();
49
- auto expire = rec.GetFixedNodeId () ? TInstant::Max () : Self->Epoch .NextEnd ;
53
+ auto expire = rec.GetFixedNodeId () ? TInstant::Max () : Self->Dirty . Epoch .NextEnd ;
50
54
51
55
LOG_DEBUG (ctx, NKikimrServices::NODE_BROKER, " TTxRegisterNode Execute" );
52
56
LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
@@ -71,9 +75,9 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
71
75
}
72
76
73
77
// Already registered?
74
- auto it = Self->Hosts .find (std::make_tuple (host, addr, port));
75
- if (it != Self->Hosts .end ()) {
76
- auto &node = Self->Nodes .find (it->second )->second ;
78
+ auto it = Self->Dirty . Hosts .find (std::make_tuple (host, addr, port));
79
+ if (it != Self->Dirty . Hosts .end ()) {
80
+ auto &node = Self->Dirty . Nodes .find (it->second )->second ;
77
81
NodeId = node.NodeId ;
78
82
79
83
if (node.Address != rec.GetAddress ()
@@ -90,46 +94,49 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
90
94
ctx);
91
95
} else if (node.Location != loc) {
92
96
node.Location = loc;
93
- Self->DbUpdateNodeLocation (node, txc);
97
+ Self->Dirty .DbUpdateNodeLocation (node, txc);
98
+ SetLocation = true ;
94
99
}
95
100
96
101
if (!node.IsFixed () && rec.GetFixedNodeId ()) {
97
- Self->DbFixNodeId (node, txc);
102
+ Self->Dirty .DbFixNodeId (node, txc);
103
+ Self->Dirty .FixNodeId (node);
98
104
FixNodeId = true ;
99
105
} else if (!node.IsFixed () && node.Expire < expire) {
100
- Self->DbUpdateNodeLease (node, txc);
106
+ Self->Dirty .DbUpdateNodeLease (node, txc);
107
+ Self->Dirty .ExtendLease (node);
101
108
ExtendLease = true ;
102
109
}
103
110
if (node.AuthorizedByCertificate != rec.GetAuthorizedByCertificate ()) {
104
111
node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate ();
105
- Self->DbUpdateNodeAuthorizedByCertificate (node, txc);
112
+ Self->Dirty .DbUpdateNodeAuthorizedByCertificate (node, txc);
113
+ UpdateNodeAuthorizedByCertificate = true ;
106
114
}
107
115
108
116
if (Self->EnableStableNodeNames ) {
109
117
if (ServicedSubDomain != node.ServicedSubDomain ) {
110
118
if (node.SlotIndex .has_value ()) {
111
- Self->SlotIndexesPools [node.ServicedSubDomain ].Release (node.SlotIndex .value ());
119
+ Self->Dirty . SlotIndexesPools [node.ServicedSubDomain ].Release (node.SlotIndex .value ());
112
120
}
113
121
node.ServicedSubDomain = ServicedSubDomain;
114
- node.SlotIndex = Self->SlotIndexesPools [node.ServicedSubDomain ].AcquireLowestFreeIndex ();
115
- Self->DbAddNode (node, txc);
122
+ node.SlotIndex = Self->Dirty .SlotIndexesPools [node.ServicedSubDomain ].AcquireLowestFreeIndex ();
123
+ Self->Dirty .DbAddNode (node, txc);
124
+ SlotIndexSubdomainChanged = true ;
116
125
} else if (!node.SlotIndex .has_value ()) {
117
- node.SlotIndex = Self->SlotIndexesPools [node.ServicedSubDomain ].AcquireLowestFreeIndex ();
118
- Self->DbAddNode (node, txc);
126
+ node.SlotIndex = Self->Dirty .SlotIndexesPools [node.ServicedSubDomain ].AcquireLowestFreeIndex ();
127
+ Self->Dirty .DbAddNode (node, txc);
128
+ AllocateSlotIndex = true ;
119
129
}
120
130
}
121
131
122
132
Response->Record .MutableStatus ()->SetCode (TStatus::OK);
123
- Self->FillNodeInfo (node, *Response->Record .MutableNode ());
124
-
125
133
return true ;
126
134
}
127
135
128
- if (Self->FreeIds .Empty ())
136
+ if (Self->Dirty . FreeIds .Empty ())
129
137
return Error (TStatus::ERROR_TEMP, " No free node IDs" , ctx);
130
138
131
- NodeId = Self->FreeIds .FirstNonZeroBit ();
132
- Self->FreeIds .Reset (NodeId);
139
+ NodeId = Self->Dirty .FreeIds .FirstNonZeroBit ();
133
140
134
141
Node = MakeHolder<TNodeInfo>(NodeId, rec.GetAddress (), host, rec.GetResolveHost (), port, loc);
135
142
Node->AuthorizedByCertificate = rec.GetAuthorizedByCertificate ();
@@ -138,13 +145,15 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
138
145
139
146
if (Self->EnableStableNodeNames ) {
140
147
Node->ServicedSubDomain = ServicedSubDomain;
141
- Node->SlotIndex = Self->SlotIndexesPools [Node->ServicedSubDomain ].AcquireLowestFreeIndex ();
148
+ Node->SlotIndex = Self->Dirty . SlotIndexesPools [Node->ServicedSubDomain ].AcquireLowestFreeIndex ();
142
149
}
143
150
144
151
Response->Record .MutableStatus ()->SetCode (TStatus::OK);
145
152
146
- Self->DbAddNode (*Node, txc);
147
- Self->DbUpdateEpochVersion (Self->Epoch .Version + 1 , txc);
153
+ Self->Dirty .DbAddNode (*Node, txc);
154
+ Self->Dirty .AddNode (*Node);
155
+ Self->Dirty .DbUpdateEpochVersion (Self->Dirty .Epoch .Version + 1 , txc);
156
+ Self->Dirty .UpdateEpochVersion ();
148
157
149
158
return true ;
150
159
}
@@ -154,18 +163,37 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
154
163
LOG_DEBUG (ctx, NKikimrServices::NODE_BROKER, " TTxRegisterNode Complete" );
155
164
156
165
if (Node) {
157
- Self->AddNode (*Node);
158
- Self->UpdateEpochVersion ();
166
+ Self->Committed . AddNode (*Node);
167
+ Self->Committed . UpdateEpochVersion ();
159
168
Self->AddNodeToEpochCache (*Node);
160
169
} else if (ExtendLease)
161
- Self->ExtendLease (Self->Nodes .at (NodeId));
170
+ Self->Committed . ExtendLease (Self->Committed . Nodes .at (NodeId));
162
171
else if (FixNodeId)
163
- Self->FixNodeId (Self->Nodes .at (NodeId));
172
+ Self->Committed .FixNodeId (Self->Committed .Nodes .at (NodeId));
173
+
174
+ if (SetLocation) {
175
+ Self->Committed .Nodes .at (NodeId).Location = TNodeLocation (Event->Get ()->Record .GetLocation ());
176
+ }
177
+
178
+ if (UpdateNodeAuthorizedByCertificate) {
179
+ Self->Committed .Nodes .at (NodeId).AuthorizedByCertificate = Event->Get ()->Record .GetAuthorizedByCertificate ();
180
+ }
181
+
182
+ if (AllocateSlotIndex) {
183
+ Self->Committed .Nodes .at (NodeId).SlotIndex = Self->Committed .SlotIndexesPools [ServicedSubDomain].AcquireLowestFreeIndex ();
184
+ } else if (SlotIndexSubdomainChanged) {
185
+ auto & node = Self->Committed .Nodes .at (NodeId);
186
+ if (node.SlotIndex .has_value ()) {
187
+ Self->Committed .SlotIndexesPools [node.ServicedSubDomain ].Release (node.SlotIndex .value ());
188
+ }
189
+ node.ServicedSubDomain = ServicedSubDomain;
190
+ node.SlotIndex = Self->Committed .SlotIndexesPools [ServicedSubDomain].AcquireLowestFreeIndex ();
191
+ }
164
192
165
193
Y_ABORT_UNLESS (Response);
166
194
// With all modifications applied we may fill node info.
167
195
if (Response->Record .GetStatus ().GetCode () == TStatus::OK)
168
- Self->FillNodeInfo (Self->Nodes .at (NodeId), *Response->Record .MutableNode ());
196
+ Self->FillNodeInfo (Self->Committed . Nodes .at (NodeId), *Response->Record .MutableNode ());
169
197
LOG_TRACE_S (ctx, NKikimrServices::NODE_BROKER,
170
198
" TTxRegisterNode reply with: " << Response->Record .ShortDebugString ());
171
199
@@ -176,8 +204,6 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
176
204
}
177
205
178
206
ctx.Send (Event->Sender , Response.Release ());
179
-
180
- Self->TxCompleted (this , ctx);
181
207
}
182
208
183
209
private:
@@ -189,6 +215,10 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
189
215
ui32 NodeId;
190
216
bool ExtendLease;
191
217
bool FixNodeId;
218
+ bool SetLocation;
219
+ bool UpdateNodeAuthorizedByCertificate;
220
+ bool AllocateSlotIndex;
221
+ bool SlotIndexSubdomainChanged;
192
222
};
193
223
194
224
ITransaction *TNodeBroker::CreateTxRegisterNode (TEvPrivate::TEvResolvedRegistrationRequest::TPtr &ev)
0 commit comments