Skip to content

Commit 2f1f994

Browse files
qyryqGazizonoki
authored andcommitted
Moved "ydb_persqueue_public: cancel previous client context on reconnect" commit from ydb repo
1 parent c16fe48 commit 2f1f994

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

src/client/persqueue_public/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
add_subdirectory(codecs)
2+
add_subdirectory(impl)
23

34
add_library(cpp-client-ydb_persqueue_public INTERFACE)
45

56
target_link_libraries(cpp-client-ydb_persqueue_public INTERFACE
67
yutil
78
cpp-client-ydb_persqueue_core
9+
client-ydb_persqueue_public-impl
810
client-ydb_persqueue_public-codecs
911
)
1012

src/client/persqueue_public/impl/write_session_impl.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -453,18 +453,18 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const std::string& end
453453
}
454454
++ConnectionGeneration;
455455
auto subclient = Client->GetClientForEndpoint(endpoint);
456-
connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
457456
auto clientContext = subclient->CreateContext();
458-
ConnectionFactory = connectionFactory;
459-
460-
ClientContext = std::move(clientContext);
461-
ServerMessage = std::make_shared<TServerMessage>();
462-
463-
if (!ClientContext) {
457+
if (!clientContext) {
464458
AbortImpl();
465459
// Grpc and WriteSession is closing right now.
466460
return;
467461
}
462+
auto prevClientContext = std::exchange(ClientContext, clientContext);
463+
464+
ServerMessage = std::make_shared<TServerMessage>();
465+
466+
connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
467+
ConnectionFactory = connectionFactory;
468468

469469
connectContext = ClientContext->CreateContext();
470470
if (delay)
@@ -485,8 +485,10 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const std::string& end
485485
if (prevConnectDelayContext)
486486
Cancel(prevConnectDelayContext);
487487
Cancel(prevConnectTimeoutContext);
488+
Cancel(prevClientContext);
488489
Y_ASSERT(connectContext);
489490
Y_ASSERT(connectTimeoutContext);
491+
490492
reqSettings = TRpcRequestSettings::Make(Settings);
491493

492494
connectCallback = [cbContext = SelfContext,

0 commit comments

Comments
 (0)