Skip to content

Commit 588e6a9

Browse files
qyryqGazizonoki
authored andcommitted
Moved commit "Revert separate lock for Processor->Write calls" from ydb repo
1 parent e3f9cf1 commit 588e6a9

File tree

2 files changed

+10
-29
lines changed

2 files changed

+10
-29
lines changed

src/client/topic/impl/write_session_impl.cpp

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -978,8 +978,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
978978
FirstTokenSent = true;
979979
}
980980
// Kickstart send after session reestablishment
981-
FormGrpcMessagesImpl();
982-
SendGrpcMessages();
981+
SendImpl();
983982
break;
984983
}
985984
case TServerMessage::kWriteResponse: {
@@ -1167,14 +1166,12 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
11671166

11681167
void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
11691168
TMemoryUsageChange memoryUsage;
1170-
if (isSyncCompression) {
1171-
// The Lock is already held somewhere up the stack.
1169+
if (!isSyncCompression) {
1170+
std::lock_guard guard(Lock);
11721171
memoryUsage = OnCompressedImpl(std::move(block));
11731172
} else {
1174-
std::lock_guard guard(Lock);
11751173
memoryUsage = OnCompressedImpl(std::move(block));
11761174
}
1177-
SendGrpcMessages();
11781175
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
11791176
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
11801177
}
@@ -1190,7 +1187,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11901187
(*Counters->BytesInflightCompressed) += block.Data.size();
11911188

11921189
PackedMessagesToSend.emplace(std::move(block));
1193-
FormGrpcMessagesImpl();
1190+
SendImpl();
11941191
return memoryUsage;
11951192
}
11961193

@@ -1307,7 +1304,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
13071304
}
13081305
CurrentBatch.Reset();
13091306
if (skipCompression) {
1310-
FormGrpcMessagesImpl();
1307+
SendImpl();
13111308
}
13121309
return size;
13131310
}
@@ -1371,16 +1368,7 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
13711368
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
13721369
}
13731370

1374-
void TWriteSessionImpl::SendGrpcMessages() {
1375-
with_lock(ProcessorLock) {
1376-
TClientMessage message;
1377-
while (GrpcMessagesToSend.Dequeue(&message)) {
1378-
Processor->Write(std::move(message));
1379-
}
1380-
}
1381-
}
1382-
1383-
void TWriteSessionImpl::FormGrpcMessagesImpl() {
1371+
void TWriteSessionImpl::SendImpl() {
13841372
Y_ABORT_UNLESS(Lock.IsLocked());
13851373

13861374
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1451,7 +1439,7 @@ void TWriteSessionImpl::FormGrpcMessagesImpl() {
14511439
<< OriginalMessagesToSend.size() << " left), first sequence number is "
14521440
<< writeRequest->messages(0).seq_no()
14531441
);
1454-
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
1442+
Processor->Write(std::move(clientMessage));
14551443
}
14561444
}
14571445

@@ -1513,10 +1501,8 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
15131501
return;
15141502
}
15151503
if (auto self = cbContext->LockShared()) {
1516-
with_lock(self->Lock) {
1517-
self->HandleWakeUpImpl();
1518-
}
1519-
self->SendGrpcMessages();
1504+
std::lock_guard guard(self->Lock);
1505+
self->HandleWakeUpImpl();
15201506
}
15211507
};
15221508
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {

src/client/topic/impl/write_session_impl.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <src/client/topic/impl/topic_impl.h>
66

77
#include <util/generic/buffer.h>
8-
#include <util/thread/lfqueue.h>
98

109

1110
namespace NYdb::NTopic {
@@ -391,8 +390,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
391390
uint64_t GetNextIdImpl(const std::optional<uint64_t>& seqNo);
392391
uint64_t GetSeqNoImpl(uint64_t id);
393392
uint64_t GetIdImpl(uint64_t seqNo);
394-
void FormGrpcMessagesImpl();
395-
void SendGrpcMessages();
393+
void SendImpl();
396394
void AbortImpl();
397395
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
398396
void CloseImpl(EStatus statusCode, const std::string& message);
@@ -453,9 +451,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
453451
std::queue<TOriginalMessage> SentOriginalMessages;
454452
std::queue<TBlock> SentPackedMessage;
455453

456-
TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
457-
TAdaptiveLock ProcessorLock;
458-
459454
const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
460455
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
461456
bool Connected = false;

0 commit comments

Comments
 (0)