diff --git a/kafka/integration_test.go b/kafka/integration_test.go index ab03a21d3..33290d405 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -101,10 +101,12 @@ func findConsumerGroupDescription(groups []ConsumerGroupDescription, group strin // checkGroupDesc is a helper function to check the validity of a ConsumerGroupDescription. // We can't directly use DeepEqual because some fields/slice orders change with every run. func checkGroupDesc( - groupDesc *ConsumerGroupDescription, state ConsumerGroupState, group string, + groupDesc *ConsumerGroupDescription, state ConsumerGroupState, + groupType ConsumerGroupType, group string, protocol string, clientIDToPartitions map[string][]TopicPartition) bool { if groupDesc.GroupID != group || groupDesc.State != state || + groupDesc.Type != groupType || groupDesc.Error.Code() != ErrNoError || groupDesc.PartitionAssignor != protocol || // We can't check exactly the Broker information, but we add a check for the zero-value of the Host. @@ -959,10 +961,6 @@ func (its *IntegrationTestSuite) TestAdminClient_ListConsumerGroups() { // 3. Empty consumer group. func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() { t := its.T() - if !testConsumerGroupProtocolClassic() { - t.Skipf("KIP 848 Admin operations changes still aren't " + - "available") - } // Generating a new topic/groupID to ensure a fresh group/topic is created. rand.Seed(time.Now().Unix()) @@ -973,6 +971,11 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() clientID1 := "test.client.1" clientID2 := "test.client.2" + groupType := ConsumerGroupTypeClassic + if !testConsumerGroupProtocolClassic() { + groupType = ConsumerGroupTypeConsumer + } + ac := createAdminClient(t) defer ac.Close() @@ -1080,7 +1083,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() {Topic: &topic, Partition: 0, Offset: OffsetInvalid}, {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, } - if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) { + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupType, groupID, "range", clientIDToPartitions) { t.Errorf("Expected description for consumer group %s is not same as actual: %v", groupID, groupDesc) return } @@ -1140,7 +1143,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() clientIDToPartitions[clientID2] = []TopicPartition{ {Topic: &topic, Partition: 1, Offset: OffsetInvalid}, } - if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupID, "range", clientIDToPartitions) { + if !checkGroupDesc(groupDesc, ConsumerGroupStateStable, groupType, groupID, "range", clientIDToPartitions) { t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) return } @@ -1177,7 +1180,7 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() } clientIDToPartitions = make(map[string][]TopicPartition) - if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupID, "", clientIDToPartitions) { + if !checkGroupDesc(groupDesc, ConsumerGroupStateEmpty, groupType, groupID, "", clientIDToPartitions) { t.Errorf("Expected description for consumer group %s is not same as actual %v\n", groupID, groupDesc) } diff --git a/kafka/testresources/kraft/server.properties b/kafka/testresources/kraft/server.properties index 636f9cb38..327eff076 100644 --- a/kafka/testresources/kraft/server.properties +++ b/kafka/testresources/kraft/server.properties @@ -27,3 +27,7 @@ listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_P authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer sasl.enabled.mechanisms=PLAIN controller.quorum.voters=0@kafka:38705 +group.consumer.min.session.timeout.ms=5000 +group.consumer.session.timeout.ms=5000 +group.consumer.min.heartbeat.interval.ms=3000 +group.consumer.heartbeat.interval.ms=3000