Skip to content

Commit e8eb447

Browse files
qyryqGazizonoki
authored andcommitted
Moved "ydb_topic: split write messages on different codecs" commit from ydb repo
1 parent 573fc26 commit e8eb447

File tree

1 file changed

+19
-13
lines changed

1 file changed

+19
-13
lines changed

src/client/topic/impl/write_session_impl.cpp

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,6 +1212,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
12121212
TBlock block{};
12131213
for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
12141214
auto& currMessage = CurrentBatch.Messages[i];
1215+
1216+
// If MaxBlockSize or MaxBlockMessageCount values are ever changed from infinity and 1 correspondingly,
1217+
// create a new block, if the existing one is non-empty AND (adding another message will overflow it OR
1218+
// its codec is different from the codec of the next message).
1219+
12151220
auto id = currMessage.Id;
12161221
auto createTs = currMessage.CreatedAt;
12171222

@@ -1303,22 +1308,26 @@ void TWriteSessionImpl::UpdateTokenIfNeededImpl() {
13031308
void TWriteSessionImpl::SendImpl() {
13041309
Y_ABORT_UNLESS(Lock.IsLocked());
13051310

1306-
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB
1307-
while(IsReadyToSendNextImpl()) {
1311+
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
1312+
while (IsReadyToSendNextImpl()) {
13081313
TClientMessage clientMessage;
13091314
auto* writeRequest = clientMessage.mutable_write_request();
13101315

1311-
// Sent blocks while we can without messages reordering
1316+
ui32 prevCodec = 0;
1317+
// Send blocks while we can without messages reordering.
13121318
while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) {
13131319
const auto& block = PackedMessagesToSend.top();
13141320
Y_ABORT_UNLESS(block.Valid);
1321+
if (writeRequest->messages_size() > 0 && prevCodec != block.CodecID) {
1322+
break;
1323+
}
1324+
prevCodec = block.CodecID;
13151325
writeRequest->set_codec(static_cast<i32>(block.CodecID));
13161326
Y_ABORT_UNLESS(block.MessageCount == 1);
13171327
for (size_t i = 0; i != block.MessageCount; ++i) {
13181328
Y_ABORT_UNLESS(!OriginalMessagesToSend.empty());
13191329

13201330
auto& message = OriginalMessagesToSend.front();
1321-
13221331
auto* msgData = writeRequest->add_messages();
13231332

13241333
if (message.Tx) {
@@ -1329,27 +1338,24 @@ void TWriteSessionImpl::SendImpl() {
13291338
msgData->set_seq_no(GetSeqNoImpl(message.Id));
13301339
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
13311340

1332-
if (!message.MessageMeta.empty()) {
1333-
for (auto& [k, v] : message.MessageMeta) {
1334-
auto* pair = msgData->add_metadata_items();
1335-
pair->set_key(k);
1336-
pair->set_value(v);
1337-
}
1341+
for (auto& [k, v] : message.MessageMeta) {
1342+
auto* pair = msgData->add_metadata_items();
1343+
pair->set_key(k);
1344+
pair->set_value(v);
13381345
}
13391346
SentOriginalMessages.emplace(std::move(message));
13401347
OriginalMessagesToSend.pop();
13411348

13421349
msgData->set_uncompressed_size(block.OriginalSize);
1343-
if (block.Compressed)
1350+
if (block.Compressed) {
13441351
msgData->set_data(block.Data.data(), block.Data.size());
1345-
else {
1352+
} else {
13461353
for (auto& buffer: block.OriginalDataRefs) {
13471354
msgData->set_data(buffer.data(), buffer.size());
13481355
}
13491356
}
13501357
}
13511358

1352-
13531359
TBlock moveBlock;
13541360
moveBlock.Move(block);
13551361
SentPackedMessage.emplace(std::move(moveBlock));

0 commit comments

Comments
 (0)