Skip to content

Commit 48b56a4

Browse files
committed
fix
1 parent 77ab3d7 commit 48b56a4

File tree

4 files changed

+13
-11
lines changed

4 files changed

+13
-11
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,4 @@ const TString UPDATE_TTL_LEAVE_GROUP = R"(
218218
} // namespace NKafka
219219

220220

221-
// savnik check max members count
221+
// savnik check max members count

ydb/core/kafka_proxy/kafka_messages.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4099,6 +4099,7 @@ class TJoinGroupResponseData : public TApiMessage {
40994099
static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
41004100
};
41014101
MetadataMeta::Type Metadata;
4102+
TString MetaStr;
41024103

41034104
i32 Size(TKafkaVersion version) const override;
41044105
void Read(TKafkaReadable& readable, TKafkaVersion version) override;
@@ -4692,6 +4693,7 @@ class TSyncGroupRequestData : public TApiMessage {
46924693
static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
46934694
};
46944695
AssignmentMeta::Type Assignment;
4696+
TString AssignmentStr;
46954697

46964698
i32 Size(TKafkaVersion version) const override;
46974699
void Read(TKafkaReadable& readable, TKafkaVersion version) override;

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,7 +1472,7 @@ Y_UNIT_TEST(ProduceScenario) {
14721472
// clientB join group, and get 0 partitions, becouse it's all at clientA
14731473
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
14741474
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1475-
auto readInfoB = clientB.JoinAndSyncGroup(topics, group);
1475+
auto readInfoB = clientB.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
14761476
UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);
14771477

14781478
// clientA gets RABALANCE status, because of new reader. We need to release some partitions for new client
@@ -1492,7 +1492,7 @@ Y_UNIT_TEST(ProduceScenario) {
14921492
// clientC join group, and get 0 partitions, becouse it's all at clientA and clientB
14931493
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
14941494
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1495-
auto readInfoC = clientC.JoinAndSyncGroup(topics, group);
1495+
auto readInfoC = clientC.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
14961496
UNIT_ASSERT_VALUES_EQUAL(readInfoC.Partitions.size(), 0);
14971497

14981498
// all clients gets RABALANCE status, because of new reader. We need to release some partitions for new client
@@ -1514,7 +1514,7 @@ Y_UNIT_TEST(ProduceScenario) {
15141514
// clientD join group, and get 0 partitions, becouse it's all at clientA, clientB and clientC
15151515
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
15161516
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1517-
auto readInfoD = clientD.JoinAndSyncGroup(topics, group);
1517+
auto readInfoD = clientD.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
15181518
UNIT_ASSERT_VALUES_EQUAL(readInfoD.Partitions.size(), 0);
15191519

15201520
// all clients gets RABALANCE status, because of new reader. We need to release some partitions
@@ -1570,9 +1570,9 @@ Y_UNIT_TEST(ProduceScenario) {
15701570
std::vector<TString> topics;
15711571
topics.push_back(topicName);
15721572

1573-
auto readInfoA = clientA.JoinGroup(topics, group);
1573+
auto readInfoA = clientA.JoinGroup(topics, group, protocolName);
15741574
Sleep(TDuration::MilliSeconds(200));
1575-
auto readInfoB = clientB.JoinGroup(topics, group);
1575+
auto readInfoB = clientB.JoinGroup(topics, group, protocolName);
15761576
Sleep(TDuration::MilliSeconds(200));
15771577

15781578
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1584,7 +1584,7 @@ Y_UNIT_TEST(ProduceScenario) {
15841584
std::vector<TString> topics;
15851585
topics.push_back(shortTopicName);
15861586

1587-
auto joinResponse = clientA.JoinGroup(topics, group);
1587+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
15881588
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
15891589
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(joinResponse->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
15901590
}
@@ -1594,7 +1594,7 @@ Y_UNIT_TEST(ProduceScenario) {
15941594
std::vector<TString> topics;
15951595
topics.push_back(topicName);
15961596

1597-
auto joinResponse = clientA.JoinGroup(topics, notExistsGroup);
1597+
auto joinResponse = clientA.JoinGroup(topics, notExistsGroup, protocolName);
15981598
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check
15991599
}
16001600

@@ -1603,7 +1603,7 @@ Y_UNIT_TEST(ProduceScenario) {
16031603
std::vector<TString> topics;
16041604
topics.push_back(notExistsTopicName);
16051605

1606-
auto joinResponse = clientA.JoinGroup(topics, group);
1606+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
16071607
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
16081608
}
16091609

@@ -1624,7 +1624,7 @@ Y_UNIT_TEST(ProduceScenario) {
16241624

16251625
// Check change topics list
16261626
topics.pop_back();
1627-
auto joinResponse = clientA.JoinGroup(topics, group);
1627+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
16281628
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
16291629
}
16301630

ydb/core/protos/kafka.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ message TWorkerState {
99
string protocol_name = 1;
1010
bytes metadata = 2;
1111
}
12-
}
12+
}

0 commit comments

Comments
 (0)