@@ -181,6 +181,10 @@ class TFixture : public NUnitTest::TBaseFixture {
181
181
182
182
NTable::TDataQueryResult ExecuteDataQuery (NTable::TSession session, const TString& query, const NTable::TTxControl& control);
183
183
184
+ void Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
185
+ const TString& consumerName,
186
+ size_t count);
187
+
184
188
private:
185
189
template <class E >
186
190
E ReadEvent (TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -531,10 +535,13 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
531
535
Y_UNIT_TEST_F (Offsets_Cannot_Be_Promoted_When_Reading_In_A_Transaction, TFixture)
532
536
{
533
537
WriteMessage (" message" );
538
+
534
539
auto session = CreateTableSession ();
535
540
auto tx = BeginTx (session);
541
+
536
542
auto reader = CreateReader ();
537
543
StartPartitionSession (reader, tx, 0 );
544
+
538
545
UNIT_ASSERT_EXCEPTION (ReadMessage (reader, {.Tx = tx, .CommitOffsets = true }), yexception);
539
546
}
540
547
@@ -1002,6 +1009,18 @@ void TFixture::RestartLongTxService()
1002
1009
}
1003
1010
}
1004
1011
1012
+ void TFixture::Read_Exactly_N_Messages_From_Topic (const TString& topicPath,
1013
+ const TString& consumerName,
1014
+ size_t limit)
1015
+ {
1016
+ size_t count = 0 ;
1017
+ while (count < limit) {
1018
+ auto messages = ReadFromTopic (topicPath, consumerName, TDuration::Seconds (2 ));
1019
+ count += messages.size ();
1020
+ }
1021
+ UNIT_ASSERT_VALUES_EQUAL (count, limit);
1022
+ }
1023
+
1005
1024
Y_UNIT_TEST_F (WriteToTopic_Demo_1, TFixture)
1006
1025
{
1007
1026
CreateTopic (" topic_A" );
@@ -2166,8 +2185,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_40, TFixture)
2166
2185
2167
2186
CommitTx (tx, EStatus::SUCCESS);
2168
2187
2169
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2170
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2188
+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
2171
2189
}
2172
2190
2173
2191
Y_UNIT_TEST_F (WriteToTopic_Demo_41, TFixture)
@@ -2202,8 +2220,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_42, TFixture)
2202
2220
2203
2221
CommitTx (tx, EStatus::SUCCESS);
2204
2222
2205
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (2 ));
2206
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2223
+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
2207
2224
}
2208
2225
2209
2226
Y_UNIT_TEST_F (WriteToTopic_Demo_43, TFixture)
@@ -2221,8 +2238,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_43, TFixture)
2221
2238
2222
2239
ExecuteDataQuery (tableSession, " SELECT 1" , NTable::TTxControl::Tx (tx).CommitTx (true ));
2223
2240
2224
- auto messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2225
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2241
+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
2226
2242
}
2227
2243
2228
2244
Y_UNIT_TEST_F (WriteToTopic_Demo_44, TFixture)
@@ -2246,8 +2262,7 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_44, TFixture)
2246
2262
2247
2263
ExecuteDataQuery (tableSession, " SELECT 2" , NTable::TTxControl::Tx (tx).CommitTx (true ));
2248
2264
2249
- messages = ReadFromTopic (" topic_A" , TEST_CONSUMER, TDuration::Seconds (60 ));
2250
- UNIT_ASSERT_VALUES_EQUAL (messages.size (), 100 );
2265
+ Read_Exactly_N_Messages_From_Topic (" topic_A" , TEST_CONSUMER, 100 );
2251
2266
}
2252
2267
2253
2268
}
0 commit comments