From 9ada9bbd5958b377c3a687fd064b5baed2baafe4 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 3 Feb 2025 16:44:13 +0100 Subject: [PATCH 1/2] Wait for propagation after ACL, SCRAM user, create topics operations Replace testcontainers deprecated calls. --- kafka/integration_test.go | 94 +++++++++++++------ kafka/testhelpers_test.go | 1 + kafka/testresources/docker-compose-kraft.yaml | 2 +- kafka/testresources/docker-compose.yaml | 4 + 4 files changed, 71 insertions(+), 30 deletions(-) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index d0fa2eb0d..2ec3e746f 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "fmt" "math/rand" + "os" "path" "reflect" "runtime" @@ -126,6 +127,34 @@ func checkGroupDesc( return true } +func waitGroupState(t *testing.T, ac *AdminClient, groupID string, + consumers []*Consumer, waitState ConsumerGroupState) (groupDesc *ConsumerGroupDescription) { + complete := false + for !complete { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + groupDescResult, err := ac.DescribeConsumerGroups(ctx, []string{groupID}, + SetAdminRequestTimeout(30*time.Second)) + if err != nil { + t.Fatalf("Error describing consumer groups %s\n", err) + return + } + groupDescs := groupDescResult.ConsumerGroupDescriptions + groupDesc = findConsumerGroupDescription(groupDescs, groupID) + if groupDesc == nil { + t.Fatalf("Consumer group %s should be present\n", groupID) + return + } + complete = groupDesc.State == waitState && + len(groupDesc.Members) == len(consumers) + for _, consumer := range consumers { + consumer.Poll(5 * 1000) + } + time.Sleep(time.Second) + } + return +} + var testMsgsInit = false var p0TestMsgs []*testmsgType // partition 0 test messages // pAllTestMsgs holds messages for various partitions including PartitionAny and invalid partitions @@ -598,12 +627,12 @@ func validateConfig(t *testing.T, results []ConfigResourceResult, expResults []C type IntegrationTestSuite struct { suite.Suite - compose *compose.LocalDockerCompose + compose compose.ComposeStack } func (its *IntegrationTestSuite) TearDownSuite() { if testconf.DockerNeeded && its.compose != nil { - its.compose.Down() + its.compose.Down(context.Background()) } } @@ -1045,6 +1074,9 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() // Call Poll to trigger a rebalance and give it enough time to finish. consumer1.Poll(10 * 1000) + waitGroupState(t, ac, groupID, []*Consumer{consumer1}, + ConsumerGroupStateStable) + // Check the existence of the group. ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() @@ -1116,26 +1148,8 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAndDescribeConsumerGroups() // that it becomes stable. // We need to make sure that the consumer group stabilizes since we will // check for the state later on. - consumer2.Poll(5 * 1000) - consumer1.Poll(5 * 1000) - isGroupStable := false - for !isGroupStable { - ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - groupDescResult, err = ac.DescribeConsumerGroups(ctx, []string{groupID}, SetAdminRequestTimeout(30*time.Second)) - if err != nil { - t.Errorf("Error describing consumer groups %s\n", err) - return - } - groupDescs = groupDescResult.ConsumerGroupDescriptions - groupDesc = findConsumerGroupDescription(groupDescs, groupID) - if groupDesc == nil { - t.Errorf("Consumer group %s should be present\n", groupID) - return - } - isGroupStable = groupDesc.State == ConsumerGroupStateStable - time.Sleep(time.Second) - } + groupDesc = waitGroupState(t, ac, groupID, []*Consumer{consumer1, consumer2}, + ConsumerGroupStateStable) clientIDToPartitions[clientID1] = []TopicPartition{ {Topic: &topic, Partition: 0, Offset: OffsetInvalid}, @@ -2231,6 +2245,8 @@ func (its *IntegrationTestSuite) TestAdminACLs() { checkExpectedResult(expectedCreateACLs, resultCreateACLs) } + time.Sleep(1 * time.Second) + // DescribeACLs must return the three ACLs ctx, cancel = context.WithTimeout(context.Background(), maxDuration) defer cancel() @@ -2280,6 +2296,7 @@ func (its *IntegrationTestSuite) TestAdminACLs() { } checkExpectedResult(expectedDeleteACLs, resultDeleteACLs) + time.Sleep(1 * time.Second) // All the ACLs should have been deleted ctx, cancel = context.WithTimeout(context.Background(), maxDuration) defer cancel() @@ -2336,6 +2353,8 @@ func (its *IntegrationTestSuite) TestAdminClient_ListAllConsumerGroupsOffsets() } } + time.Sleep(1 * time.Second) + // Join a consumer group and subscribe to the created topics, // commit some offsets to read later. group := fmt.Sprintf("%s-%d", testconf.GroupID, rand.Intn(100000)) @@ -3164,7 +3183,8 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { } defer ac.Close() - users := []string{"non-existent"} + user := fmt.Sprintf("UserScramCredentials-%d", rand.Int()) + users := []string{user} // Call DescribeUserScramCredentials ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) @@ -3190,7 +3210,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { // Call AlterUserScramCredentials for Upsert upsertions := []UserScramCredentialUpsertion{ { - User: "non-existent", + User: user, ScramCredentialInfo: ScramCredentialInfo{ Mechanism: ScramMechanismSHA256, Iterations: 10000}, Password: []byte("password"), @@ -3201,6 +3221,9 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { defer cancel() alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, nil) + // Wait for propagation + time.Sleep(1 * time.Second) + // Check Upsert result if alterErr != nil { t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr) @@ -3243,7 +3266,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { // Call AlterUserScramCredentials for Delete deletions := []UserScramCredentialDeletion{ - {User: "non-existent", Mechanism: ScramMechanismSHA256}} + {User: user, Mechanism: ScramMechanismSHA256}} ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() alterRes, alterErr = ac.AlterUserScramCredentials(ctx, nil, deletions) @@ -3260,6 +3283,9 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code()) } + // Wait for propagation + time.Sleep(1 * time.Second) + // Call DescribeUserScramCredentials to verify delete ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() @@ -3489,14 +3515,24 @@ func TestIntegration(t *testing.T) { return } if testconf.DockerNeeded && !testconf.DockerExists { + // Ryuk is terminating the containers before + os.Setenv("TESTCONTAINERS_RYUK_DISABLED", "true") + dockerCompose := "./testresources/docker-compose.yaml" if !testConsumerGroupProtocolClassic() { dockerCompose = "./testresources/docker-compose-kraft.yaml" } - its.compose = compose.NewLocalDockerCompose([]string{dockerCompose}, "test-docker") - execErr := its.compose.WithCommand([]string{"up", "-d"}).Invoke() - if err := execErr.Error; err != nil { - t.Fatalf("up -d command failed with the error message %s\n", err) + composeInstance, err := compose.NewDockerComposeWith( + compose.WithStackFiles(dockerCompose), + compose.StackIdentifier("test-docker")) + if err != nil { + t.Fatalf("NewDockerComposeWith failed with the error %s\n", err) + } + its.compose = composeInstance + + err = its.compose.Up(context.Background(), compose.Wait(true)) + if err != nil { + t.Fatalf("Up command failed with the error message %s\n", err) } // It takes some time after the containers come up for them to be ready. time.Sleep(20 * time.Second) diff --git a/kafka/testhelpers_test.go b/kafka/testhelpers_test.go index 3247e4521..ec17e74fb 100644 --- a/kafka/testhelpers_test.go +++ b/kafka/testhelpers_test.go @@ -408,6 +408,7 @@ func createTestTopic(t *testing.T, suffix string, numPartitions int, replication } } + time.Sleep(1 * time.Second) return topic } diff --git a/kafka/testresources/docker-compose-kraft.yaml b/kafka/testresources/docker-compose-kraft.yaml index 8f6e88216..4de0955cd 100644 --- a/kafka/testresources/docker-compose-kraft.yaml +++ b/kafka/testresources/docker-compose-kraft.yaml @@ -2,7 +2,7 @@ version: '3' services: kafka: build: ./kraft - restart: always + restart: unless-stopped ports: - 9092:29092 - 9093:29093 diff --git a/kafka/testresources/docker-compose.yaml b/kafka/testresources/docker-compose.yaml index e3a0fb1cd..0f8055929 100644 --- a/kafka/testresources/docker-compose.yaml +++ b/kafka/testresources/docker-compose.yaml @@ -2,11 +2,13 @@ version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.6.2 + restart: unless-stopped environment: ZOOKEEPER_CLIENT_PORT: 2181 zookeeper_auth: image: confluentinc/cp-zookeeper + restart: unless-stopped environment: ZOOKEEPER_CLIENT_PORT: 2182 KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" @@ -15,6 +17,7 @@ services: kafka: image: confluentinc/cp-kafka:7.6.2 + restart: unless-stopped depends_on: - zookeeper ports: @@ -34,6 +37,7 @@ services: kafka_auth: image: confluentinc/cp-kafka:7.6.2 + restart: unless-stopped depends_on: - zookeeper_auth ports: From fb304b588eb89cfd6ea8bbcb40e98d235afbe78e Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 3 Feb 2025 16:45:56 +0100 Subject: [PATCH 2/2] Changes needed for 4.0 --- kafka/integration_test.go | 45 ++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 2ec3e746f..a55e1b9e4 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2120,7 +2120,10 @@ func (its *IntegrationTestSuite) TestAdminACLs() { topic := testconf.TopicName group := testconf.GroupID noError := NewError(ErrNoError, "", false) - unknownError := NewError(ErrUnknown, "Unknown broker error", false) + wrongPrincipalError := NewError(ErrInvalidRequest, + "Could not parse principal "+ + "from `wrong-principal` (no colon is present separating"+ + " the principal type from the principal name)", false) var expectedCreateACLs []CreateACLResult var expectedDescribeACLs DescribeACLsResult var expectedDeleteACLs []DeleteACLsResult @@ -2241,7 +2244,7 @@ func (its *IntegrationTestSuite) TestAdminACLs() { if err != nil { t.Fatalf("CreateACLs() failed: %s", err) } - expectedCreateACLs = []CreateACLResult{{Error: unknownError}} + expectedCreateACLs = []CreateACLResult{{Error: wrongPrincipalError}} checkExpectedResult(expectedCreateACLs, resultCreateACLs) } @@ -2964,11 +2967,12 @@ func (its *IntegrationTestSuite) TestProducerConsumerTimestamps() { drChan := make(chan Event, 1) - /* Offset the timestamp to avoid comparison with system clock */ - future, _ := time.ParseDuration("87658h") // 10y + /* Messages produced with a timestamp more than 1 hour in the future + * are rejected. */ + future, _ := time.ParseDuration("2h") timestamp := time.Now().Add(future) key := fmt.Sprintf("TS: %v", timestamp) - t.Logf("Producing message with timestamp %v", timestamp) + t.Logf("Producing message with timestamp %v, should fail", timestamp) err = p.Produce(&Message{ TopicPartition: TopicPartition{Topic: &testconf.TopicName, Partition: 0}, Key: []byte(key), @@ -2986,6 +2990,37 @@ func (its *IntegrationTestSuite) TestProducerConsumerTimestamps() { if !ok { t.Fatalf("drChan: Expected *Message, got %v", ev) } + if m.TopicPartition.Error == nil { + t.Fatalf("Delivery should fail, got no Error") + } + + if m.TopicPartition.Error.Error() != "Broker: Invalid timestamp" { + t.Fatalf( + "Delivery should fail with error 'Broker: Invalid timestamp', not %v", + m.TopicPartition.Error) + } + + future, _ = time.ParseDuration("1h") + timestamp = time.Now().Add(future) + key = fmt.Sprintf("TS: %v", timestamp) + t.Logf("Producing message with timestamp %v, should succeed", timestamp) + err = p.Produce(&Message{ + TopicPartition: TopicPartition{Topic: &testconf.TopicName, Partition: 0}, + Key: []byte(key), + Timestamp: timestamp}, + drChan) + + if err != nil { + t.Fatalf("Produce: %v", err) + } + + // Wait for delivery + t.Logf("Awaiting delivery report") + ev = <-drChan + m, ok = ev.(*Message) + if !ok { + t.Fatalf("drChan: Expected *Message, got %v", ev) + } if m.TopicPartition.Error != nil { t.Fatalf("Delivery failed: %v", m.TopicPartition) }