Skip to content

Commit 48594d3

Browse files
qyryqGazizonoki
authored andcommitted
Moved commit "Use a separate lock for Processor->Write calls" from ydb repo
1 parent f055278 commit 48594d3

File tree

2 files changed

+29
-10
lines changed

2 files changed

+29
-10
lines changed

src/client/topic/impl/write_session_impl.cpp

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
978978
FirstTokenSent = true;
979979
}
980980
// Kickstart send after session reestablishment
981-
SendImpl();
981+
FormGrpcMessagesImpl();
982+
SendGrpcMessages();
982983
break;
983984
}
984985
case TServerMessage::kWriteResponse: {
@@ -1160,12 +1161,14 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
11601161

11611162
void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
11621163
TMemoryUsageChange memoryUsage;
1163-
if (!isSyncCompression) {
1164-
std::lock_guard guard(Lock);
1164+
if (isSyncCompression) {
1165+
// The Lock is already held somewhere up the stack.
11651166
memoryUsage = OnCompressedImpl(std::move(block));
11661167
} else {
1168+
std::lock_guard guard(Lock);
11671169
memoryUsage = OnCompressedImpl(std::move(block));
11681170
}
1171+
SendGrpcMessages();
11691172
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
11701173
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
11711174
}
@@ -1181,7 +1184,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11811184
(*Counters->BytesInflightCompressed) += block.Data.size();
11821185

11831186
PackedMessagesToSend.emplace(std::move(block));
1184-
SendImpl();
1187+
FormGrpcMessagesImpl();
11851188
return memoryUsage;
11861189
}
11871190

@@ -1298,7 +1301,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
12981301
}
12991302
CurrentBatch.Reset();
13001303
if (skipCompression) {
1301-
SendImpl();
1304+
FormGrpcMessagesImpl();
13021305
}
13031306
return size;
13041307
}
@@ -1362,7 +1365,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
13621365
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
13631366
}
13641367

1365-
void TWriteSessionImpl::SendImpl() {
1368+
void TWriteSessionImpl::SendGrpcMessages() {
1369+
with_lock(ProcessorLock) {
1370+
TClientMessage message;
1371+
while (GrpcMessagesToSend.Dequeue(&message)) {
1372+
Processor->Write(std::move(message));
1373+
}
1374+
}
1375+
}
1376+
1377+
void TWriteSessionImpl::FormGrpcMessagesImpl() {
13661378
Y_ABORT_UNLESS(Lock.IsLocked());
13671379

13681380
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1433,7 +1445,7 @@ void TWriteSessionImpl::SendImpl() {
14331445
<< OriginalMessagesToSend.size() << " left), first sequence number is "
14341446
<< writeRequest->messages(0).seq_no()
14351447
);
1436-
Processor->Write(std::move(clientMessage));
1448+
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
14371449
}
14381450
}
14391451

@@ -1495,8 +1507,10 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
14951507
return;
14961508
}
14971509
if (auto self = cbContext->LockShared()) {
1498-
std::lock_guard guard(self->Lock);
1499-
self->HandleWakeUpImpl();
1510+
with_lock(self->Lock) {
1511+
self->HandleWakeUpImpl();
1512+
}
1513+
self->SendGrpcMessages();
15001514
}
15011515
};
15021516
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {

src/client/topic/impl/write_session_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <src/client/topic/impl/topic_impl.h>
66

77
#include <util/generic/buffer.h>
8+
#include <util/thread/lfqueue.h>
89

910

1011
namespace NYdb::NTopic {
@@ -390,7 +391,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
390391
uint64_t GetNextIdImpl(const std::optional<uint64_t>& seqNo);
391392
uint64_t GetSeqNoImpl(uint64_t id);
392393
uint64_t GetIdImpl(uint64_t seqNo);
393-
void SendImpl();
394+
void FormGrpcMessagesImpl();
395+
void SendGrpcMessages();
394396
void AbortImpl();
395397
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
396398
void CloseImpl(EStatus statusCode, const std::string& message);
@@ -451,6 +453,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
451453
std::queue<TOriginalMessage> SentOriginalMessages;
452454
std::queue<TBlock> SentPackedMessage;
453455

456+
TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
457+
TAdaptiveLock ProcessorLock;
458+
454459
const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
455460
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
456461
bool Connected = false;

0 commit comments

Comments
 (0)