Skip to content

Commit 568d2c2

Browse files
authored
Fix distconf IC session subscription state machine (#7143)
1 parent 9785f6a commit 568d2c2

File tree

8 files changed

+223
-105
lines changed

8 files changed

+223
-105
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
ydb/core/blobstorage/dsproxy/ut TBlobStorageProxySequenceTest.TestBlock42PutWithChangingSlowDisk
22
ydb/core/blobstorage/dsproxy/ut_fat TBlobStorageProxyTest.TestBatchedPutRequestDoesNotContainAHugeBlob
3-
ydb/core/blobstorage/ut_blobstorage BlobPatching.PatchBlock42
43
ydb/core/client/ut TClientTest.PromoteFollower
54
ydb/core/client/ut TClientTest.ReadFromFollower
65
ydb/core/client/ut TFlatTest.AutoSplitMergeQueue

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,30 +145,44 @@ namespace NKikimr::NStorage {
145145
}
146146
}
147147

148-
for (const auto& [nodeId, sessionId] : SubscribedSessions) {
148+
for (const auto& [nodeId, subs] : SubscribedSessions) {
149149
bool okay = false;
150150
if (Binding && Binding->NodeId == nodeId) {
151-
Y_ABORT_UNLESS(sessionId == Binding->SessionId);
151+
Y_VERIFY_S(subs.SessionId == Binding->SessionId || !Binding->SessionId,
152+
"Binding# " << Binding->ToString() << " Subscription# " << subs.ToString());
152153
okay = true;
153154
}
154155
if (const auto it = DirectBoundNodes.find(nodeId); it != DirectBoundNodes.end()) {
155-
Y_ABORT_UNLESS(!sessionId || sessionId == it->second.SessionId);
156+
Y_VERIFY_S(!subs.SessionId || subs.SessionId == it->second.SessionId, "sessionId# " << subs.SessionId
157+
<< " node.SessionId# " << it->second.SessionId);
156158
okay = true;
157159
}
158-
if (!sessionId) {
160+
if (!subs.SessionId) {
159161
okay = true; // may be just obsolete subscription request
160162
}
161163
if (ConnectedDynamicNodes.contains(nodeId)) {
162164
okay = true;
163165
}
164166
Y_ABORT_UNLESS(okay);
167+
if (subs.SubscriptionCookie) {
168+
const auto it = SubscriptionCookieMap.find(subs.SubscriptionCookie);
169+
Y_ABORT_UNLESS(it != SubscriptionCookieMap.end());
170+
Y_ABORT_UNLESS(it->second == nodeId);
171+
}
172+
}
173+
for (const auto& [cookie, nodeId] : SubscriptionCookieMap) {
174+
const auto it = SubscribedSessions.find(nodeId);
175+
Y_ABORT_UNLESS(it != SubscribedSessions.end());
176+
const TSessionSubscription& subs = it->second;
177+
Y_VERIFY_S(subs.SubscriptionCookie == cookie, "SubscriptionCookie# " << subs.SubscriptionCookie
178+
<< " cookie# " << cookie);
165179
}
166180

167181
if (Binding) {
168182
Y_ABORT_UNLESS(SubscribedSessions.contains(Binding->NodeId));
169183
}
170184
for (const auto& [nodeId, info] : DirectBoundNodes) {
171-
Y_ABORT_UNLESS(SubscribedSessions.contains(nodeId));
185+
Y_VERIFY_S(SubscribedSessions.contains(nodeId), "NodeId# " << nodeId);
172186
}
173187

174188
Y_ABORT_UNLESS(!StorageConfig || CheckFingerprint(*StorageConfig));
@@ -229,6 +243,13 @@ namespace NKikimr::NStorage {
229243
}
230244

231245
STFUNC(TDistributedConfigKeeper::StateFunc) {
246+
STLOG(PRI_DEBUG, BS_NODE, NWDC15, "StateFunc", (Type, ev->GetTypeRewrite()), (Sender, ev->Sender),
247+
(SessionId, ev->InterconnectSession), (Cookie, ev->Cookie));
248+
const ui32 senderNodeId = ev->Sender.NodeId();
249+
if (ev->InterconnectSession && SubscribedSessions.contains(senderNodeId)) {
250+
// keep session actors intact
251+
SubscribeToPeerNode(senderNodeId, ev->InterconnectSession);
252+
}
232253
STRICT_STFUNC_BODY(
233254
hFunc(TEvNodeConfigPush, Handle);
234255
hFunc(TEvNodeConfigReversePush, Handle);
@@ -239,7 +260,6 @@ namespace NKikimr::NStorage {
239260
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
240261
hFunc(TEvInterconnect::TEvNodeConnected, Handle);
241262
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
242-
hFunc(TEvents::TEvUndelivered, Handle);
243263
cFunc(TEvPrivate::EvErrorTimeout, HandleErrorTimeout);
244264
hFunc(TEvPrivate::TEvStorageConfigLoaded, Handle);
245265
hFunc(TEvPrivate::TEvStorageConfigStored, Handle);
@@ -252,6 +272,12 @@ namespace NKikimr::NStorage {
252272
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
253273
cFunc(TEvents::TSystem::Poison, PassAway);
254274
)
275+
for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
276+
UnsubscribeInterconnect(nodeId);
277+
}
278+
if (IsSelfStatic && StorageConfig && NodeListObtained) {
279+
IssueNextBindRequest();
280+
}
255281
ConsistencyCheck();
256282
}
257283

ydb/core/blobstorage/nodewarden/distconf.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,20 @@ namespace NKikimr::NStorage {
240240
TString ErrorReason;
241241

242242
// subscribed IC sessions
243-
THashMap<ui32, TActorId> SubscribedSessions;
243+
// Per-node interconnect subscription record; used as the mapped value of SubscribedSessions
// (keyed by ui32 node id).
struct TSessionSubscription {
244+
// Session actor id recorded for the node. The consistency check compares it against
// Binding->SessionId and the DirectBoundNodes entry for the same node; an empty id is
// tolerated there as a possibly obsolete subscription request.
TActorId SessionId;
245+
ui64 SubscriptionCookie = 0; // when nonzero, we didn't have TEvNodeConnected yet
246+
247+
// Stores the given session id; SubscriptionCookie keeps its in-class default of 0.
// NOTE(review): the constructor is not marked explicit, so a TActorId converts implicitly
// to TSessionSubscription -- confirm this is intended by the call sites.
TSessionSubscription(TActorId sessionId) : SessionId(sessionId) {}
248+
249+
// Formats both fields as "{SessionId# ... SubscriptionCookie# ...}"; used in the
// Y_VERIFY_S failure message of the consistency check.
TString ToString() const {
250+
return TStringBuilder() << "{SessionId# " << SessionId << " SubscriptionCookie# " << SubscriptionCookie << "}";
251+
}
252+
};
253+
THashMap<ui32, TSessionSubscription> SubscribedSessions;
254+
ui64 NextSubscribeCookie = 1;
255+
THashMap<ui64, ui32> SubscriptionCookieMap;
256+
THashSet<ui32> UnsubscribeQueue;
244257

245258
// child actors
246259
THashSet<TActorId> ChildActors;
@@ -287,8 +300,9 @@ namespace NKikimr::NStorage {
287300
void BindToSession(TActorId sessionId);
288301
void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev);
289302
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev);
290-
void Handle(TEvents::TEvUndelivered::TPtr ev);
303+
void HandleDisconnect(ui32 nodeId, TActorId sessionId);
291304
void UnsubscribeInterconnect(ui32 nodeId);
305+
TActorId SubscribeToPeerNode(ui32 nodeId, TActorId sessionId);
292306
void AbortBinding(const char *reason, bool sendUnbindMessage = true);
293307
void HandleWakeup();
294308
void Handle(TEvNodeConfigReversePush::TPtr ev);

0 commit comments

Comments
 (0)