@@ -192,9 +192,9 @@ class TFixture : public NUnitTest::TBaseFixture {
192
192
193
193
NTable::TDataQueryResult ExecuteDataQuery (NTable::TSession session, const TString& query, const NTable::TTxControl& control);
194
194
195
- void Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
196
- const TString& consumerName,
197
- size_t count);
195
+ TVector<TString> Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
196
+ const TString& consumerName,
197
+ size_t count);
198
198
199
199
struct TAvgWriteBytes {
200
200
ui64 PerSec = 0 ;
@@ -1069,16 +1069,22 @@ void TFixture::RestartLongTxService()
1069
1069
}
1070
1070
}
1071
1071
1072
- void TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1073
- const TString& consumerName,
1074
- size_t limit)
1072
+ TVector<TString> TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1073
+ const TString& consumerName,
1074
+ size_t limit)
1075
1075
{
1076
- size_t count = 0 ;
1077
- while (count < limit) {
1076
+ TVector<TString> result;
1077
+
1078
+ while (result.size () < limit) {
1078
1079
auto messages = ReadFromTopic (topicPath, consumerName, TDuration::Seconds (2 ));
1079
- count += messages.size ();
1080
+ for (auto & m : messages) {
1081
+ result.push_back (std::move (m));
1082
+ }
1080
1083
}
1081
- UNIT_ASSERT_VALUES_EQUAL (count, limit);
1084
+
1085
+ UNIT_ASSERT_VALUES_EQUAL (result.size (), limit);
1086
+
1087
+ return result;
1082
1088
}
1083
1089
1084
1090
auto TFixture::GetAvgWriteBytes (const TString& topicName,
@@ -1141,15 +1147,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture)
1141
1147
CommitTx (tx, EStatus::SUCCESS);
1142
1148
1143
1149
{
1144
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1145
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1150
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
1146
1151
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1147
1152
UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #4" );
1148
1153
}
1149
1154
1150
1155
{
1151
- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1152
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 5 );
1156
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 5 );
1153
1157
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #5" );
1154
1158
UNIT_ASSERT_VALUES_EQUAL (messages[4 ], " message #9" );
1155
1159
}
@@ -1190,15 +1194,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_2, TFixture)
1190
1194
CommitTx (tx, EStatus::SUCCESS);
1191
1195
1192
1196
{
1193
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1194
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1197
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
1195
1198
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1196
1199
UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #4" );
1197
1200
}
1198
1201
1199
1202
{
1200
- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1201
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
1203
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 3 );
1202
1204
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #7" );
1203
1205
UNIT_ASSERT_VALUES_EQUAL (messages[2 ], " message #9" );
1204
1206
}
@@ -1299,15 +1301,13 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_5, TFixture)
1299
1301
}
1300
1302
1301
1303
{
1302
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1303
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1304
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1304
1305
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1305
1306
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #3" );
1306
1307
}
1307
1308
1308
1309
{
1309
- auto messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
1310
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1310
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 2 );
1311
1311
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #2" );
1312
1312
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #4" );
1313
1313
}
@@ -1331,8 +1331,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture)
1331
1331
CommitTx (tx, EStatus::SUCCESS);
1332
1332
1333
1333
{
1334
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1335
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1334
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1336
1335
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1337
1336
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
1338
1337
}
@@ -1357,17 +1356,15 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture)
1357
1356
WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID_1, " message #6" , &tx);
1358
1357
1359
1358
{
1360
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1361
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1359
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1362
1360
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #3" );
1363
1361
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #4" );
1364
1362
}
1365
1363
1366
1364
CommitTx (tx, EStatus::SUCCESS);
1367
1365
1368
1366
{
1369
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1370
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 4 );
1367
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 4 );
1371
1368
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1372
1369
UNIT_ASSERT_VALUES_EQUAL (messages[3 ], " message #6" );
1373
1370
}
@@ -1456,8 +1453,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture)
1456
1453
}
1457
1454
1458
1455
{
1459
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1460
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1456
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1461
1457
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1462
1458
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
1463
1459
}
@@ -1753,8 +1749,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_15, TFixture)
1753
1749
1754
1750
CommitTx (tx, EStatus::SUCCESS);
1755
1751
1756
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1757
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1752
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1758
1753
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1759
1754
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
1760
1755
}
@@ -1773,8 +1768,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture)
1773
1768
1774
1769
CommitTx (tx, EStatus::SUCCESS);
1775
1770
1776
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1777
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
1771
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 2 );
1778
1772
UNIT_ASSERT_VALUES_EQUAL (messages[0 ], " message #1" );
1779
1773
UNIT_ASSERT_VALUES_EQUAL (messages[1 ], " message #2" );
1780
1774
}
@@ -1800,8 +1794,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture)
1800
1794
1801
1795
// RestartPQTablet("topic_A", 0);
1802
1796
1803
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
1804
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 8 );
1797
+ auto messages = Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 8 );
1805
1798
UNIT_ASSERT_VALUES_EQUAL (messages[0 ].size (), 22'000'000 );
1806
1799
UNIT_ASSERT_VALUES_EQUAL (messages[1 ].size (), 100 );
1807
1800
UNIT_ASSERT_VALUES_EQUAL (messages[2 ].size (), 200 );
@@ -2033,8 +2026,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_25, TFixture)
2033
2026
2034
2027
CommitTx (tx, EStatus::SUCCESS);
2035
2028
2036
- messages = ReadFromTopic (" topic_B" , TEST_CONSUMER, TDuration::Seconds (2 ));
2037
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2029
+ Read_Exactly_N_Messages_From_Topic (" topic_B" , TEST_CONSUMER, 3 );
2038
2030
}
2039
2031
2040
2032
Y_UNIT_TEST_F (WriteToTopic_Demo_26, TFixture)
@@ -2219,8 +2211,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
2219
2211
2220
2212
CommitTx (tx, EStatus::SUCCESS);
2221
2213
2222
- auto messages = ReadFromTopic (" topic_A" , " consumer" , TDuration::Seconds (2 ));
2223
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 2 );
2214
+ Read_Exactly_N_Messages_From_Topic (" topic_A" , " consumer" , 2 );
2224
2215
}
2225
2216
2226
2217
Y_UNIT_TEST_F (ReadRuleGeneration, TFixture)
@@ -2240,8 +2231,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2240
2231
AddConsumer (TString{TEST_TOPIC}, {" consumer-1" });
2241
2232
2242
2233
// We read messages from the topic and committed offsets
2243
- auto messages = ReadFromTopic (TString{TEST_TOPIC}, " consumer-1" , TDuration::Seconds (2 ));
2244
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 3 );
2234
+ Read_Exactly_N_Messages_From_Topic (TString{TEST_TOPIC}, " consumer-1" , 3 );
2245
2235
CloseTopicReadSession (TString{TEST_TOPIC}, " consumer-1" );
2246
2236
2247
2237
// And then the Logbroker team turned on the feature flag
@@ -2254,8 +2244,7 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2254
2244
AddConsumer (TString{TEST_TOPIC}, {" consumer-2" });
2255
2245
2256
2246
// And they wanted to continue reading their messages
2257
- messages = ReadFromTopic (TString{TEST_TOPIC}, " consumer-1" , TDuration::Seconds (2 ));
2258
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 1 );
2247
+ Read_Exactly_N_Messages_From_Topic (TString{TEST_TOPIC}, " consumer-1" , 1 );
2259
2248
}
2260
2249
2261
2250
Y_UNIT_TEST_F (WriteToTopic_Demo_40, TFixture)
0 commit comments