Skip to content

Commit c9cb52e

Browse files
qyryqGazizonoki
authored andcommitted
Moved "Cancel existing Processor in OnConnect" commit from ydb repo
1 parent 995d8a8 commit c9cb52e

File tree

3 files changed

+11
-0
lines changed

3 files changed

+11
-0
lines changed

src/client/persqueue_public/impl/write_session_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const std::string& end
454454
Y_ASSERT(ConnectContext);
455455
Y_ASSERT(ConnectTimeoutContext);
456456

457+
if (Processor) {
458+
Processor->Cancel();
459+
}
460+
457461
// Cancel previous operations.
458462
Cancel(prevConnectContext);
459463
if (prevConnectDelayContext)

src/client/topic/impl/read_session_impl.ipp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,9 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
279279
Cancel(connectTimeoutContext);
280280
return false;
281281
}
282+
if (Processor) {
283+
Processor->Cancel();
284+
}
282285
Processor = nullptr;
283286
WaitingReadResponse = false;
284287
ServerMessage = std::make_shared<TServerMessage<UseMigrationProtocol>>();

src/client/topic/impl/write_session_impl.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,10 @@ void TWriteSessionImpl::Connect(const TDuration& delay) {
618618
}
619619
Cancel(prevConnectTimeoutContext);
620620

621+
if (Processor) {
622+
Processor->Cancel();
623+
}
624+
621625
reqSettings = TRpcRequestSettings::Make(Settings, PreferredPartitionLocation.Endpoint);
622626

623627
connectCallback = [cbContext = SelfContext,

0 commit comments

Comments
 (0)