@@ -29,12 +29,14 @@ TFederatedWriteSessionImpl::TFederatedWriteSessionImpl(
29
29
std::shared_ptr<TGRpcConnectionsImpl> connections,
30
30
const TFederatedTopicClientSettings& clientSettings,
31
31
std::shared_ptr<TFederatedDbObserver> observer,
32
- std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs
32
+ std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs,
33
+ NTopic::IExecutor::TPtr subsessionHandlersExecutor
33
34
)
34
35
: Settings(settings)
35
36
, Connections(std::move(connections))
36
37
, SubclientSettings(FromFederated(clientSettings))
37
38
, ProvidedCodecs(std::move(codecs))
39
+ , SubsessionHandlersExecutor(subsessionHandlersExecutor)
38
40
, Observer(std::move(observer))
39
41
, AsyncInit(Observer->WaitForFirstState ())
40
42
, FederationState(nullptr )
@@ -71,15 +73,16 @@ void TFederatedWriteSessionImpl::IssueTokenIfAllowed() {
71
73
}
72
74
}
73
75
74
- void TFederatedWriteSessionImpl::UpdateFederationStateImpl () {
76
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::UpdateFederationStateImpl () {
75
77
Y_ABORT_UNLESS (Lock.IsLocked ());
76
78
// Even after the user has called the Close method, transitioning the session to the CLOSING state,
77
79
// we keep updating the federation state, as the session may still have some messages to send in its queues,
78
80
// and for that we need to know the current state of the federation.
79
81
if (SessionState < State::CLOSED) {
80
82
FederationState = Observer->GetState ();
81
- OnFederationStateUpdateImpl ();
83
+ return OnFederationStateUpdateImpl ();
82
84
}
85
+ return {};
83
86
}
84
87
85
88
void TFederatedWriteSessionImpl::Start () {
@@ -104,15 +107,16 @@ void TFederatedWriteSessionImpl::Start() {
104
107
});
105
108
}
106
109
107
- void TFederatedWriteSessionImpl::OpenSubsessionImpl (std::shared_ptr<TDbInfo> db) {
110
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::OpenSubsessionImpl (std::shared_ptr<TDbInfo> db) {
108
111
Y_ABORT_UNLESS (Lock.IsLocked ());
109
112
110
113
++SubsessionGeneration;
111
114
115
+ std::shared_ptr<NTopic::IWriteSession> oldSubsession;
116
+
112
117
if (Subsession) {
113
118
PendingToken.Clear ();
114
- OldSubsession = std::move (Subsession);
115
- OldSubsession->Close (TDuration::Zero ());
119
+ std::swap (oldSubsession, Subsession);
116
120
}
117
121
118
122
auto clientSettings = SubclientSettings;
@@ -122,22 +126,18 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
122
126
auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, clientSettings);
123
127
124
128
auto handlers = NTopic::TWriteSessionSettings::TEventHandlers ()
125
- .HandlersExecutor (Settings. EventHandlers_ . HandlersExecutor_ )
129
+ .HandlersExecutor (SubsessionHandlersExecutor )
126
130
.ReadyToAcceptHandler ([selfCtx = SelfContext, generation = SubsessionGeneration](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
127
131
if (auto self = selfCtx->LockShared ()) {
128
- TDeferredWrite deferred;
129
-
130
132
with_lock (self->Lock ) {
131
133
if (generation != self->SubsessionGeneration ) {
132
134
return ;
133
135
}
134
136
135
137
Y_ABORT_UNLESS (self->PendingToken .Empty ());
136
138
self->PendingToken = std::move (ev.ContinuationToken );
137
- self->PrepareDeferredWriteImpl (deferred );
139
+ self->MaybeWriteImpl ( );
138
140
}
139
-
140
- deferred.DoWrite ();
141
141
}
142
142
})
143
143
.AcksHandler ([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TAcksEvent& ev) {
@@ -182,6 +182,8 @@ void TFederatedWriteSessionImpl::OpenSubsessionImpl(std::shared_ptr<TDbInfo> db)
182
182
183
183
Subsession = subclient->CreateWriteSession (wsSettings);
184
184
CurrentDatabase = db;
185
+
186
+ return oldSubsession;
185
187
}
186
188
187
189
std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHashImpl (
@@ -267,13 +269,13 @@ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseImpl(
267
269
return SelectDatabaseByHashImpl (settings, dbInfos);
268
270
}
269
271
270
- void TFederatedWriteSessionImpl::OnFederationStateUpdateImpl () {
272
+ std::shared_ptr<NTopic::IWriteSession> TFederatedWriteSessionImpl::OnFederationStateUpdateImpl () {
271
273
Y_ABORT_UNLESS (Lock.IsLocked ());
272
274
if (!FederationState->Status .IsSuccess ()) {
273
275
// The observer became stale, it won't try to get federation state anymore due to retry policy,
274
276
// so there's no reason to keep the write session alive.
275
277
CloseImpl (FederationState->Status .GetStatus (), NYql::TIssues (FederationState->Status .GetIssues ()));
276
- return ;
278
+ return {} ;
277
279
}
278
280
279
281
Y_ABORT_UNLESS (!FederationState->DbInfos .empty ());
@@ -292,19 +294,22 @@ void TFederatedWriteSessionImpl::OnFederationStateUpdateImpl() {
292
294
LOG_LAZY (Log, TLOG_ERR, GetLogPrefixImpl () << message << " . Status: " << status);
293
295
CloseImpl (status, NYql::TIssues{NYql::TIssue (message)});
294
296
}
295
- return ;
297
+ return {} ;
296
298
}
297
299
RetryState.reset ();
298
300
301
+ std::shared_ptr<NTopic::IWriteSession> oldSubsession;
299
302
if (!DatabasesAreSame (preferrableDb, CurrentDatabase)) {
300
303
LOG_LAZY (Log, TLOG_INFO, GetLogPrefixImpl ()
301
304
<< " Start federated write session to database '" << preferrableDb->name ()
302
305
<< " ' (previous was " << (CurrentDatabase ? CurrentDatabase->name () : " <empty>" ) << " )"
303
306
<< " FederationState: " << *FederationState);
304
- OpenSubsessionImpl (preferrableDb);
307
+ oldSubsession = OpenSubsessionImpl (preferrableDb);
305
308
}
306
309
307
310
ScheduleFederationStateUpdateImpl (UPDATE_FEDERATION_STATE_DELAY);
311
+
312
+ return oldSubsession;
308
313
}
309
314
310
315
void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl (TDuration delay) {
@@ -314,8 +319,10 @@ void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl(TDuration del
314
319
if (auto self = selfCtx->LockShared ()) {
315
320
std::shared_ptr<NTopic::IWriteSession> old;
316
321
with_lock (self->Lock ) {
317
- self->UpdateFederationStateImpl ();
318
- old = std::move (self->OldSubsession );
322
+ old = self->UpdateFederationStateImpl ();
323
+ }
324
+ if (old) {
325
+ old->Close (TDuration::Zero ());
319
326
}
320
327
}
321
328
}
@@ -378,24 +385,20 @@ void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token
378
385
}
379
386
380
387
void TFederatedWriteSessionImpl::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
381
- TDeferredWrite deferred (Subsession);
382
-
383
388
with_lock (Lock) {
384
389
ClientHasToken = false ;
385
390
if (!wrapped.Message .CreateTimestamp_ .Defined ()) {
386
391
wrapped.Message .CreateTimestamp_ = TInstant::Now ();
387
392
}
388
393
BufferFreeSpace -= wrapped.Message .Data .size ();
389
394
OriginalMessagesToPassDown.emplace_back (std::move (wrapped));
390
- PrepareDeferredWriteImpl (deferred );
395
+ MaybeWriteImpl ( );
391
396
}
392
397
393
- deferred.DoWrite ();
394
-
395
398
IssueTokenIfAllowed ();
396
399
}
397
400
398
- bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl (TDeferredWrite& deferred ) {
401
+ bool TFederatedWriteSessionImpl::MaybeWriteImpl ( ) {
399
402
Y_ABORT_UNLESS (Lock.IsLocked ());
400
403
if (PendingToken.Empty ()) {
401
404
return false ;
@@ -405,9 +408,7 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl(TDeferredWrite& deferr
405
408
}
406
409
OriginalMessagesToGetAck.push_back (std::move (OriginalMessagesToPassDown.front ()));
407
410
OriginalMessagesToPassDown.pop_front ();
408
- deferred.Writer = Subsession;
409
- deferred.Token .ConstructInPlace (std::move (*PendingToken));
410
- deferred.Message .ConstructInPlace (std::move (OriginalMessagesToGetAck.back ().Message ));
411
+ Subsession->Write (std::move (*PendingToken), std::move (OriginalMessagesToGetAck.back ().Message ));
411
412
PendingToken.Clear ();
412
413
return true ;
413
414
}
0 commit comments