@@ -111,7 +111,9 @@ class TFixture : public NUnitTest::TBaseFixture {
111
111
void CreateTopic (const TString& path = TString{TEST_TOPIC},
112
112
const TString& consumer = TEST_CONSUMER,
113
113
size_t partitionCount = 1 ,
114
- std::optional<size_t > maxPartitionCount = std::nullopt);
114
+ std::optional<size_t > maxPartitionCount = std::nullopt,
115
+ const TDuration retention = TDuration::Hours(1 ),
116
+ bool important = false);
115
117
TTopicDescription DescribeTopic (const TString& path);
116
118
117
119
void AddConsumer (const TString& topicPath, const TVector<TString>& consumers);
@@ -862,10 +864,12 @@ void TFixture::WriteMessages(const TVector<TString>& messages,
862
864
void TFixture::CreateTopic (const TString& path,
863
865
const TString& consumer,
864
866
size_t partitionCount,
865
- std::optional<size_t > maxPartitionCount)
867
+ std::optional<size_t > maxPartitionCount,
868
+ const TDuration retention,
869
+ bool important)
866
870
867
871
{
868
- Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount);
872
+ Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount, retention, important );
869
873
}
870
874
871
875
void TFixture::AddConsumer (const TString& topicPath,
@@ -4429,6 +4433,56 @@ Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixt
4429
4433
RestartPQTablet (" topic_A" , 1 );
4430
4434
}
4431
4435
4436
+ Y_UNIT_TEST_F (TestRetentionOnLongTxAndBigMessages, TFixtureQuery)
4437
+ {
4438
+ // TODO uncomment
4439
+ return ;
4440
+
4441
+ auto bigMessage = []() {
4442
+ TStringBuilder sb;
4443
+ sb.reserve (10_MB);
4444
+ for (size_t i = 0 ; i < sb.capacity (); ++i) {
4445
+ sb << RandomNumber<char >();
4446
+ }
4447
+ return sb;
4448
+ };
4449
+
4450
+ TString msg = bigMessage ();
4451
+
4452
+ CreateTopic (" topic_A" , TEST_CONSUMER, 1 , 1 , TDuration::Seconds (1 ), true );
4453
+
4454
+ auto session = CreateSession ();
4455
+ auto tx0 = session->BeginTx ();
4456
+ auto tx1 = session->BeginTx ();
4457
+
4458
+ WriteToTopic (" topic_A" , " grp-0" , msg, tx0.get ());
4459
+ WriteToTopic (" topic_A" , " grp-1" , msg, tx1.get ());
4460
+
4461
+ Sleep (TDuration::Seconds (3 ));
4462
+
4463
+ WriteToTopic (" topic_A" , " grp-0" , " short-msg" , tx0.get ());
4464
+ WriteToTopic (" topic_A" , " grp-1" , " short-msg" , tx1.get ());
4465
+
4466
+ WriteToTopic (" topic_A" , " grp-0" , msg, tx0.get ());
4467
+ WriteToTopic (" topic_A" , " grp-1" , msg, tx1.get ());
4468
+
4469
+ Sleep (TDuration::Seconds (3 ));
4470
+
4471
+ WriteToTopic (" topic_A" , " grp-0" , msg, tx0.get ());
4472
+ WriteToTopic (" topic_A" , " grp-1" , msg, tx1.get ());
4473
+
4474
+ Sleep (TDuration::Seconds (3 ));
4475
+
4476
+ session->CommitTx (*tx0, EStatus::SUCCESS);
4477
+ session->CommitTx (*tx1, EStatus::SUCCESS);
4478
+
4479
+ // RestartPQTablet("topic_A", 0);
4480
+
4481
+ auto read = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
4482
+ UNIT_ASSERT (read.size () > 0 );
4483
+ UNIT_ASSERT_EQUAL (msg, read[0 ]);
4484
+ }
4485
+
4432
4486
}
4433
4487
4434
4488
}
0 commit comments