@@ -443,6 +443,22 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32
443
443
444
444
}
445
445
446
+ void AlterTopic (NYdb::NTopic::TTopicClient& pqClient, TString& topicName, std::vector<TString> consumers) {
447
+ auto topicSettings = NYdb::NTopic::TAlterTopicSettings ();
448
+
449
+ for (auto & consumer : consumers) {
450
+ topicSettings.BeginAddConsumer (consumer).EndAddConsumer ();
451
+ }
452
+
453
+ auto result = pqClient
454
+ .AlterTopic (topicName, topicSettings)
455
+ .ExtractValueSync ();
456
+
457
+ UNIT_ASSERT_VALUES_EQUAL (result.IsTransportError (), false );
458
+ UNIT_ASSERT_VALUES_EQUAL (result.GetStatus (), EStatus::SUCCESS);
459
+
460
+ }
461
+
446
462
TConsumerProtocolAssignment GetAssignments (NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) {
447
463
TKafkaVersion version = *(TKafkaVersion*)(metadata.value ().data () + sizeof (TKafkaVersion));
448
464
TBuffer buffer (metadata.value ().data () + sizeof (TKafkaVersion), metadata.value ().size_bytes () - sizeof (TKafkaVersion));
@@ -937,8 +953,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
937
953
}
938
954
} // Y_UNIT_TEST(FetchScenario)
939
955
940
- Y_UNIT_TEST (BalanceScenario) {
941
-
956
+ void RunBalanceScenarionTest (bool forFederation) {
942
957
TString protocolName = " roundrobin" ;
943
958
TInsecureTestServer testServer (" 2" );
944
959
@@ -958,6 +973,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
958
973
CreateTopic (pqClient, topicName, minActivePartitions, {group});
959
974
CreateTopic (pqClient, secondTopicName, minActivePartitions, {group});
960
975
976
+ if (forFederation) {
977
+ testServer.KikimrServer ->GetServer ().GetRuntime ()->GetAppData ().PQConfig .SetTopicsAreFirstClassCitizen (false );
978
+ }
961
979
TKafkaTestClient clientA (testServer.Port );
962
980
TKafkaTestClient clientB (testServer.Port );
963
981
TKafkaTestClient clientC (testServer.Port );
@@ -991,6 +1009,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
991
1009
// clientA join group, and get all partitions
992
1010
auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions (topics, group, minActivePartitions, protocolName, minActivePartitions);
993
1011
UNIT_ASSERT_VALUES_EQUAL (clientA.Heartbeat (readInfoA.MemberId , readInfoA.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1012
+ UNIT_ASSERT_VALUES_EQUAL (readInfoA.Partitions [0 ].Topic , topicName);
994
1013
995
1014
// clientB join group, and get 0 partitions, becouse it's all at clientA
996
1015
UNIT_ASSERT_VALUES_EQUAL (clientB.SaslHandshake ()->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1151,7 +1170,105 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
1151
1170
UNIT_ASSERT_VALUES_EQUAL (joinResponse->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
1152
1171
}
1153
1172
1154
- } // Y_UNIT_TEST(BalanceScenario)
1173
+ } // RunBalanceScenarionTest()
1174
+
1175
+ Y_UNIT_TEST (BalanceScenario) {
1176
+ RunBalanceScenarionTest (false );
1177
+ }
1178
+
1179
+ Y_UNIT_TEST (BalanceScenarioForFederation) {
1180
+ RunBalanceScenarionTest (true );
1181
+ }
1182
+
1183
+ Y_UNIT_TEST (BalanceScenarioCdc) {
1184
+
1185
+ TString protocolName = " roundrobin" ;
1186
+ TInsecureTestServer testServer (" 2" );
1187
+
1188
+
1189
+ TString tableName = " /Root/table-0-test" ;
1190
+ TString feedName = " feed" ;
1191
+ TString feedPath = tableName + " /" + feedName;
1192
+ TString tableShortName = " table-0-test" ;
1193
+ TString feedShortPath = tableShortName + " /" + feedName;
1194
+
1195
+ TString group = " consumer-0" ;
1196
+ TString notExistsGroup = " consumer-not-exists" ;
1197
+
1198
+ // create table and init cdc for it
1199
+ {
1200
+ NYdb::NTable::TTableClient tableClient (*testServer.Driver );
1201
+ tableClient.RetryOperationSync ([&](TSession session)
1202
+ {
1203
+ NYdb::NTable::TTableBuilder builder;
1204
+ builder.AddNonNullableColumn (" key" , NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn (" key" );
1205
+ builder.AddNonNullableColumn (" value" , NYdb::EPrimitiveType::Int64);
1206
+
1207
+ auto createResult = session.CreateTable (tableName, builder.Build ()).ExtractValueSync ();
1208
+ UNIT_ASSERT_VALUES_EQUAL (createResult.IsTransportError (), false );
1209
+ Cerr << createResult.GetIssues ().ToString () << " \n " ;
1210
+ UNIT_ASSERT_VALUES_EQUAL (createResult.GetStatus (), EStatus::SUCCESS);
1211
+
1212
+ auto alterResult = session.AlterTable (tableName, NYdb::NTable::TAlterTableSettings ()
1213
+ .AppendAddChangefeeds (NYdb::NTable::TChangefeedDescription (feedName,
1214
+ NYdb::NTable::EChangefeedMode::Updates,
1215
+ NYdb::NTable::EChangefeedFormat::Json))
1216
+ ).ExtractValueSync ();
1217
+ Cerr << alterResult.GetIssues ().ToString () << " \n " ;
1218
+ UNIT_ASSERT_VALUES_EQUAL (alterResult.IsTransportError (), false );
1219
+ UNIT_ASSERT_VALUES_EQUAL (alterResult.GetStatus (), EStatus::SUCCESS);
1220
+ return alterResult;
1221
+ }
1222
+ );
1223
+
1224
+ TValueBuilder rows;
1225
+ rows.BeginList ();
1226
+ rows.AddListItem ()
1227
+ .BeginStruct ()
1228
+ .AddMember (" key" ).Int64 (1 )
1229
+ .AddMember (" value" ).Int64 (2 )
1230
+ .EndStruct ();
1231
+ rows.EndList ();
1232
+
1233
+ auto upsertResult = tableClient.BulkUpsert (tableName, rows.Build ()).GetValueSync ();
1234
+ UNIT_ASSERT_EQUAL (upsertResult.GetStatus (), EStatus::SUCCESS);
1235
+ }
1236
+
1237
+ NYdb::NTopic::TTopicClient pqClient (*testServer.Driver );
1238
+ AlterTopic (pqClient, feedPath, {group});
1239
+
1240
+ for (auto name : {feedPath, feedShortPath} ) {
1241
+ TKafkaTestClient clientA (testServer.Port );
1242
+ {
1243
+ auto msg = clientA.ApiVersions ();
1244
+ UNIT_ASSERT_VALUES_EQUAL (msg->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1245
+ UNIT_ASSERT_VALUES_EQUAL (msg->ApiKeys .size (), 18u );
1246
+ }
1247
+ {
1248
+ auto msg = clientA.SaslHandshake ();
1249
+ UNIT_ASSERT_VALUES_EQUAL (msg->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1250
+ UNIT_ASSERT_VALUES_EQUAL (msg->Mechanisms .size (), 1u );
1251
+ UNIT_ASSERT_VALUES_EQUAL (*msg->Mechanisms [0 ], " PLAIN" );
1252
+ }
1253
+ {
1254
+ auto msg = clientA.SaslAuthenticate (" ouruser@/Root" , " ourUserPassword" );
1255
+ UNIT_ASSERT_VALUES_EQUAL (msg->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1256
+ }
1257
+
1258
+ {
1259
+ // Check partitions balance
1260
+ std::vector<TString> topics;
1261
+ topics.push_back (name);
1262
+
1263
+ // clientA join group, and get all partitions
1264
+ auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions (topics, group, 1 , protocolName, 1 );
1265
+ UNIT_ASSERT_VALUES_EQUAL (clientA.Heartbeat (readInfoA.MemberId , readInfoA.GenerationId , group)->ErrorCode , static_cast <TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1266
+
1267
+ UNIT_ASSERT_VALUES_EQUAL (readInfoA.Partitions .size (), 1 );
1268
+ UNIT_ASSERT_VALUES_EQUAL (readInfoA.Partitions [0 ].Topic , name);
1269
+ }
1270
+ }
1271
+ } // Y_UNIT_TEST(BalanceScenarioCdc)
1155
1272
1156
1273
Y_UNIT_TEST (OffsetCommitAndFetchScenario) {
1157
1274
TInsecureTestServer testServer (" 2" );
@@ -2178,9 +2295,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
2178
2295
UNIT_ASSERT_VALUES_EQUAL (syncRespB->ErrorCode , (TKafkaInt16)EKafkaErrors::NONE_ERROR);
2179
2296
UNIT_ASSERT_VALUES_EQUAL (syncRespC->ErrorCode , (TKafkaInt16)EKafkaErrors::NONE_ERROR);
2180
2297
2181
- auto countPartitions = [](const TConsumerProtocolAssignment& assignment) {
2298
+ auto countPartitions = [topicName ](const TConsumerProtocolAssignment& assignment) {
2182
2299
size_t sum = 0 ;
2183
2300
for (auto & ta : assignment.AssignedPartitions ) {
2301
+ UNIT_ASSERT_VALUES_EQUAL (ta.Topic , topicName);
2184
2302
sum += ta.Partitions .size ();
2185
2303
}
2186
2304
return sum;
0 commit comments