Skip to content

Commit f4b2e42

Browse files
committed
Moved commit "Fix for federated topic" from ydb repo
1 parent c65fe0f commit f4b2e42

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

src/client/federated_topic/impl/federated_write_session.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ void TFederatedWriteSessionImpl::Start() {
105105

106106
void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db) {
107107
Y_ABORT_UNLESS(Lock.IsLocked());
108+
109+
++SubsessionGeneration;
110+
108111
if (Subsession) {
109112
PendingToken.reset();
110113
OldSubsession = std::move(Subsession);
@@ -119,14 +122,20 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
119122

120123
auto handlers = NTopic::TWriteSessionSettings::TEventHandlers()
121124
.HandlersExecutor(Settings.EventHandlers_.HandlersExecutor_)
122-
.ReadyToAcceptHandler([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
125+
.ReadyToAcceptHandler([selfCtx = SelfContext, generation = SubsessionGeneration](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
123126
if (auto self = selfCtx->LockShared()) {
124-
TDeferredWrite deferred(self->Subsession);
127+
TDeferredWrite deferred;
128+
125129
with_lock(self->Lock) {
130+
if (generation != self->SubsessionGeneration) {
131+
return;
132+
}
133+
126134
Y_ABORT_UNLESS(!self->PendingToken.has_value());
127135
self->PendingToken = std::move(ev.ContinuationToken);
128136
self->PrepareDeferredWriteImpl(deferred);
129137
}
138+
130139
deferred.DoWrite();
131140
}
132141
})
@@ -149,14 +158,17 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
149158
self->IssueTokenIfAllowed();
150159
}
151160
})
152-
.SessionClosedHandler([selfCtx = SelfContext](const NTopic::TSessionClosedEvent & ev) {
161+
.SessionClosedHandler([selfCtx = SelfContext, generation = SubsessionGeneration](const NTopic::TSessionClosedEvent & ev) {
153162
if (ev.IsSuccess()) {
154163
// The subsession was closed by the federated write session itself while creating a new subsession.
155164
// In this case we get SUCCESS status and don't need to propagate it further.
156165
return;
157166
}
158167
if (auto self = selfCtx->LockShared()) {
159168
with_lock(self->Lock) {
169+
if (generation != self->SubsessionGeneration) {
170+
return;
171+
}
160172
self->CloseImpl(ev);
161173
}
162174
}
@@ -391,6 +403,7 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferr
391403
}
392404
OriginalMessagesToGetAck.push_back(std::move(OriginalMessagesToPassDown.front()));
393405
OriginalMessagesToPassDown.pop_front();
406+
deferred.Writer = Subsession;
394407
deferred.Token.emplace(std::move(*PendingToken));
395408
deferred.Message.emplace(std::move(OriginalMessagesToGetAck.back().Message));
396409
PendingToken.reset();

src/client/federated_topic/impl/federated_write_session.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
6262
};
6363

6464
struct TDeferredWrite {
65+
TDeferredWrite() {}
6566
explicit TDeferredWrite(std::shared_ptr<NTopic::IWriteSession> writer)
6667
: Writer(std::move(writer)) {
6768
}
@@ -121,6 +122,7 @@ class TFederatedWriteSessionImpl : public NTopic::TContinuationTokenIssuer,
121122

122123
TAdaptiveLock Lock;
123124

125+
size_t SubsessionGeneration = 0;
124126
std::shared_ptr<NTopic::IWriteSession> Subsession;
125127
std::shared_ptr<NTopic::IWriteSession> OldSubsession;
126128

0 commit comments

Comments
 (0)