@@ -29,12 +29,14 @@ TFederatedWriteSessionImpl::TFederatedWriteSessionImpl(
29
29
std::shared_ptr<TGRpcConnectionsImpl> connections,
30
30
const TFederatedTopicClientSettings& clientSettings,
31
31
std::shared_ptr<TFederatedDbObserver> observer,
32
- std::shared_ptr<std::unordered_map<NTopic::ECodec, std::unique_ptr<NTopic::ICodec>>> codecs
32
+ std::shared_ptr<std::unordered_map<NTopic::ECodec, std::unique_ptr<NTopic::ICodec>>> codecs,
33
+ NTopic::IExecutor::TPtr subsessionHandlersExecutor
33
34
)
34
35
: Settings(settings)
35
36
, Connections(std::move(connections))
36
37
, SubclientSettings(FromFederated(clientSettings))
37
38
, ProvidedCodecs(std::move(codecs))
39
+ , SubsessionHandlersExecutor(subsessionHandlersExecutor)
38
40
, Observer(std::move(observer))
39
41
, AsyncInit(Observer->WaitForFirstState ())
40
42
, FederationState(nullptr )
@@ -70,15 +72,16 @@ void TFederatedWriteSessionImpl::IssueTokenIfAllowed() {
70
72
}
71
73
}
72
74
73
- void TFederatedWriteSessionImpl::UpdateFederationStateImpl () {
75
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::UpdateFederationStateImpl () {
74
76
Y_ABORT_UNLESS (Lock.IsLocked ());
75
77
// Even after the user has called the Close method, transitioning the session to the CLOSING state,
76
78
// we keep updating the federation state, as the session may still have some messages to send in its queues,
77
79
// and for that we need to know the current state of the federation.
78
80
if (SessionState < State::CLOSED) {
79
81
FederationState = Observer->GetState ();
80
- OnFederationStateUpdateImpl ();
82
+ return OnFederationStateUpdateImpl ();
81
83
}
84
+ return {};
82
85
}
83
86
84
87
void TFederatedWriteSessionImpl::Start () {
@@ -103,15 +106,16 @@ void TFederatedWriteSessionImpl::Start() {
103
106
});
104
107
}
105
108
106
- void TFederatedWriteSessionImpl::OpenSubsessionImpl (std::shared_ptr<TDbInfo> db) {
109
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::OpenSubsessionImpl (std::shared_ptr<TDbInfo> db) {
107
110
Y_ABORT_UNLESS (Lock.IsLocked ());
108
111
109
112
++SubsessionGeneration;
110
113
114
+ std::shared_ptr<NTopic::IWriteSession> oldSubsession;
115
+
111
116
if (Subsession) {
112
117
PendingToken.reset ();
113
- OldSubsession = std::move (Subsession);
114
- OldSubsession->Close (TDuration::Zero ());
118
+ std::swap (oldSubsession, Subsession);
115
119
}
116
120
117
121
auto clientSettings = SubclientSettings;
@@ -121,22 +125,18 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
121
125
auto subclient = std::make_shared<NTopic::TTopicClient::TImpl>(Connections, clientSettings);
122
126
123
127
auto handlers = NTopic::TWriteSessionSettings::TEventHandlers ()
124
- .HandlersExecutor (Settings. EventHandlers_ . HandlersExecutor_ )
128
+ .HandlersExecutor (SubsessionHandlersExecutor )
125
129
.ReadyToAcceptHandler ([selfCtx = SelfContext, generation = SubsessionGeneration](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
126
130
if (auto self = selfCtx->LockShared ()) {
127
- TDeferredWrite deferred;
128
-
129
131
with_lock (self->Lock ) {
130
132
if (generation != self->SubsessionGeneration ) {
131
133
return ;
132
134
}
133
135
134
136
Y_ABORT_UNLESS (!self->PendingToken .has_value ());
135
137
self->PendingToken = std::move (ev.ContinuationToken );
136
- self->PrepareDeferredWriteImpl (deferred );
138
+ self->MaybeWriteImpl ( );
137
139
}
138
-
139
- deferred.DoWrite ();
140
140
}
141
141
})
142
142
.AcksHandler ([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TAcksEvent& ev) {
@@ -181,6 +181,8 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
181
181
182
182
Subsession = subclient->CreateWriteSession (wsSettings);
183
183
CurrentDatabase = db;
184
+
185
+ return oldSubsession;
184
186
}
185
187
186
188
std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHashImpl (
@@ -265,13 +267,13 @@ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseImpl(
265
267
return SelectDatabaseByHashImpl (settings, dbInfos);
266
268
}
267
269
268
- void TFederatedWriteSessionImpl::OnFederationStateUpdateImpl () {
270
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::OnFederationStateUpdateImpl () {
269
271
Y_ABORT_UNLESS (Lock.IsLocked ());
270
272
if (!FederationState->Status .IsSuccess ()) {
271
273
// The observer became stale, it won't try to get federation state anymore due to retry policy,
272
274
// so there's no reason to keep the write session alive.
273
275
CloseImpl (FederationState->Status .GetStatus (), NYql::TIssues (FederationState->Status .GetIssues ()));
274
- return ;
276
+ return {} ;
275
277
}
276
278
277
279
Y_ABORT_UNLESS (!FederationState->DbInfos .empty ());
@@ -290,19 +292,22 @@ void TFederatedWriteSessionImpl::OnFederationStateUpdateImpl() {
290
292
LOG_LAZY (Log, TLOG_ERR, GetLogPrefixImpl () << message << " . Status: " << status);
291
293
CloseImpl (status, NYql::TIssues{NYql::TIssue (message)});
292
294
}
293
- return ;
295
+ return {} ;
294
296
}
295
297
RetryState.reset ();
296
298
299
+ std::shared_ptr<NTopic::IWriteSession> oldSubsession;
297
300
if (!DatabasesAreSame (preferrableDb, CurrentDatabase)) {
298
301
LOG_LAZY (Log, TLOG_INFO, GetLogPrefixImpl ()
299
302
<< " Start federated write session to database '" << preferrableDb->name ()
300
303
<< " ' (previous was " << (CurrentDatabase ? CurrentDatabase->name () : " <empty>" ) << " )"
301
304
<< " FederationState: " << *FederationState);
302
- OpenSubsessionImpl (preferrableDb);
305
+ oldSubsession = OpenSubsessionImpl (preferrableDb);
303
306
}
304
307
305
308
ScheduleFederationStateUpdateImpl (UPDATE_FEDERATION_STATE_DELAY);
309
+
310
+ return oldSubsession;
306
311
}
307
312
308
313
void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl (TDuration delay) {
@@ -312,8 +317,10 @@ void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl(TDuration del
312
317
if (auto self = selfCtx->LockShared ()) {
313
318
std::shared_ptr<NTopic::IWriteSession> old;
314
319
with_lock (self->Lock ) {
315
- self->UpdateFederationStateImpl ();
316
- old = std::move (self->OldSubsession );
320
+ old = self->UpdateFederationStateImpl ();
321
+ }
322
+ if (old) {
323
+ old->Close (TDuration::Zero ());
317
324
}
318
325
}
319
326
}
@@ -376,24 +383,20 @@ void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token
376
383
}
377
384
378
385
void TFederatedWriteSessionImpl::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
379
- TDeferredWrite deferred (Subsession);
380
-
381
386
with_lock (Lock) {
382
387
ClientHasToken = false ;
383
388
if (!wrapped.Message .CreateTimestamp_ .has_value ()) {
384
389
wrapped.Message .CreateTimestamp_ = TInstant::Now ();
385
390
}
386
391
BufferFreeSpace -= wrapped.Message .Data .size ();
387
392
OriginalMessagesToPassDown.emplace_back (std::move (wrapped));
388
- PrepareDeferredWriteImpl (deferred );
393
+ MaybeWriteImpl ( );
389
394
}
390
395
391
- deferred.DoWrite ();
392
-
393
396
IssueTokenIfAllowed ();
394
397
}
395
398
396
- bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl (TDeferredWrite& deferred ) {
399
+ bool TFederatedWriteSessionImpl::MaybeWriteImpl ( ) {
397
400
Y_ABORT_UNLESS (Lock.IsLocked ());
398
401
if (!PendingToken.has_value ()) {
399
402
return false ;
@@ -403,9 +406,7 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferr
403
406
}
404
407
OriginalMessagesToGetAck.push_back (std::move (OriginalMessagesToPassDown.front ()));
405
408
OriginalMessagesToPassDown.pop_front ();
406
- deferred.Writer = Subsession;
407
- deferred.Token .emplace (std::move (*PendingToken));
408
- deferred.Message .emplace (std::move (OriginalMessagesToGetAck.back ().Message ));
409
+ Subsession->Write (std::move (*PendingToken), std::move (OriginalMessagesToGetAck.back ().Message ));
409
410
PendingToken.reset ();
410
411
return true ;
411
412
}
0 commit comments