Skip to content

Commit f54fa07

Browse files
The values of PartNo must be added (#13863)
1 parent 06b6cf3 commit f54fa07

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2910,6 +2910,60 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
29102910

29112911
CheckTabletKeys("topic_A");
29122912
}
2913+
2914+
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
2915+
{
2916+
// The test verifies the simultaneous execution of several transactions. There is a topic
2917+
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
2918+
// The size of the messages is random. Such that both large blobs in the body and small ones in
2919+
// the head of the partition are obtained. Message sizes are multiples of 500 KB. This way we
2920+
// will make sure that when committing transactions, the division into blocks is taken into account.
2921+
2922+
const size_t PARTITIONS_COUNT = 20;
2923+
const size_t TXS_COUNT = 100;
2924+
2925+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
2926+
2927+
std::vector<NTable::TSession> sessions;
2928+
std::vector<NTable::TTransaction> transactions;
2929+
2930+
// We open TXS_COUNT transactions and write messages to the topic.
2931+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2932+
sessions.push_back(CreateTableSession());
2933+
auto& session = sessions.back();
2934+
2935+
transactions.push_back(BeginTx(session));
2936+
auto& tx = transactions.back();
2937+
2938+
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
2939+
TString sourceId = TEST_MESSAGE_GROUP_ID;
2940+
sourceId += "_";
2941+
sourceId += ToString(i);
2942+
sourceId += "_";
2943+
sourceId += ToString(j);
2944+
2945+
size_t count = RandomNumber<size_t>(20) + 3;
2946+
WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j);
2947+
2948+
WaitForAcks("topic_A", sourceId);
2949+
}
2950+
}
2951+
2952+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
2953+
std::vector<NTable::TAsyncCommitTransactionResult> futures;
2954+
2955+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2956+
futures.push_back(transactions[i].Commit());
2957+
}
2958+
2959+
// All transactions must be completed successfully.
2960+
for (size_t i = 0; i < TXS_COUNT; ++i) {
2961+
futures[i].Wait();
2962+
const auto& result = futures[i].GetValueSync();
2963+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2964+
}
2965+
}
2966+
29132967
}
29142968

29152969
}

0 commit comments

Comments
 (0)