@@ -984,12 +984,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
984
984
UNIT_ASSERT (resp.server_message_case () == Topic::StreamReadMessage::FromServer::kCommitOffsetResponse );
985
985
}
986
986
987
- void DoRead (ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, const ui32 messageLimit ) {
987
+ void DoRead (ui64 assignId, ui64& nextReadId, ui32& currTotalMessages, const ui32 messageCountMin, const ui32 messageCountMax = 0 ) {
988
988
// Get DirectReadResponse messages, send DirectReadAck messages.
989
989
990
990
auto endTime = TInstant::Now () + TDuration::Seconds (10 );
991
- while (currTotalMessages < messageLimit && endTime > TInstant::Now ()) {
992
- Cerr << " Wait for direct read id: " << nextReadId << " , currently have " << currTotalMessages << " messages, limit is " << messageLimit << Endl;
991
+ while (currTotalMessages < messageCountMin && endTime > TInstant::Now ()) {
992
+ Cerr << " Wait for direct read id: " << nextReadId << " , currently have " << currTotalMessages << " messages, expected count is " << messageCountMin << Endl;
993
993
994
994
Ydb::Topic::StreamDirectReadMessage::FromServer resp;
995
995
UNIT_ASSERT (DirectStream->Read (&resp));
@@ -1015,7 +1015,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
1015
1015
}
1016
1016
nextReadId++;
1017
1017
}
1018
- UNIT_ASSERT_VALUES_EQUAL (currTotalMessages, messageLimit);
1018
+ if (messageCountMax) {
1019
+ UNIT_ASSERT (currTotalMessages >= messageCountMin);
1020
+ UNIT_ASSERT (currTotalMessages <= messageCountMax);
1021
+ } else {
1022
+ UNIT_ASSERT_VALUES_EQUAL (currTotalMessages, messageCountMin);
1023
+ }
1019
1024
}
1020
1025
1021
1026
void InitDirectSession (
@@ -1146,7 +1151,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
1146
1151
UNIT_ASSERT_VALUES_EQUAL (cachedData->Data .begin ()->second .Reads .size (), 0 );
1147
1152
}
1148
1153
1149
- /*
1150
1154
Y_UNIT_TEST (DirectReadNotCached) {
1151
1155
TPersQueueV1TestServer server{{.CheckACL =true , .NodeCount =1 }};
1152
1156
SET_LOCALS;
@@ -1165,8 +1169,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
1165
1169
setup.DoWrite (pqClient->GetDriver (), " acc/topic1" , 1_MB, 50 );
1166
1170
1167
1171
Cerr << " First read\n " ;
1168
- setup.DoRead(assignId, nextReadId, totalMsg, 42);
1169
- setup.DoRead(assignId, nextReadId, totalMsg, 42);
1172
+ setup.DoRead (assignId, nextReadId, totalMsg, 40 , 48 );
1170
1173
1171
1174
Topic::StreamReadMessage::FromClient req;
1172
1175
req.mutable_read_request ()->set_bytes_size (50_MB);
@@ -1182,7 +1185,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
1182
1185
UNIT_ASSERT_VALUES_EQUAL (cachedData->Data .begin ()->second .StagedReads .size (), 0 );
1183
1186
UNIT_ASSERT_VALUES_EQUAL (cachedData->Data .begin ()->second .Reads .size (), 0 );
1184
1187
}
1185
- */
1186
1188
1187
1189
Y_UNIT_TEST (DirectReadBadCases) {
1188
1190
TPersQueueV1TestServer server{{.CheckACL =true , .NodeCount =1 }};
@@ -7741,4 +7743,4 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
7741
7743
}
7742
7744
7743
7745
}
7744
- }
7746
+ }
0 commit comments