@@ -24,11 +24,13 @@ bool DatabasesAreSame(std::shared_ptr<TDbInfo> lhs, std::shared_ptr<TDbInfo> rhs
24
24
25
25
NTopic::TTopicClientSettings FromFederated (const TFederatedTopicClientSettings& settings);
26
26
27
- TFederatedWriteSession::TFederatedWriteSession (const TFederatedWriteSessionSettings& settings,
28
- std::shared_ptr<TGRpcConnectionsImpl> connections,
29
- const TFederatedTopicClientSettings& clientSetttings,
30
- std::shared_ptr<TFederatedDbObserver> observer,
31
- std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs)
27
+ TFederatedWriteSessionImpl::TFederatedWriteSessionImpl (
28
+ const TFederatedWriteSessionSettings& settings,
29
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
30
+ const TFederatedTopicClientSettings& clientSetttings,
31
+ std::shared_ptr<TFederatedDbObserver> observer,
32
+ std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs
33
+ )
32
34
: Settings(settings)
33
35
, Connections(std::move(connections))
34
36
, SubClientSetttings(FromFederated(clientSetttings))
@@ -42,27 +44,32 @@ TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSetti
42
44
{
43
45
}
44
46
45
- TStringBuilder TFederatedWriteSession ::GetLogPrefix () const {
47
+ TStringBuilder TFederatedWriteSessionImpl ::GetLogPrefix () const {
46
48
return TStringBuilder () << GetDatabaseLogPrefix (SubClientSetttings.Database_ .value_or (" " )) << " [" << SessionId << " ] " ;
47
49
}
48
50
49
- void TFederatedWriteSession ::Start () {
51
+ void TFederatedWriteSessionImpl ::Start () {
50
52
// TODO validate settings?
51
53
Settings.EventHandlers_ .HandlersExecutor_ ->Start ();
52
54
with_lock (Lock){
53
55
ClientEventsQueue->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
54
56
ClientHasToken = true ;
55
57
}
56
58
57
- AsyncInit.Subscribe ([self = shared_from_this () ](const auto & f){
59
+ AsyncInit.Subscribe ([selfCtx = SelfContext ](const auto & f){
58
60
Y_UNUSED (f);
59
- std::lock_guard guard (self->Lock );
60
- self->FederationState = self->Observer ->GetState ();
61
- self->OnFederatedStateUpdateImpl ();
61
+ if (auto self = selfCtx->LockShared ()) {
62
+ with_lock (self->Lock ) {
63
+ if (!self->Closing ) {
64
+ self->FederationState = self->Observer ->GetState ();
65
+ self->OnFederatedStateUpdateImpl ();
66
+ }
67
+ }
68
+ }
62
69
});
63
70
}
64
71
65
- void TFederatedWriteSession ::OpenSubSessionImpl (std::shared_ptr<TDbInfo> db) {
72
+ void TFederatedWriteSessionImpl ::OpenSubSessionImpl (std::shared_ptr<TDbInfo> db) {
66
73
if (Subsession) {
67
74
PendingToken.reset ();
68
75
Subsession->Close (TDuration::Zero ());
@@ -76,32 +83,41 @@ void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
76
83
77
84
auto handlers = NTopic::TWriteSessionSettings::TEventHandlers ()
78
85
.HandlersExecutor (Settings.EventHandlers_ .HandlersExecutor_ )
79
- .ReadyToAcceptHandler ([self = shared_from_this ()](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev){
80
- TDeferredWrite deferred (self->Subsession );
81
- {
82
- std::lock_guard guard (self->Lock );
83
- Y_ABORT_UNLESS (!self->PendingToken .has_value ());
84
- self->PendingToken = std::move (ev.ContinuationToken );
85
- self->PrepareDeferredWrite (deferred);
86
+ .ReadyToAcceptHandler ([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
87
+ if (auto self = selfCtx->LockShared ()) {
88
+ TDeferredWrite deferred (self->Subsession );
89
+ with_lock (self->Lock ) {
90
+ Y_ABORT_UNLESS (!self->PendingToken .has_value ());
91
+ self->PendingToken = std::move (ev.ContinuationToken );
92
+ self->PrepareDeferredWrite (deferred);
93
+ }
94
+ deferred.DoWrite ();
86
95
}
87
- deferred.DoWrite ();
88
96
})
89
- .AcksHandler ([self = shared_from_this ()](NTopic::TWriteSessionEvent::TAcksEvent& ev){
90
- std::lock_guard guard (self->Lock );
91
- Y_ABORT_UNLESS (ev.Acks .size () <= self->OriginalMessagesToGetAck .size ());
92
- for (size_t i = 0 ; i < ev.Acks .size (); ++i) {
93
- self->BufferFreeSpace += self->OriginalMessagesToGetAck .front ().Data .size ();
94
- self->OriginalMessagesToGetAck .pop_front ();
95
- }
96
- self->ClientEventsQueue ->PushEvent (std::move (ev));
97
- if (self->BufferFreeSpace > 0 && !self->ClientHasToken ) {
98
- self->ClientEventsQueue ->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
99
- self->ClientHasToken = true ;
97
+ .AcksHandler ([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TAcksEvent& ev) {
98
+ if (auto self = selfCtx->LockShared ()) {
99
+ with_lock (self->Lock ) {
100
+ Y_ABORT_UNLESS (ev.Acks .size () <= self->OriginalMessagesToGetAck .size ());
101
+ for (size_t i = 0 ; i < ev.Acks .size (); ++i) {
102
+ self->BufferFreeSpace += self->OriginalMessagesToGetAck .front ().Data .size ();
103
+ self->OriginalMessagesToGetAck .pop_front ();
104
+ }
105
+ self->ClientEventsQueue ->PushEvent (std::move (ev));
106
+ if (self->BufferFreeSpace > 0 && !self->ClientHasToken ) {
107
+ self->ClientEventsQueue ->PushEvent (NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
108
+ self->ClientHasToken = true ;
109
+ }
110
+ }
100
111
}
101
112
})
102
- .SessionClosedHandler ([self = shared_from_this ()](const NTopic::TSessionClosedEvent & ev){
103
- std::lock_guard guard (self->Lock );
104
- self->ClientEventsQueue ->PushEvent (ev);
113
+ .SessionClosedHandler ([selfCtx = SelfContext](const NTopic::TSessionClosedEvent & ev) {
114
+ if (auto self = selfCtx->LockShared ()) {
115
+ with_lock (self->Lock ) {
116
+ if (!self->Closing ) {
117
+ self->CloseImpl (ev);
118
+ }
119
+ }
120
+ }
105
121
});
106
122
107
123
NTopic::TWriteSessionSettings wsSettings = Settings;
@@ -113,7 +129,7 @@ void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
113
129
CurrentDatabase = db;
114
130
}
115
131
116
- std::shared_ptr<TDbInfo> TFederatedWriteSession ::SelectDatabaseImpl () {
132
+ std::shared_ptr<TDbInfo> TFederatedWriteSessionImpl ::SelectDatabaseImpl () {
117
133
std::vector<std::shared_ptr<TDbInfo>> availableDatabases;
118
134
ui64 totalWeight = 0 ;
119
135
@@ -157,7 +173,7 @@ std::shared_ptr<TDbInfo> TFederatedWriteSession::SelectDatabaseImpl() {
157
173
Y_UNREACHABLE ();
158
174
}
159
175
160
- void TFederatedWriteSession ::OnFederatedStateUpdateImpl () {
176
+ void TFederatedWriteSessionImpl ::OnFederatedStateUpdateImpl () {
161
177
if (!FederationState->Status .IsSuccess ()) {
162
178
CloseImpl (FederationState->Status .GetStatus (), NYql::TIssues (FederationState->Status .GetIssues ()));
163
179
return ;
@@ -184,16 +200,19 @@ void TFederatedWriteSession::OnFederatedStateUpdateImpl() {
184
200
ScheduleFederatedStateUpdateImpl (UPDATE_FEDERATION_STATE_DELAY);
185
201
}
186
202
187
- void TFederatedWriteSession ::ScheduleFederatedStateUpdateImpl (TDuration delay) {
203
+ void TFederatedWriteSessionImpl ::ScheduleFederatedStateUpdateImpl (TDuration delay) {
188
204
Y_ABORT_UNLESS (Lock.IsLocked ());
189
- auto cb = [self = shared_from_this () ](bool ok) {
205
+ auto cb = [selfCtx = SelfContext ](bool ok) {
190
206
if (ok) {
191
- std::lock_guard guard (self->Lock );
192
- if (self->Closing ) {
193
- return ;
207
+ if (auto self = selfCtx->LockShared ()) {
208
+ with_lock (self->Lock ) {
209
+ if (self->Closing ) {
210
+ return ;
211
+ }
212
+ self->FederationState = self->Observer ->GetState ();
213
+ self->OnFederatedStateUpdateImpl ();
214
+ }
194
215
}
195
- self->FederationState = self->Observer ->GetState ();
196
- self->OnFederatedStateUpdateImpl ();
197
216
}
198
217
};
199
218
@@ -208,24 +227,24 @@ void TFederatedWriteSession::ScheduleFederatedStateUpdateImpl(TDuration delay) {
208
227
UpdateStateDelayContext);
209
228
}
210
229
211
- NThreading::TFuture<void > TFederatedWriteSession ::WaitEvent () {
230
+ NThreading::TFuture<void > TFederatedWriteSessionImpl ::WaitEvent () {
212
231
return ClientEventsQueue->WaitEvent ();
213
232
}
214
233
215
- std::vector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession ::GetEvents (bool block, std::optional<size_t > maxEventsCount) {
234
+ std::vector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSessionImpl ::GetEvents (bool block, std::optional<size_t > maxEventsCount) {
216
235
return ClientEventsQueue->GetEvents (block, maxEventsCount);
217
236
}
218
237
219
- std::optional<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession ::GetEvent (bool block) {
238
+ std::optional<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSessionImpl ::GetEvent (bool block) {
220
239
auto events = GetEvents (block, 1 );
221
240
return events.empty () ? std::nullopt : std::optional<NTopic::TWriteSessionEvent::TEvent>{std::move (events.front ())};
222
241
}
223
242
224
- NThreading::TFuture<ui64> TFederatedWriteSession ::GetInitSeqNo () {
243
+ NThreading::TFuture<ui64> TFederatedWriteSessionImpl ::GetInitSeqNo () {
225
244
return NThreading::MakeFuture<ui64>(0u );
226
245
}
227
246
228
- void TFederatedWriteSession ::Write (NTopic::TContinuationToken&& token, std::string_view data, std::optional<ui64> seqNo,
247
+ void TFederatedWriteSessionImpl ::Write (NTopic::TContinuationToken&& token, std::string_view data, std::optional<ui64> seqNo,
229
248
std::optional<TInstant> createTimestamp) {
230
249
NTopic::TWriteMessage message{std::move (data)};
231
250
if (seqNo.has_value ())
@@ -235,11 +254,11 @@ void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, std::stri
235
254
return WriteInternal (std::move (token), std::move (message));
236
255
}
237
256
238
- void TFederatedWriteSession ::Write (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
257
+ void TFederatedWriteSessionImpl ::Write (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
239
258
return WriteInternal (std::move (token), TWrappedWriteMessage (std::move (message)));
240
259
}
241
260
242
- void TFederatedWriteSession ::WriteEncoded (NTopic::TContinuationToken&& token, std::string_view data, NTopic::ECodec codec,
261
+ void TFederatedWriteSessionImpl ::WriteEncoded (NTopic::TContinuationToken&& token, std::string_view data, NTopic::ECodec codec,
243
262
ui32 originalSize, std::optional<ui64> seqNo, std::optional<TInstant> createTimestamp) {
244
263
auto message = NTopic::TWriteMessage::CompressedMessage (std::move (data), codec, originalSize);
245
264
if (seqNo.has_value ())
@@ -249,11 +268,11 @@ void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, st
249
268
return WriteInternal (std::move (token), TWrappedWriteMessage (std::move (message)));
250
269
}
251
270
252
- void TFederatedWriteSession ::WriteEncoded (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
271
+ void TFederatedWriteSessionImpl ::WriteEncoded (NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
253
272
return WriteInternal (std::move (token), TWrappedWriteMessage (std::move (message)));
254
273
}
255
274
256
- void TFederatedWriteSession ::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
275
+ void TFederatedWriteSessionImpl ::WriteInternal (NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
257
276
ClientHasToken = false ;
258
277
if (!wrapped.Message .CreateTimestamp_ .has_value ()) {
259
278
wrapped.Message .CreateTimestamp_ = TInstant::Now ();
@@ -275,7 +294,7 @@ void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, TWrappe
275
294
}
276
295
}
277
296
278
- bool TFederatedWriteSession ::PrepareDeferredWrite (TDeferredWrite& deferred) {
297
+ bool TFederatedWriteSessionImpl ::PrepareDeferredWrite (TDeferredWrite& deferred) {
279
298
if (!PendingToken.has_value ()) {
280
299
return false ;
281
300
}
@@ -290,17 +309,25 @@ bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
290
309
return true ;
291
310
}
292
311
293
- void TFederatedWriteSession::CloseImpl (EStatus statusCode, NYql::TIssues&& issues) {
312
+ void TFederatedWriteSessionImpl::CloseImpl (EStatus statusCode, NYql::TIssues&& issues, TDuration timeout) {
313
+ CloseImpl (TPlainStatus (statusCode, std::move (issues)), timeout);
314
+ }
315
+
316
+ void TFederatedWriteSessionImpl::CloseImpl (NTopic::TSessionClosedEvent const & ev, TDuration timeout) {
317
+ if (Closing) {
318
+ return ;
319
+ }
294
320
Closing = true ;
295
321
if (Subsession) {
296
- Subsession->Close (TDuration::Zero () );
322
+ Subsession->Close (timeout );
297
323
}
298
- ClientEventsQueue->Close (TSessionClosedEvent (statusCode, std::move (issues)));
324
+ ClientEventsQueue->Close (ev);
325
+ NTopic::Cancel (UpdateStateDelayContext);
299
326
}
300
327
301
- bool TFederatedWriteSession ::Close (TDuration timeout) {
302
- if (Subsession ) {
303
- return Subsession-> Close ( timeout);
328
+ bool TFederatedWriteSessionImpl ::Close (TDuration timeout) {
329
+ with_lock (Lock ) {
330
+ CloseImpl (EStatus::SUCCESS, {}, timeout);
304
331
}
305
332
return true ;
306
333
}
0 commit comments