Skip to content

Commit 01f56bd

Browse files
authored
ydb_topic: schedule SendImpl from OnCompressedImpl instead of calling it directly (#8372)
1 parent 0b3e86d commit 01f56bd

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1168,7 +1168,17 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11681168
(*Counters->BytesInflightCompressed) += block.Data.size();
11691169

11701170
PackedMessagesToSend.emplace(std::move(block));
1171-
SendImpl();
1171+
1172+
if (!SendImplScheduled.exchange(true)) {
1173+
CompressionExecutor->Post([cbContext = SelfContext]() {
1174+
if (auto self = cbContext->LockShared()) {
1175+
self->SendImplScheduled = false;
1176+
with_lock (self->Lock) {
1177+
self->SendImpl();
1178+
}
1179+
}
1180+
});
1181+
}
11721182
return memoryUsage;
11731183
}
11741184

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
450450
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
451451
bool Connected = false;
452452
bool Started = false;
453+
std::atomic<bool> SendImplScheduled = false;
453454
TAtomic Aborting = 0;
454455
bool SessionEstablished = false;
455456
ui32 PartitionId = 0;

ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,14 +455,15 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
455455

456456
UNIT_ASSERT(!futureWrite.HasValue());
457457
Cerr << ">>>TEST: future write has no value " << Endl;
458-
RunTasks(stepByStepExecutor, {0});
458+
RunTasks(stepByStepExecutor, {0}); // Run compression task.
459+
RunTasks(stepByStepExecutor, {1}); // Run send task.
459460
futureWrite.GetValueSync();
460461
UNIT_ASSERT(futureWrite.HasValue());
461462
Cerr << ">>>TEST: future write has value " << Endl;
462463

463464
UNIT_ASSERT(!futureRead.HasValue());
464465
Cerr << ">>>TEST: future read has no value " << Endl;
465-
RunTasks(stepByStepExecutor, {1});
466+
RunTasks(stepByStepExecutor, {2}); // Run decompression task.
466467
futureRead.GetValueSync();
467468
UNIT_ASSERT(futureRead.HasValue());
468469
Cerr << ">>>TEST: future read has value " << Endl;

0 commit comments

Comments
 (0)