Skip to content

Commit de42a46

Browse files
The value of the WriteInflightSize in the main partition (#8116) (#8142)
1 parent f68af91 commit de42a46

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2229,6 +2229,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
22292229
}, std::nullopt};
22302230
msg.Internal = true;
22312231

2232+
WriteInflightSize += msg.Msg.Data.size();
22322233
ExecRequest(msg, *Parameters, PersistRequest.Get());
22332234

22342235
auto& info = TxSourceIdForPostPersist[blob.SourceId];

ydb/core/persqueue/partition_write.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,10 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
11221122
auto& sourceIdBatch = parameters.SourceIdBatch;
11231123
auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId);
11241124

1125+
Y_DEBUG_ABORT_UNLESS(WriteInflightSize >= p.Msg.Data.size(),
1126+
"PQ %" PRIu64 ", Partition {%" PRIu32 ", %" PRIu32 "}, WriteInflightSize=%" PRIu64 ", p.Msg.Data.size=%" PRISZT,
1127+
TabletID, Partition.OriginalPartitionId, Partition.InternalPartitionId,
1128+
WriteInflightSize, p.Msg.Data.size());
11251129
WriteInflightSize -= p.Msg.Data.size();
11261130

11271131
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp);
@@ -1540,6 +1544,10 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&
15401544

15411545
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
15421546
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
1547+
Y_DEBUG_ABORT_UNLESS(WriteInflightSize >= msg.Data.size(),
1548+
"PQ %" PRIu64 ", Partition {%" PRIu32 ", %" PRIu32 "}, WriteInflightSize=%" PRIu64 ", msg.Data.size=%" PRISZT,
1549+
TabletID, Partition.OriginalPartitionId, Partition.InternalPartitionId,
1550+
WriteInflightSize, msg.Data.size());
15431551
WriteInflightSize -= msg.Data.size();
15441552
}
15451553

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1898,6 +1898,29 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
18981898
}
18991899
}
19001900

1901+
Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
1902+
{
1903+
// The test verifies that the `WriteInflightSize` is correctly considered for the main partition.
1904+
// Writing to the service partition does not change the `WriteInflightSize` of the main one.
1905+
CreateTopic("topic_A", TEST_CONSUMER);
1906+
1907+
NTable::TSession tableSession = CreateTableSession();
1908+
NTable::TTransaction tx = BeginTx(tableSession);
1909+
1910+
TString message(16'000, 'a');
1911+
1912+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(16'000, 'a'), &tx, 0);
1913+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
1914+
1915+
CommitTx(tx, EStatus::SUCCESS);
1916+
1917+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(20'000, 'b'), nullptr, 0);
1918+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);
1919+
1920+
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0);
1921+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
1922+
}
1923+
19011924
}
19021925

19031926
}

0 commit comments

Comments
 (0)