Skip to content

Enable DescribeConsumerGroup test #1449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ func findConsumerGroupDescription(groups []ConsumerGroupDescription, group strin
// checkGroupDesc is a helper function to check the validity of a ConsumerGroupDescription.
// We can't directly use DeepEqual because some fields/slice orders change with every run.
func checkGroupDesc(
groupDesc *ConsumerGroupDescription, state ConsumerGroupState, group string,
groupDesc *ConsumerGroupDescription, state ConsumerGroupState,
groupType ConsumerGroupType, group string,
protocol string, clientIDToPartitions map[string][]TopicPartition) bool {
if groupDesc.GroupID != group ||
groupDesc.State != state ||
groupDesc.Type != groupType ||
groupDesc.Error.Code() != ErrNoError ||
groupDesc.PartitionAssignor != protocol ||
// We can't check exactly the Broker information, but we add a check for the zero-value of the Host.
Expand Down Expand Up @@ -959,10 +961,6 @@ func (its *IntegrationTestSuite) TestAdminClient_ListConsumerGroups() {
// 3. Empty consumer group.
func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() {
t := its.T()
if !testConsumerGroupProtocolClassic() {
t.Skipf("KIP 848 Admin operations changes still aren't " +
"available")
}

// Generating a new topic/groupID to ensure a fresh group/topic is created.
rand.Seed(time.Now().Unix())
Expand All @@ -973,6 +971,11 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
clientID1 := "test.client.1"
clientID2 := "test.client.2"

groupType := ConsumerGroupTypeClassic
if !testConsumerGroupProtocolClassic() {
groupType = ConsumerGroupTypeConsumer
}

ac := createAdminClient(t)
defer ac.Close()

Expand Down Expand Up @@ -1080,7 +1083,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
{Topic: &topic, Partition: 0, Offset: OffsetInvalid},
{Topic: &topic, Partition: 1, Offset: OffsetInvalid},
}
if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) {
if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupType, groupID, "range", clientIDToPartitions) {
t.Errorf("Expected description for consumer group %s is not same as actual: %v", groupID, groupDesc)
return
}
Expand Down Expand Up @@ -1140,7 +1143,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
clientIDToPartitions[clientID2] = []TopicPartition{
{Topic: &topic, Partition: 1, Offset: OffsetInvalid},
}
if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) {
if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupType, groupID, "range", clientIDToPartitions) {
t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc)
return
}
Expand Down Expand Up @@ -1177,7 +1180,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
}

clientIDToPartitions = make(map[string][]TopicPartition)
if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupID, "", clientIDToPartitions) {
if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupType, groupID, "", clientIDToPartitions) {
t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc)
}

Expand Down
4 changes: 4 additions & 0 deletions kafka/testresources/kraft/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_P
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
sasl.enabled.mechanisms=PLAIN
controller.quorum.voters=0@kafka:38705
group.consumer.min.session.timeout.ms=5000
group.consumer.session.timeout.ms=5000
Copy link
Preview

Copilot AI Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace at the end of this line to prevent potential parsing issues in configuration loading.

Suggested change
group.consumer.session.timeout.ms=5000
group.consumer.session.timeout.ms=5000

Copilot uses AI. Check for mistakes.

group.consumer.min.heartbeat.interval.ms=3000
group.consumer.heartbeat.interval.ms=3000