Skip to content

Commit 5366a11

Browse files
Alek5andr-KotovGazizonoki
authored andcommitted
Workaround for batch processing of transactions (#14874)
1 parent d328b7b commit 5366a11

File tree

1 file changed

+151
-3
lines changed

1 file changed

+151
-3
lines changed

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 151 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ class TFixture : public NUnitTest::TBaseFixture {
9494
TDuration stabilizationWindow,
9595
ui64 downUtilizationPercent,
9696
ui64 upUtilizationPercent);
97+
void SetPartitionWriteSpeed(const std::string& topicPath,
98+
size_t bytesPerSeconds);
9799

98100
void WriteToTopicWithInvalidTxId(bool invalidTxId);
99101

@@ -511,6 +513,18 @@ void TFixture::AlterAutoPartitioning(const TString& topicPath,
511513
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
512514
}
513515

516+
void TFixture::SetPartitionWriteSpeed(const std::string& topicPath,
517+
size_t bytesPerSeconds)
518+
{
519+
NTopic::TTopicClient client(GetDriver());
520+
NTopic::TAlterTopicSettings settings;
521+
522+
settings.SetPartitionWriteSpeedBytesPerSecond(bytesPerSeconds);
523+
524+
auto result = client.AlterTopic(topicPath, settings).GetValueSync();
525+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
526+
}
527+
514528
TTopicDescription TFixture::DescribeTopic(const TString& path)
515529
{
516530
return Setup->DescribeTopic(path);
@@ -3005,9 +3019,6 @@ Y_UNIT_TEST_F(Sinks_Olap_WriteToTopicAndTable_3, TFixtureSinks)
30053019

30063020
Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30073021
{
3008-
// Consumes a lot of memory. Temporarily disabled
3009-
return;
3010-
30113022
// The test verifies the simultaneous execution of several transactions. There is a topic
30123023
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
30133024
// The size of the messages is random. Such that both large blobs in the body and small ones in
@@ -3019,6 +3030,8 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30193030

30203031
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
30213032

3033+
SetPartitionWriteSpeed("topic_A", 50'000'000);
3034+
30223035
std::vector<NTable::TSession> sessions;
30233036
std::vector<NTable::TTransaction> transactions;
30243037

@@ -3059,6 +3072,141 @@ Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
30593072
}
30603073
}
30613074

3075+
Y_UNIT_TEST_F(Write_Only_Big_Messages_In_Wide_Transactions, TFixture)
3076+
{
3077+
// The test verifies the simultaneous execution of several transactions. There is a topic `topic_A` and
3078+
// it contains a `PARTITIONS_COUNT' of partitions. In each transaction, the test writes to all partitions.
3079+
// The size of the messages is chosen so that only large blobs are recorded in the transaction and there
3080+
// are no records in the head. Thus, we verify that transaction bundling is working correctly.
3081+
3082+
const size_t PARTITIONS_COUNT = 20;
3083+
const size_t TXS_COUNT = 100;
3084+
3085+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
3086+
3087+
SetPartitionWriteSpeed("topic_A", 50'000'000);
3088+
3089+
std::vector<NTable::TSession> sessions;
3090+
std::vector<NTable::TTransaction> transactions;
3091+
3092+
// We open TXS_COUNT transactions and write messages to the topic.
3093+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3094+
sessions.push_back(CreateTableSession());
3095+
auto& session = sessions.back();
3096+
3097+
transactions.push_back(BeginTx(session));
3098+
auto& tx = transactions.back();
3099+
3100+
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
3101+
TString sourceId = TEST_MESSAGE_GROUP_ID;
3102+
sourceId += "_";
3103+
sourceId += ToString(i);
3104+
sourceId += "_";
3105+
sourceId += ToString(j);
3106+
3107+
WriteToTopic("topic_A", sourceId, TString(6'500'000, 'x'), &tx, j);
3108+
3109+
WaitForAcks("topic_A", sourceId);
3110+
}
3111+
}
3112+
3113+
// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
3114+
std::vector<NTable::TAsyncCommitTransactionResult> futures;
3115+
3116+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3117+
futures.push_back(transactions[i].Commit());
3118+
}
3119+
3120+
// All transactions must be completed successfully.
3121+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3122+
futures[i].Wait();
3123+
const auto& result = futures[i].GetValueSync();
3124+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
3125+
}
3126+
}
3127+
3128+
Y_UNIT_TEST_F(Transactions_Conflict_On_SeqNo, TFixture)
3129+
{
3130+
const ui32 PARTITIONS_COUNT = 20;
3131+
const size_t TXS_COUNT = 100;
3132+
3133+
CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);
3134+
3135+
SetPartitionWriteSpeed("topic_A", 50'000'000);
3136+
3137+
auto tableSession = CreateTableSession();
3138+
std::vector<std::shared_ptr<NTopic::ISimpleBlockingWriteSession>> topicWriteSessions;
3139+
3140+
for (ui32 i = 0; i < PARTITIONS_COUNT; ++i) {
3141+
TString sourceId = TEST_MESSAGE_GROUP_ID;
3142+
sourceId += "_";
3143+
sourceId += ToString(i);
3144+
3145+
NTopic::TTopicClient client(GetDriver());
3146+
NTopic::TWriteSessionSettings options;
3147+
options.Path("topic_A");
3148+
options.ProducerId(sourceId);
3149+
options.MessageGroupId(sourceId);
3150+
options.PartitionId(i);
3151+
options.Codec(ECodec::RAW);
3152+
3153+
auto session = client.CreateSimpleBlockingWriteSession(options);
3154+
3155+
topicWriteSessions.push_back(std::move(session));
3156+
}
3157+
3158+
std::vector<NTable::TSession> sessions;
3159+
std::vector<NTable::TTransaction> transactions;
3160+
3161+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3162+
sessions.push_back(CreateTableSession());
3163+
auto& session = sessions.back();
3164+
3165+
transactions.push_back(BeginTx(session));
3166+
auto& tx = transactions.back();
3167+
3168+
for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
3169+
TString sourceId = TEST_MESSAGE_GROUP_ID;
3170+
sourceId += "_";
3171+
sourceId += ToString(j);
3172+
3173+
for (size_t k = 0, count = RandomNumber<size_t>(20) + 1; k < count; ++k) {
3174+
const std::string data(RandomNumber<size_t>(1'000) + 100, 'x');
3175+
NTopic::TWriteMessage params(data);
3176+
params.Tx(tx);
3177+
3178+
topicWriteSessions[j]->Write(std::move(params));
3179+
}
3180+
}
3181+
}
3182+
3183+
std::vector<NTable::TAsyncCommitTransactionResult> futures;
3184+
3185+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3186+
futures.push_back(transactions[i].Commit());
3187+
}
3188+
3189+
// Some transactions should end with the error `ABORTED`
3190+
size_t successCount = 0;
3191+
3192+
for (size_t i = 0; i < TXS_COUNT; ++i) {
3193+
futures[i].Wait();
3194+
const auto& result = futures[i].GetValueSync();
3195+
switch (result.GetStatus()) {
3196+
case EStatus::SUCCESS:
3197+
++successCount;
3198+
break;
3199+
case EStatus::ABORTED:
3200+
break;
3201+
default:
3202+
UNIT_FAIL("unexpected status: " << static_cast<const NYdb::TStatus&>(result));
3203+
break;
3204+
}
3205+
}
3206+
3207+
UNIT_ASSERT_VALUES_UNEQUAL(successCount, TXS_COUNT);
3208+
}
3209+
30623210
}
30633211

30643212
}

0 commit comments

Comments
 (0)