Skip to content

Commit dfbab97

Browse files
Fix kafka read in federation mode
1 parent 9888760 commit dfbab97

File tree

3 files changed

+125
-6
lines changed

3 files changed

+125
-6
lines changed

ydb/core/kafka_proxy/actors/kafka_offset_commit_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ void TKafkaOffsetCommitActor::Bootstrap(const NActors::TActorContext& ctx) {
177177
}
178178

179179
auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
180-
NKikimr::AppData(ctx)->PQConfig, ""
180+
true, "", ""
181181
);
182182

183183
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(

ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ void TKafkaReadSessionActor::FillTopicsFromJoinGroupMetadata(TKafkaBytes& metada
389389
if (topic.has_value()) {
390390
auto normalizedTopicName = NormalizePath(Context->DatabasePath, topic.value());
391391
OriginalTopicNames[normalizedTopicName] = topic.value();
392+
OriginalTopicNames[normalizedTopicName + "/streamImpl"] = topic.value();
392393
topics.emplace(normalizedTopicName);
393394
KAFKA_LOG_D("JOIN_GROUP requested topic to read: " << topic);
394395
}
@@ -442,7 +443,7 @@ void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorCont
442443
void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) {
443444

444445
auto topicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
445-
AppData(ctx)->PQConfig, ""
446+
true, "", ""
446447
);
447448
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
448449
topicConverterFactory

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,22 @@ void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32
443443

444444
}
445445

446+
void AlterTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, std::vector<TString> consumers) {
447+
auto topicSettings = NYdb::NTopic::TAlterTopicSettings();
448+
449+
for (auto& consumer : consumers) {
450+
topicSettings.BeginAddConsumer(consumer).EndAddConsumer();
451+
}
452+
453+
auto result = pqClient
454+
.AlterTopic(topicName, topicSettings)
455+
.ExtractValueSync();
456+
457+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
458+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
459+
460+
}
461+
446462
TConsumerProtocolAssignment GetAssignments(NKafka::TSyncGroupResponseData::AssignmentMeta::Type metadata) {
447463
TKafkaVersion version = *(TKafkaVersion*)(metadata.value().data() + sizeof(TKafkaVersion));
448464
TBuffer buffer(metadata.value().data() + sizeof(TKafkaVersion), metadata.value().size_bytes() - sizeof(TKafkaVersion));
@@ -937,8 +953,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
937953
}
938954
} // Y_UNIT_TEST(FetchScenario)
939955

940-
Y_UNIT_TEST(BalanceScenario) {
941-
956+
void RunBalanceScenarionTest(bool forFederation) {
942957
TString protocolName = "roundrobin";
943958
TInsecureTestServer testServer("2");
944959

@@ -958,6 +973,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
958973
CreateTopic(pqClient, topicName, minActivePartitions, {group});
959974
CreateTopic(pqClient, secondTopicName, minActivePartitions, {group});
960975

976+
if (forFederation) {
977+
testServer.KikimrServer->GetServer().GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(false);
978+
}
961979
TKafkaTestClient clientA(testServer.Port);
962980
TKafkaTestClient clientB(testServer.Port);
963981
TKafkaTestClient clientC(testServer.Port);
@@ -991,6 +1009,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
9911009
// clientA join group, and get all partitions
9921010
auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, minActivePartitions, protocolName, minActivePartitions);
9931011
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1012+
UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, topicName);
9941013

9951014
// clientB join group, and get 0 partitions, becouse it's all at clientA
9961015
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1151,7 +1170,105 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
11511170
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
11521171
}
11531172

