@@ -978,7 +978,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
978
978
FirstTokenSent = true ;
979
979
}
980
980
// Kickstart send after session reestablishment
981
- SendImpl ();
981
+ FormGrpcMessagesImpl ();
982
+ SendGrpcMessages ();
982
983
break ;
983
984
}
984
985
case TServerMessage::kWriteResponse : {
@@ -1160,12 +1161,14 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
1160
1161
1161
1162
void TWriteSessionImpl::OnCompressed (TBlock&& block, bool isSyncCompression) {
1162
1163
TMemoryUsageChange memoryUsage;
1163
- if (! isSyncCompression) {
1164
- std::lock_guard guard ( Lock);
1164
+ if (isSyncCompression) {
1165
+ // The Lock is already held somewhere up the stack.
1165
1166
memoryUsage = OnCompressedImpl (std::move (block));
1166
1167
} else {
1168
+ std::lock_guard guard (Lock);
1167
1169
memoryUsage = OnCompressedImpl (std::move (block));
1168
1170
}
1171
+ SendGrpcMessages ();
1169
1172
if (memoryUsage.NowOk && !memoryUsage.WasOk ) {
1170
1173
EventsQueue->PushEvent (TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken ()});
1171
1174
}
@@ -1181,7 +1184,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
1181
1184
(*Counters->BytesInflightCompressed ) += block.Data .size ();
1182
1185
1183
1186
PackedMessagesToSend.emplace (std::move (block));
1184
- SendImpl ();
1187
+ FormGrpcMessagesImpl ();
1185
1188
return memoryUsage;
1186
1189
}
1187
1190
@@ -1298,7 +1301,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
1298
1301
}
1299
1302
CurrentBatch.Reset ();
1300
1303
if (skipCompression) {
1301
- SendImpl ();
1304
+ FormGrpcMessagesImpl ();
1302
1305
}
1303
1306
return size;
1304
1307
}
@@ -1362,7 +1365,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
1362
1365
return GetTransactionId (*writeRequest) != GetTransactionId (OriginalMessagesToSend.front ().Tx );
1363
1366
}
1364
1367
1365
- void TWriteSessionImpl::SendImpl () {
1368
+ void TWriteSessionImpl::SendGrpcMessages () {
1369
+ with_lock (ProcessorLock) {
1370
+ TClientMessage message;
1371
+ while (GrpcMessagesToSend.Dequeue (&message)) {
1372
+ Processor->Write (std::move (message));
1373
+ }
1374
+ }
1375
+ }
1376
+
1377
+ void TWriteSessionImpl::FormGrpcMessagesImpl () {
1366
1378
Y_ABORT_UNLESS (Lock.IsLocked ());
1367
1379
1368
1380
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1433,7 +1445,7 @@ void TWriteSessionImpl::SendImpl() {
1433
1445
<< OriginalMessagesToSend.size () << " left), first sequence number is "
1434
1446
<< writeRequest->messages (0 ).seq_no ()
1435
1447
);
1436
- Processor-> Write (std::move (clientMessage));
1448
+ GrpcMessagesToSend. Enqueue (std::move (clientMessage));
1437
1449
}
1438
1450
}
1439
1451
@@ -1495,8 +1507,10 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
1495
1507
return ;
1496
1508
}
1497
1509
if (auto self = cbContext->LockShared ()) {
1498
- std::lock_guard guard (self->Lock );
1499
- self->HandleWakeUpImpl ();
1510
+ with_lock (self->Lock ) {
1511
+ self->HandleWakeUpImpl ();
1512
+ }
1513
+ self->SendGrpcMessages ();
1500
1514
}
1501
1515
};
1502
1516
if (TInstant::Now () - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {
0 commit comments