Skip to content

Commit 8e429b8

Browse files
committed
Moved commit "Federated write session: fix deadlock" from ydb repo
1 parent 102877c commit 8e429b8

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

src/client/federated_topic/impl/federated_write_session.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bool DatabasesAreSame(std::shared_ptr<TDbInfo> lhs, std::shared_ptr<TDbInfo> rhs
1919
if (!lhs || !rhs) {
2020
return false;
2121
}
22-
return lhs->path() == rhs->path() && lhs->endpoint() == rhs->endpoint();
22+
return lhs->name() == rhs->name() && lhs->path() == rhs->path() && lhs->endpoint() == rhs->endpoint();
2323
}
2424

2525
NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);
@@ -107,8 +107,10 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
107107
Y_ABORT_UNLESS(Lock.IsLocked());
108108
if (Subsession) {
109109
PendingToken.reset();
110-
Subsession->Close(TDuration::Zero());
110+
OldSubsession = std::move(Subsession);
111+
OldSubsession->Close(TDuration::Zero());
111112
}
113+
112114
auto clientSettings = SubclientSettings;
113115
clientSettings
114116
.Database(db->path())
@@ -148,6 +150,11 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
148150
}
149151
})
150152
.SessionClosedHandler([selfCtx = SelfContext](const NTopic::TSessionClosedEvent & ev) {
153+
if (ev.IsSuccess()) {
154+
// The subsession was closed by the federated write session itself while creating a new subsession.
155+
// In this case we get SUCCESS status and don't need to propagate it further.
156+
return;
157+
}
151158
if (auto self = selfCtx->LockShared()) {
152159
with_lock(self->Lock) {
153160
self->CloseImpl(ev);
@@ -291,8 +298,10 @@ void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl(TDuration del
291298
auto cb = [selfCtx = SelfContext](bool ok) {
292299
if (ok) {
293300
if (auto self = selfCtx->LockShared()) {
301+
std::shared_ptr<NTopic::IWriteSession> old;
294302
with_lock(self->Lock) {
295303
self->UpdateFederationStateImpl();
304+
old = std::move(self->OldSubsession);
296305
}
297306
}
298307
}

src/client/federated_topic/impl/federated_write_session.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
122122
TAdaptiveLock Lock;
123123

124124
std::shared_ptr<NTopic::IWriteSession> Subsession;
125+
std::shared_ptr<NTopic::IWriteSession> OldSubsession;
125126

126127
std::shared_ptr<NTopic::TWriteSessionEventsQueue> ClientEventsQueue;
127128

0 commit comments

Comments
 (0)