Skip to content

Test changes needed to work with 4.0 #1386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 105 additions & 34 deletions kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"runtime"
Expand Down Expand Up @@ -126,6 +127,34 @@ func checkGroupDesc(
return true
}

func waitGroupState(t *testing.T, ac *AdminClient, groupID string,
consumers []*Consumer, waitState ConsumerGroupState) (groupDesc *ConsumerGroupDescription) {
complete := false
for !complete {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupDescResult, err := ac.DescribeConsumerGroups(ctx, []string{groupID},
SetAdminRequestTimeout(30*time.Second))
if err != nil {
t.Fatalf("Error describing consumer groups %s\n", err)
return
}
groupDescs := groupDescResult.ConsumerGroupDescriptions
groupDesc = findConsumerGroupDescription(groupDescs, groupID)
if groupDesc == nil {
t.Fatalf("Consumer group %s should be present\n", groupID)
return
}
complete = groupDesc.State == waitState &&
len(groupDesc.Members) == len(consumers)
for _, consumer := range consumers {
consumer.Poll(5 * 1000)
}
time.Sleep(time.Second)
}
return
}

var testMsgsInit = false
var p0TestMsgs []*testmsgType // partition 0 test messages
// pAllTestMsgs holds messages for various partitions including PartitionAny and invalid partitions
Expand Down Expand Up @@ -598,12 +627,12 @@ func validateConfig(t *testing.T, results []ConfigResourceResult, expResults []C

type IntegrationTestSuite struct {
suite.Suite
compose *compose.LocalDockerCompose
compose compose.ComposeStack
}

func (its *IntegrationTestSuite) TearDownSuite() {
if testconf.DockerNeeded && its.compose != nil {
its.compose.Down()
its.compose.Down(context.Background())
}
}

Expand Down Expand Up @@ -1045,6 +1074,9 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
// Call Poll to trigger a rebalance and give it enough time to finish.
consumer1.Poll(10 * 1000)

waitGroupState(t, ac, groupID, []*Consumer{consumer1},
ConsumerGroupStateStable)

// Check the existence of the group.
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
Expand Down Expand Up @@ -1116,26 +1148,8 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups()
// that it becomes stable.
// We need to make sure that the consumer group stabilizes since we will
// check for the state later on.
consumer2.Poll(5 * 1000)
consumer1.Poll(5 * 1000)
isGroupStable := false
for !isGroupStable {
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second))
if err != nil {
t.Errorf("Error describing consumer groups %s\n", err)
return
}
groupDescs = groupDescResult.ConsumerGroupDescriptions
groupDesc = findConsumerGroupDescription(groupDescs, groupID)
if groupDesc == nil {
t.Errorf("Consumer group %s should be present\n", groupID)
return
}
isGroupStable = groupDesc.State == ConsumerGroupStateStable
time.Sleep(time.Second)
}
groupDesc = waitGroupState(t, ac, groupID, []*Consumer{consumer1, consumer2},
ConsumerGroupStateStable)

clientIDToPartitions[clientID1] = []TopicPartition{
{Topic: &topic, Partition: 0, Offset: OffsetInvalid},
Expand Down Expand Up @@ -2106,7 +2120,10 @@ func (its *IntegrationTestSuite) TestAdminACLs() {
topic := testconf.TopicName
group := testconf.GroupID
noError := NewError(ErrNoError, "", false)
unknownError := NewError(ErrUnknown, "Unknown broker error", false)
wrongPrincipalError := NewError(ErrInvalidRequest,
"Could not parse principal "+
"from `wrong-principal` (no colon is present separating"+
" the principal type from the principal name)", false)
var expectedCreateACLs []CreateACLResult
var expectedDescribeACLs DescribeACLsResult
var expectedDeleteACLs []DeleteACLsResult
Expand Down Expand Up @@ -2227,10 +2244,12 @@ func (its *IntegrationTestSuite) TestAdminACLs() {
if err != nil {
t.Fatalf("CreateACLs() failed: %s", err)
}
expectedCreateACLs = []CreateACLResult{{Error: unknownError}}
expectedCreateACLs = []CreateACLResult{{Error: wrongPrincipalError}}
checkExpectedResult(expectedCreateACLs, resultCreateACLs)
}

time.Sleep(1 * time.Second)

// DescribeACLs must return the three ACLs
ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
defer cancel()
Expand Down Expand Up @@ -2280,6 +2299,7 @@ func (its *IntegrationTestSuite) TestAdminACLs() {
}
checkExpectedResult(expectedDeleteACLs, resultDeleteACLs)

time.Sleep(1 * time.Second)
// All the ACLs should have been deleted
ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
defer cancel()
Expand Down Expand Up @@ -2336,6 +2356,8 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAllConsumerGroupsOffsets()
}
}

time.Sleep(1 * time.Second)

