@@ -27,13 +27,13 @@ NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings&
27
27
TFederatedWriteSessionImpl::TFederatedWriteSessionImpl (
28
28
const TFederatedWriteSessionSettings& settings,
29
29
std::shared_ptr<TGRpcConnectionsImpl> connections,
30
- const TFederatedTopicClientSettings& clientSetttings ,
30
+ const TFederatedTopicClientSettings& clientSettings ,
31
31
std::shared_ptr<TFederatedDbObserver> observer,
32
32
std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs
33
33
)
34
34
: Settings(settings)
35
35
, Connections(std::move(connections))
36
- , SubClientSetttings (FromFederated(clientSetttings ))
36
+ , SubclientSettings (FromFederated(clientSettings ))
37
37
, ProvidedCodecs(std::move(codecs))
38
38
, Observer(std::move(observer))
39
39
, AsyncInit(Observer->WaitForFirstState ())
@@ -44,37 +44,72 @@ TFederatedWriteSessionImpl::TFederatedWriteSessionImpl(
44
44
{
45
45
}
46
46
47
- TStringBuilder TFederatedWriteSessionImpl::GetLogPrefix () const {
48
- return TStringBuilder () << GetDatabaseLogPrefix (SubClientSetttings.Database_ .value_or (" " )) << " [" << SessionId << " ] " ;
47
+ TStringBuilder TFederatedWriteSessionImpl::GetLogPrefixImpl () const {
48
+ return TStringBuilder () << GetDatabaseLogPrefix (SubclientSettings.Database_ .value_or (" " )) << " [" << SessionId << " ] " ;
49
+ }
50
+
51
+
52
+ bool TFederatedWriteSessionImpl::MessageQueuesAreEmptyImpl () const {
53
+ Y_ABORT_UNLESS (Lock.IsLocked ());
54
+ return OriginalMessagesToGetAck.empty () && OriginalMessagesToPassDown.empty ();
55
+ }
56
+
57
+ void TFederatedWriteSessionImpl::IssueTokenIfAllowed () {
58
+ // The session should not issue tokens after it has transitioned to CLOSING or CLOSE state.
59
+ // A user may have one spare token, so at most one additional message
60
+ // could be written to the internal queue after the transition.
61
+ bool issue = false ;
62
+ with_lock (Lock) {
63
+ if (BufferFreeSpace > 0 && !ClientHasToken && SessionState < State::CLOSING) {
64
+ ClientHasToken = true ;
65
+ issue = true ;
66
+ }
67
+ }
68
+ if (issue) {
69
+ ClientEventsQueue->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
70
+ }
71
+ }
72
+
73
+ void TFederatedWriteSessionImpl::UpdateFederationStateImpl () {
74
+ Y_ABORT_UNLESS (Lock.IsLocked ());
75
+ // Even after the user has called the Close method, transitioning the session to the CLOSING state,
76
+ // we keep updating the federation state, as the session may still have some messages to send in its queues,
77
+ // and for that we need to know the current state of the federation.
78
+ if (SessionState < State::CLOSED) {
79
+ FederationState = Observer->GetState ();
80
+ OnFederationStateUpdateImpl ();
81
+ }
49
82
}
50
83
51
84
void TFederatedWriteSessionImpl::Start () {
52
85
// TODO validate settings?
53
- Settings.EventHandlers_ .HandlersExecutor_ ->Start ();
54
86
with_lock (Lock){
55
- ClientEventsQueue->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
56
- ClientHasToken = true ;
87
+ if (SessionState != State::CREATED) {
88
+ return ;
89
+ }
90
+ SessionState = State::WORKING;
91
+ Settings.EventHandlers_ .HandlersExecutor_ ->Start ();
57
92
}
58
93
59
- AsyncInit.Subscribe ([selfCtx = SelfContext](const auto & f){
94
+ IssueTokenIfAllowed ();
95
+
96
+ AsyncInit.Subscribe ([selfCtx = SelfContext](const auto & f) {
60
97
Y_UNUSED (f);
61
98
if (auto self = selfCtx->LockShared ()) {
62
99
with_lock (self->Lock ) {
63
- if (!self->Closing ) {
64
- self->FederationState = self->Observer ->GetState ();
65
- self->OnFederatedStateUpdateImpl ();
66
- }
100
+ self->UpdateFederationStateImpl ();
67
101
}
68
102
}
69
103
});
70
104
}
71
105
72
- void TFederatedWriteSessionImpl::OpenSubSessionImpl (std::shared_ptr<TDbInfo> db) {
106
+ void TFederatedWriteSessionImpl::OpenSubsessionImpl (std::shared_ptr<TDbInfo> db) {
107
+ Y_ABORT_UNLESS (Lock.IsLocked ());
73
108
if (Subsession) {
74
109
PendingToken.reset ();
75
110
Subsession->Close (TDuration::Zero ());
76
111
}
77
- NTopic::TTopicClientSettings clientSettings = SubClientSetttings ;
112
+ auto clientSettings = SubclientSettings ;
78
113
clientSettings
79
114
.Database (db->path ())
80
115
.DiscoveryEndpoint (db->endpoint ());
@@ -89,7 +124,7 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db)
89
124
with_lock (self->Lock ) {
90
125
Y_ABORT_UNLESS (!self->PendingToken .has_value ());
91
126
self->PendingToken = std::move (ev.ContinuationToken );
92
- self->PrepareDeferredWrite (deferred);
127
+ self->PrepareDeferredWriteImpl (deferred);
93
128
}
94
129
deferred.DoWrite ();
95
130
}
@@ -98,24 +133,25 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db)
98
133
if (auto self = selfCtx->LockShared ()) {
99
134
with_lock (self->Lock ) {
100
135
Y_ABORT_UNLESS (ev.Acks .size () <= self->OriginalMessagesToGetAck .size ());
136
+
101
137
for (size_t i = 0 ; i < ev.Acks .size (); ++i) {
102
138
self->BufferFreeSpace += self->OriginalMessagesToGetAck .front ().Data .size ();
103
139
self->OriginalMessagesToGetAck .pop_front ();
104
140
}
105
- self->ClientEventsQueue ->PushEvent (std::move (ev));
106
- if (self->BufferFreeSpace > 0 && !self->ClientHasToken ) {
107
- self->ClientEventsQueue ->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
108
- self->ClientHasToken = true ;
141
+
142
+ if (self->MessageQueuesAreEmptyImpl () && self->MessageQueuesHaveBeenEmptied .Initialized () && !self->MessageQueuesHaveBeenEmptied .HasValue ()) {
143
+ self->MessageQueuesHaveBeenEmptied .SetValue ();
109
144
}
110
145
}
146
+
147
+ self->ClientEventsQueue ->PushEvent (std::move (ev));
148
+ self->IssueTokenIfAllowed ();
111
149
}
112
150
})
113
151
.SessionClosedHandler ([selfCtx = SelfContext](const NTopic::TSessionClosedEvent & ev) {
114
152
if (auto self = selfCtx->LockShared ()) {
115
153
with_lock (self->Lock ) {
116
- if (!self->Closing ) {
117
- self->CloseImpl (ev);
118
- }
154
+ self->CloseImpl (ev);
119
155
}
120
156
}
121
157
});
@@ -129,7 +165,7 @@ void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db)
129
165
CurrentDatabase = db;
130
166
}
131
167
132
- std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash (
168
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHashImpl (
133
169
NTopic::TFederatedWriteSessionSettings const & settings,
134
170
std::vector<std::shared_ptr<TDbInfo>> const & dbInfos
135
171
) {
@@ -163,7 +199,7 @@ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseByHash(
163
199
Y_UNREACHABLE ();
164
200
}
165
201
166
- std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase (
202
+ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabaseImpl (
167
203
NTopic::TFederatedWriteSessionSettings const & settings,
168
204
std::vector<std::shared_ptr<TDbInfo>> const & dbInfos, std::string const & selfLocation
169
205
) {
@@ -197,7 +233,7 @@ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase(
197
233
if (!settings.AllowFallback_ ) {
198
234
return {nullptr , EStatus::NOT_FOUND};
199
235
}
200
- return SelectDatabaseByHash (settings, dbInfos);
236
+ return SelectDatabaseByHashImpl (settings, dbInfos);
201
237
}
202
238
}
203
239
@@ -208,69 +244,64 @@ std::pair<std::shared_ptr<TDbInfo>, EStatus> SelectDatabase(
208
244
if (!settings.AllowFallback_ ) {
209
245
return {nullptr , EStatus::UNAVAILABLE};
210
246
}
211
- return SelectDatabaseByHash (settings, dbInfos);
212
- }
213
-
214
- std::pair<std::shared_ptr<TDbInfo>, EStatus> TFederatedWriteSessionImpl::SelectDatabaseImpl () {
215
- return SelectDatabase (Settings, FederationState->DbInfos , FederationState->SelfLocation );
247
+ return SelectDatabaseByHashImpl (settings, dbInfos);
216
248
}
217
249
218
- void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl () {
250
+ void TFederatedWriteSessionImpl::OnFederationStateUpdateImpl () {
251
+ Y_ABORT_UNLESS (Lock.IsLocked ());
219
252
if (!FederationState->Status .IsSuccess ()) {
253
+ // The observer became stale, it won't try to get federation state anymore due to retry policy,
254
+ // so there's no reason to keep the write session alive.
220
255
CloseImpl (FederationState->Status .GetStatus (), NYql::TIssues (FederationState->Status .GetIssues ()));
221
256
return ;
222
257
}
223
258
224
259
Y_ABORT_UNLESS (!FederationState->DbInfos .empty ());
225
260
226
- auto [preferrableDb, status] = SelectDatabaseImpl ();
261
+ auto [preferrableDb, status] = SelectDatabaseImpl (Settings, FederationState-> DbInfos , FederationState-> SelfLocation );
227
262
228
263
if (!preferrableDb) {
229
264
if (!RetryState) {
230
265
RetryState = Settings.RetryPolicy_ ->CreateRetryState ();
231
266
}
232
267
if (auto delay = RetryState->GetNextRetryDelay (status)) {
233
- LOG_LAZY (Log, TLOG_NOTICE, GetLogPrefix () << " Retry to update federation state in " << delay);
234
- ScheduleFederatedStateUpdateImpl (*delay);
268
+ LOG_LAZY (Log, TLOG_NOTICE, GetLogPrefixImpl () << " Retry to update federation state in " << delay);
269
+ ScheduleFederationStateUpdateImpl (*delay);
235
270
} else {
236
271
std::string message = " Failed to select database: no available database" ;
237
- LOG_LAZY (Log, TLOG_ERR, GetLogPrefix () << message);
272
+ LOG_LAZY (Log, TLOG_ERR, GetLogPrefixImpl () << message << " . Status: " << status );
238
273
CloseImpl (status, NYql::TIssues{NYql::TIssue (message)});
239
274
}
240
275
return ;
241
276
}
242
277
RetryState.reset ();
243
278
244
279
if (!DatabasesAreSame (preferrableDb, CurrentDatabase)) {
245
- LOG_LAZY (Log, TLOG_INFO, GetLogPrefix ()
280
+ LOG_LAZY (Log, TLOG_INFO, GetLogPrefixImpl ()
246
281
<< " Start federated write session to database '" << preferrableDb->name ()
247
282
<< " ' (previous was " << (CurrentDatabase ? CurrentDatabase->name () : " <empty>" ) << " )"
248
283
<< " FederationState: " << *FederationState);
249
- OpenSubSessionImpl (preferrableDb);
284
+ OpenSubsessionImpl (preferrableDb);
250
285
}
251
286
252
- ScheduleFederatedStateUpdateImpl (UPDATE_FEDERATION_STATE_DELAY);
287
+ ScheduleFederationStateUpdateImpl (UPDATE_FEDERATION_STATE_DELAY);
253
288
}
254
289
255
- void TFederatedWriteSessionImpl::ScheduleFederatedStateUpdateImpl (TDuration delay) {
290
+ void TFederatedWriteSessionImpl::ScheduleFederationStateUpdateImpl (TDuration delay) {
256
291
Y_ABORT_UNLESS (Lock.IsLocked ());
257
292
auto cb = [selfCtx = SelfContext](bool ok) {
258
293
if (ok) {
259
294
if (auto self = selfCtx->LockShared ()) {
260
295
with_lock (self->Lock ) {
261
- if (self->Closing ) {
262
- return ;
263
- }
264
- self->FederationState = self->Observer ->GetState ();
265
- self->OnFederatedStateUpdateImpl ();
296
+ self->UpdateFederationStateImpl ();
266
297
}
267
298
}
268
299
}
269
300
};
270
301
271
302
UpdateStateDelayContext = Connections->CreateContext ();
272
303
if (!UpdateStateDelayContext) {
273
- Closing = true ;
304
+ CloseImpl (EStatus::TRANSPORT_UNAVAILABLE, NYql::TIssues{ NYql::TIssue ( " Could not update federation state " )}) ;
274
305
// TODO log DRIVER_IS_STOPPING_DESCRIPTION
275
306
return ;
276
307
}
@@ -325,28 +356,25 @@ void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token
325
356
}
326
357
327
358
void TFederatedWriteSessionImpl::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
328
- ClientHasToken = false ;
329
- if (!wrapped.Message .CreateTimestamp_ .has_value ()) {
330
- wrapped.Message .CreateTimestamp_ = TInstant::Now ();
331
- }
359
+ TDeferredWrite deferred (Subsession);
332
360
333
- {
334
- TDeferredWrite deferred (Subsession);
335
- with_lock (Lock) {
336
- BufferFreeSpace -= wrapped.Message .Data .size ();
337
- OriginalMessagesToPassDown.emplace_back (std::move (wrapped));
338
-
339
- PrepareDeferredWrite (deferred);
361
+ with_lock (Lock) {
362
+ ClientHasToken = false ;
363
+ if (!wrapped.Message .CreateTimestamp_ .has_value ()) {
364
+ wrapped.Message .CreateTimestamp_ = TInstant::Now ();
340
365
}
341
- deferred.DoWrite ();
342
- }
343
- if (BufferFreeSpace > 0 ) {
344
- ClientEventsQueue->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
345
- ClientHasToken = true ;
366
+ BufferFreeSpace -= wrapped.Message .Data .size ();
367
+ OriginalMessagesToPassDown.emplace_back (std::move (wrapped));
368
+ PrepareDeferredWriteImpl (deferred);
346
369
}
370
+
371
+ deferred.DoWrite ();
372
+
373
+ IssueTokenIfAllowed ();
347
374
}
348
375
349
- bool TFederatedWriteSessionImpl::PrepareDeferredWrite (TDeferredWrite& deferred) {
376
+ bool TFederatedWriteSessionImpl::PrepareDeferredWriteImpl (TDeferredWrite& deferred) {
377
+ Y_ABORT_UNLESS (Lock.IsLocked ());
350
378
if (!PendingToken.has_value ()) {
351
379
return false ;
352
380
}
@@ -361,27 +389,47 @@ bool TFederatedWriteSessionImpl::PrepareDeferredWrite(TDeferredWrite& deferred)
361
389
return true ;
362
390
}
363
391
364
- void TFederatedWriteSessionImpl::CloseImpl (EStatus statusCode, NYql::TIssues&& issues, TDuration timeout ) {
365
- CloseImpl (TPlainStatus (statusCode, std::move (issues)), timeout );
392
+ void TFederatedWriteSessionImpl::CloseImpl (EStatus statusCode, NYql::TIssues&& issues) {
393
+ CloseImpl (TPlainStatus (statusCode, std::move (issues)));
366
394
}
367
395
368
- void TFederatedWriteSessionImpl::CloseImpl (NTopic::TSessionClosedEvent const & ev, TDuration timeout) {
369
- if (Closing) {
396
+ void TFederatedWriteSessionImpl::CloseImpl (NTopic::TSessionClosedEvent const & ev) {
397
+ Y_ABORT_UNLESS (Lock.IsLocked ());
398
+ if (SessionState == State::CLOSED) {
370
399
return ;
371
400
}
372
- Closing = true ;
373
- if (Subsession) {
374
- Subsession->Close (timeout);
375
- }
376
- ClientEventsQueue->Close (ev);
401
+ SessionState = State::CLOSED;
377
402
NTopic::Cancel (UpdateStateDelayContext);
403
+ if (!HasBeenClosed.HasValue ()) {
404
+ HasBeenClosed.SetValue ();
405
+ }
406
+ {
407
+ auto unguard = Unguard (Lock);
408
+ ClientEventsQueue->Close (ev);
409
+ }
378
410
}
379
411
380
412
bool TFederatedWriteSessionImpl::Close (TDuration timeout) {
381
413
with_lock (Lock) {
382
- CloseImpl (EStatus::SUCCESS, {}, timeout);
414
+ if (SessionState == State::CLOSED) {
415
+ return MessageQueuesAreEmptyImpl ();
416
+ }
417
+ SessionState = State::CLOSING;
418
+ if (!MessageQueuesHaveBeenEmptied.Initialized ()) {
419
+ MessageQueuesHaveBeenEmptied = NThreading::NewPromise ();
420
+ if (MessageQueuesAreEmptyImpl ()) {
421
+ MessageQueuesHaveBeenEmptied.SetValue ();
422
+ }
423
+ }
424
+ }
425
+
426
+ std::vector<NThreading::TFuture<void >> futures{MessageQueuesHaveBeenEmptied.GetFuture (), HasBeenClosed.GetFuture ()};
427
+ NThreading::WaitAny (futures).Wait (timeout);
428
+
429
+ with_lock (Lock) {
430
+ CloseImpl (EStatus::SUCCESS, NYql::TIssues{});
431
+ return MessageQueuesAreEmptyImpl ();
383
432
}
384
- return true ;
385
433
}
386
434
387
435
} // namespace NYdb::NFederatedTopic
0 commit comments