1154-
} // Y_UNIT_TEST(BalanceScenario)
1173+
} // RunBalanceScenarionTest()
1174+
1175+
Y_UNIT_TEST(BalanceScenario) {
1176+
RunBalanceScenarionTest(false);
1177+
}
1178+
1179+
Y_UNIT_TEST(BalanceScenarioForFederation) {
1180+
RunBalanceScenarionTest(true);
1181+
}
1182+
1183+
Y_UNIT_TEST(BalanceScenarioCdc) {
1184+
1185+
TString protocolName = "roundrobin";
1186+
TInsecureTestServer testServer("2");
1187+
1188+
1189+
TString tableName = "/Root/table-0-test";
1190+
TString feedName = "feed";
1191+
TString feedPath = tableName + "/" + feedName;
1192+
TString tableShortName = "table-0-test";
1193+
TString feedShortPath = tableShortName + "/" + feedName;
1194+
1195+
TString group = "consumer-0";
1196+
TString notExistsGroup = "consumer-not-exists";
1197+
1198+
// create table and init cdc for it
1199+
{
1200+
NYdb::NTable::TTableClient tableClient(*testServer.Driver);
1201+
tableClient.RetryOperationSync([&](TSession session)
1202+
{
1203+
NYdb::NTable::TTableBuilder builder;
1204+
builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn("key");
1205+
builder.AddNonNullableColumn("value", NYdb::EPrimitiveType::Int64);
1206+
1207+
auto createResult = session.CreateTable(tableName, builder.Build()).ExtractValueSync();
1208+
UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false);
1209+
Cerr << createResult.GetIssues().ToString() << "\n";
1210+
UNIT_ASSERT_VALUES_EQUAL(createResult.GetStatus(), EStatus::SUCCESS);
1211+
1212+
auto alterResult = session.AlterTable(tableName, NYdb::NTable::TAlterTableSettings()
1213+
.AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName,
1214+
NYdb::NTable::EChangefeedMode::Updates,
1215+
NYdb::NTable::EChangefeedFormat::Json))
1216+
).ExtractValueSync();
1217+
Cerr << alterResult.GetIssues().ToString() << "\n";
1218+
UNIT_ASSERT_VALUES_EQUAL(alterResult.IsTransportError(), false);
1219+
UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
1220+
return alterResult;
1221+
}
1222+
);
1223+
1224+
TValueBuilder rows;
1225+
rows.BeginList();
1226+
rows.AddListItem()
1227+
.BeginStruct()
1228+
.AddMember("key").Int64(1)
1229+
.AddMember("value").Int64(2)
1230+
.EndStruct();
1231+
rows.EndList();
1232+
1233+
auto upsertResult = tableClient.BulkUpsert(tableName, rows.Build()).GetValueSync();
1234+
UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
1235+
}
1236+
1237+
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
1238+
AlterTopic(pqClient, feedPath, {group});
1239+
1240+
for(auto name : {feedPath, feedShortPath} ) {
1241+
TKafkaTestClient clientA(testServer.Port);
1242+
{
1243+
auto msg = clientA.ApiVersions();
1244+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1245+
UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
1246+
}
1247+
{
1248+
auto msg = clientA.SaslHandshake();
1249+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1250+
UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
1251+
UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
1252+
}
1253+
{
1254+
auto msg = clientA.SaslAuthenticate("ouruser@/Root", "ourUserPassword");
1255+
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1256+
}
1257+
1258+
{
1259+
// Check partitions balance
1260+
std::vector<TString> topics;
1261+
topics.push_back(name);
1262+
1263+
// clientA join group, and get all partitions
1264+
auto readInfoA = clientA.JoinAndSyncGroupAndWaitPartitions(topics, group, 1, protocolName, 1);
1265+
UNIT_ASSERT_VALUES_EQUAL(clientA.Heartbeat(readInfoA.MemberId, readInfoA.GenerationId, group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1266+
1267+
UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions.size(), 1);
1268+
UNIT_ASSERT_VALUES_EQUAL(readInfoA.Partitions[0].Topic, name);
1269+
}
1270+
}
1271+
} // Y_UNIT_TEST(BalanceScenarioCdc)
11551272

11561273
Y_UNIT_TEST(OffsetCommitAndFetchScenario) {
11571274
TInsecureTestServer testServer("2");
@@ -2178,9 +2295,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
21782295
UNIT_ASSERT_VALUES_EQUAL(syncRespB->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
21792296
UNIT_ASSERT_VALUES_EQUAL(syncRespC->ErrorCode, (TKafkaInt16)EKafkaErrors::NONE_ERROR);
21802297

2181-
auto countPartitions = [](const TConsumerProtocolAssignment& assignment) {
2298+
auto countPartitions = [topicName](const TConsumerProtocolAssignment& assignment) {
21822299
size_t sum = 0;
21832300
for (auto& ta : assignment.AssignedPartitions) {
2301+
UNIT_ASSERT_VALUES_EQUAL(ta.Topic, topicName);
21842302
sum += ta.Partitions.size();
21852303
}
21862304
return sum;

0 commit comments

Comments
 (0)