// Join a consumer group and subscribe to the created topics,
// commit some offsets to read later.
group := fmt.Sprintf("%s-%d", testconf.GroupID, rand.Intn(100000))
Expand Down Expand Up @@ -2945,11 +2967,12 @@ func (its *IntegrationTestSuite) TestProducerConsumerTimestamps() {

drChan := make(chan Event, 1)

/* Offset the timestamp to avoid comparison with system clock */
future, _ := time.ParseDuration("87658h") // 10y
/* Messages produced with a timestamp more than 1 hour in the future
* are rejected. */
future, _ := time.ParseDuration("2h")
timestamp := time.Now().Add(future)
key := fmt.Sprintf("TS: %v", timestamp)
t.Logf("Producing message with timestamp %v", timestamp)
t.Logf("Producing message with timestamp %v, should fail", timestamp)
err = p.Produce(&Message{
TopicPartition: TopicPartition{Topic: &testconf.TopicName, Partition: 0},
Key: []byte(key),
Expand All @@ -2967,6 +2990,37 @@ func (its *IntegrationTestSuite) TestProducerConsumerTimestamps() {
if !ok {
t.Fatalf("drChan: Expected *Message, got %v", ev)
}
if m.TopicPartition.Error == nil {
t.Fatalf("Delivery should fail, got no Error")
}

if m.TopicPartition.Error.Error() != "Broker: Invalid timestamp" {
t.Fatalf(
"Delivery should fail with error 'Broker: Invalid timestamp', not %v",
m.TopicPartition.Error)
}

future, _ = time.ParseDuration("1h")
timestamp = time.Now().Add(future)
key = fmt.Sprintf("TS: %v", timestamp)
t.Logf("Producing message with timestamp %v, should succeed", timestamp)
err = p.Produce(&Message{
TopicPartition: TopicPartition{Topic: &testconf.TopicName, Partition: 0},
Key: []byte(key),
Timestamp: timestamp},
drChan)

if err != nil {
t.Fatalf("Produce: %v", err)
}

// Wait for delivery
t.Logf("Awaiting delivery report")
ev = <-drChan
m, ok = ev.(*Message)
if !ok {
t.Fatalf("drChan: Expected *Message, got %v", ev)
}
if m.TopicPartition.Error != nil {
t.Fatalf("Delivery failed: %v", m.TopicPartition)
}
Expand Down Expand Up @@ -3164,7 +3218,8 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {
}
defer ac.Close()

users := []string{"non-existent"}
user := fmt.Sprintf("UserScramCredentials-%d", rand.Int())
users := []string{user}

// Call DescribeUserScramCredentials
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
Expand All @@ -3190,7 +3245,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {
// Call AlterUserScramCredentials for Upsert
upsertions := []UserScramCredentialUpsertion{
{
User: "non-existent",
User: user,
ScramCredentialInfo: ScramCredentialInfo{
Mechanism: ScramMechanismSHA256, Iterations: 10000},
Password: []byte("password"),
Expand All @@ -3201,6 +3256,9 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {
defer cancel()
alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, nil)

// Wait for propagation
time.Sleep(1 * time.Second)

// Check Upsert result
if alterErr != nil {
t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr)
Expand Down Expand Up @@ -3243,7 +3301,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {

// Call AlterUserScramCredentials for Delete
deletions := []UserScramCredentialDeletion{
{User: "non-existent", Mechanism: ScramMechanismSHA256}}
{User: user, Mechanism: ScramMechanismSHA256}}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
alterRes, alterErr = ac.AlterUserScramCredentials(ctx, nil, deletions)
Expand All @@ -3260,6 +3318,9 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() {
t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code())
}

// Wait for propagation
time.Sleep(1 * time.Second)

// Call DescribeUserScramCredentials to verify delete
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
Expand Down Expand Up @@ -3489,14 +3550,24 @@ func TestIntegration(t *testing.T) {
return
}
if testconf.DockerNeeded && !testconf.DockerExists {
// Ryuk is terminating the containers before
os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true")

dockerCompose := "./testresources/docker-compose.yaml"
if !testConsumerGroupProtocolClassic() {
dockerCompose = "./testresources/docker-compose-kraft.yaml"
}
its.compose = compose.NewLocalDockerCompose([]string{dockerCompose}, "test-docker")
execErr := its.compose.WithCommand([]string{"up", "-d"}).Invoke()
if err := execErr.Error; err != nil {
t.Fatalf("up -d command failed with the error message %s\n", err)
composeInstance, err := compose.NewDockerComposeWith(
compose.WithStackFiles(dockerCompose),
compose.StackIdentifier("test-docker"))
if err != nil {
t.Fatalf("NewDockerComposeWith failed with the error %s\n", err)
}
its.compose = composeInstance

err = its.compose.Up(context.Background(), compose.Wait(true))
if err != nil {
t.Fatalf("Up command failed with the error message %s\n", err)
}
// It takes some time after the containers come up for them to be ready.
time.Sleep(20 * time.Second)
Expand Down
1 change: 1 addition & 0 deletions kafka/testhelpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func createTestTopic(t *testing.T, suffix string, numPartitions int, replication
}

}
time.Sleep(1 * time.Second)

return topic
}
2 changes: 1 addition & 1 deletion kafka/testresources/docker-compose-kraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'
services:
kafka:
build: ./kraft
restart: always
restart: unless-stopped
ports:
- 9092:29092
- 9093:29093
Expand Down
4 changes: 4 additions & 0 deletions kafka/testresources/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.2
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2181

zookeeper_auth:
image: confluentinc/cp-zookeeper
restart: unless-stopped
environment:
ZOOKEEPER_CLIENT_PORT: 2182
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
Expand All @@ -15,6 +17,7 @@ services:

kafka:
image: confluentinc/cp-kafka:7.6.2
restart: unless-stopped
depends_on:
- zookeeper
ports:
Expand All @@ -34,6 +37,7 @@ services:

kafka_auth:
image: confluentinc/cp-kafka:7.6.2
restart: unless-stopped
depends_on:
- zookeeper_auth
ports:
Expand Down