Skip to content

Commit ad15759

Browse files
committed
fix
1 parent 2a78847 commit ad15759

File tree

4 files changed

+13
-11
lines changed

4 files changed

+13
-11
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,4 @@ const TString UPDATE_TTL_LEAVE_GROUP = R"(
218218
} // namespace NKafka
219219

220220

221-
// savnik check max members count
221+
// savnik check max members count

ydb/core/kafka_proxy/kafka_messages.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4098,6 +4098,7 @@ class TJoinGroupResponseData : public TApiMessage {
40984098
static constexpr TKafkaVersions FlexibleVersions = {6, Max<TKafkaVersion>()};
40994099
};
41004100
MetadataMeta::Type Metadata;
4101+
TString MetaStr;
41014102

41024103
i32 Size(TKafkaVersion version) const override;
41034104
void Read(TKafkaReadable& readable, TKafkaVersion version) override;
@@ -4691,6 +4692,7 @@ class TSyncGroupRequestData : public TApiMessage {
46914692
static constexpr TKafkaVersions FlexibleVersions = {4, Max<TKafkaVersion>()};
46924693
};
46934694
AssignmentMeta::Type Assignment;
4695+
TString AssignmentStr;
46944696

46954697
i32 Size(TKafkaVersion version) const override;
46964698
void Read(TKafkaReadable& readable, TKafkaVersion version) override;

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
14831483
// clientB join group, and get 0 partitions, becouse it's all at clientA
14841484
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
14851485
UNIT_ASSERT_VALUES_EQUAL(clientB.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1486-
auto readInfoB = clientB.JoinAndSyncGroup(topics, group);
1486+
auto readInfoB = clientB.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
14871487
UNIT_ASSERT_VALUES_EQUAL(readInfoB.Partitions.size(), 0);
14881488

14891489
// clientA gets RABALANCE status, because of new reader. We need to release some partitions for new client
@@ -1503,7 +1503,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
15031503
// clientC join group, and get 0 partitions, becouse it's all at clientA and clientB
15041504
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
15051505
UNIT_ASSERT_VALUES_EQUAL(clientC.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1506-
auto readInfoC = clientC.JoinAndSyncGroup(topics, group);
1506+
auto readInfoC = clientC.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
15071507
UNIT_ASSERT_VALUES_EQUAL(readInfoC.Partitions.size(), 0);
15081508

15091509
// all clients gets RABALANCE status, because of new reader. We need to release some partitions for new client
@@ -1525,7 +1525,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
15251525
// clientD join group, and get 0 partitions, becouse it's all at clientA, clientB and clientC
15261526
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslHandshake()->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
15271527
UNIT_ASSERT_VALUES_EQUAL(clientD.SaslAuthenticate("ouruser@/Root", "ourUserPassword")->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
1528-
auto readInfoD = clientD.JoinAndSyncGroup(topics, group);
1528+
auto readInfoD = clientD.JoinAndSyncGroup(topics, group, protocolName, 1000000, minActivePartitions);
15291529
UNIT_ASSERT_VALUES_EQUAL(readInfoD.Partitions.size(), 0);
15301530

15311531
// all clients gets RABALANCE status, because of new reader. We need to release some partitions
@@ -1581,9 +1581,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
15811581
std::vector<TString> topics;
15821582
topics.push_back(topicName);
15831583

1584-
auto readInfoA = clientA.JoinGroup(topics, group);
1584+
auto readInfoA = clientA.JoinGroup(topics, group, protocolName);
15851585
Sleep(TDuration::MilliSeconds(200));
1586-
auto readInfoB = clientB.JoinGroup(topics, group);
1586+
auto readInfoB = clientB.JoinGroup(topics, group, protocolName);
15871587
Sleep(TDuration::MilliSeconds(200));
15881588

15891589
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(readInfoA->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
@@ -1595,7 +1595,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
15951595
std::vector<TString> topics;
15961596
topics.push_back(shortTopicName);
15971597

1598-
auto joinResponse = clientA.JoinGroup(topics, group);
1598+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
15991599
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
16001600
UNIT_ASSERT_VALUES_EQUAL(clientA.LeaveGroup(joinResponse->MemberId.value(), group)->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
16011601
}
@@ -1605,7 +1605,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
16051605
std::vector<TString> topics;
16061606
topics.push_back(topicName);
16071607

1608-
auto joinResponse = clientA.JoinGroup(topics, notExistsGroup);
1608+
auto joinResponse = clientA.JoinGroup(topics, notExistsGroup, protocolName);
16091609
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
16101610
}
16111611

@@ -1614,7 +1614,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
16141614
std::vector<TString> topics;
16151615
topics.push_back(notExistsTopicName);
16161616

1617-
auto joinResponse = clientA.JoinGroup(topics, group);
1617+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
16181618
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
16191619
}
16201620

@@ -1635,7 +1635,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
16351635

16361636
// Check change topics list
16371637
topics.pop_back();
1638-
auto joinResponse = clientA.JoinGroup(topics, group);
1638+
auto joinResponse = clientA.JoinGroup(topics, group, protocolName);
16391639
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::REBALANCE_IN_PROGRESS)); // tell client to rejoin
16401640
}
16411641

ydb/core/protos/kafka.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ message TWorkerState {
99
string protocol_name = 1;
1010
bytes metadata = 2;
1111
}
12-
}
12+
}

0 commit comments

Comments
 (0)