
2.2.11 causes warnings and test failures not seen on older versions #2641

@benlenton

Overview

The latest version at the time of writing, 2.2.11, is causing our Kafka unit tests to fail:

09:41:39  test_send_and_receive (core.tests.messaging.test_kafka.KafkaBasicTest.test_send_and_receive)
09:41:40  Send and receive a message. ...
09:41:40  2025-06-09 08:41:39,907 kafka.cluster WARNING  Topic messaging-test is not available during auto-create initialization
09:41:43  2025-06-09 08:41:43,134 kafka.coordinator.assignors.range WARNING  No partition metadata for topic messaging-test
09:41:50  2025-06-09 08:41:49,023 kafka.coordinator.heartbeat WARNING  Heartbeat thread did not fully terminate during close
09:41:50  FAIL
...
09:41:55  test_can_request_retry (core.tests.messaging.test_kafka.KafkaRequestRetryTest.test_can_request_retry)
09:41:58  Request a retry i.e. when TransientProcessingException is observed. ... FAIL

When we pin the version to 2.2.10, the tests pass. A warning is still emitted, but it does not concern us for now:

kafka.coordinator.heartbeat WARNING  Heartbeat thread did not fully terminate during close
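Until the cause is understood, pinning kafka-python==2.2.10 in the requirements is the simplest workaround. As an extra guard, the real-broker tests could be skipped when the known-bad release is installed. The sketch below assumes the package is installed under its PyPI distribution name, kafka-python; KNOWN_BAD, installed_kafka_python, is_known_bad and ExampleKafkaTest are hypothetical names chosen for illustration.

```python
import unittest
from importlib.metadata import PackageNotFoundError, version

# Hypothetical: releases in which the real-broker tests have been seen to fail.
KNOWN_BAD = {"2.2.11"}


def installed_kafka_python():
    """Return the installed kafka-python version string, or None if it is absent."""
    try:
        return version("kafka-python")
    except PackageNotFoundError:
        return None


def is_known_bad(ver):
    """True if ver is a release known to break the real-broker tests."""
    return ver in KNOWN_BAD


@unittest.skipIf(
    is_known_bad(installed_kafka_python()),
    "kafka-python 2.2.11 breaks the real-broker tests (issue #2641)",
)
class ExampleKafkaTest(unittest.TestCase):
    # Hypothetical stand-in; the real classes would inherit from KafkaTestCase.
    ...
```

The skip is evaluated once, when the class is defined, so it costs nothing at test time and disappears on its own once KNOWN_BAD no longer matches the installed release.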

It seems that the key ("messaging-test") we use to guarantee strict message ordering is not being respected because the topic (also "messaging-test") is not available during auto-create initialization, so the messages arrive out of order.
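If the auto-create timing really is the culprit, one thing worth trying (a sketch, not a confirmed fix) is to wait until the topic has partition metadata before the first send. The generic polling helper below is hypothetical. The commented usage assumes the wrapper's Consumer exposes the underlying kafka-python KafkaConsumer, whose partitions_for_topic(topic) returns the topic's partition ids once its metadata is known.

```python
import time


def wait_until(predicate, timeout_s=10.0, interval_s=0.2):
    """Call predicate() until it returns a truthy value or timeout_s elapses.

    Returns the truthy value; raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = predicate()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s}s")
        time.sleep(interval_s)


# Hypothetical usage in setUp, before anything is sent
# (kafka_consumer is an assumed handle on the underlying KafkaConsumer):
#   wait_until(lambda: kafka_consumer.partitions_for_topic(TEST_TOPIC))
```

Treating both None and an empty set as "not ready" keeps the helper agnostic about exactly how the client reports a topic it has not yet seen.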

We use bitnami/kafka:3.5.1 as the container image.

Description

Test failure 1

We mock out Kafka for most unit tests and have only a couple that exercise its basic functionality. test_send_and_receive is relatively simple:

def test_send_and_receive(self):
    """Send and receive a message."""
    m = {"x": "y"}
    # send the message, immediately closing the producer
    self.producer.send(m, key=TEST_KEY)
    self.producer.close()
    # receive the message, immediately acknowledging and closing the consumer
    received = self.consumer.receive()
    self.consumer.acknowledge()
    self.consumer.close()
    # check the message is what we expected
    self.assertEqual(m, received)

With 2.2.11 this results in AssertionError: {'x': 'y'} != None, suggesting that the consumer receives nothing.

Test failure 2

The following test, test_can_request_retry, sends five messages with data {"value": i} where i is 0-4:

def test_can_request_retry(self):
    """Request a retry i.e. when TransientProcessingException is observed."""
    # send some messages, immediately closing the producer
    for i in range(5):
        self.producer.send({"value": i}, TEST_KEY)
    self.producer.close()
    # receive and acknowledge 3 messages
    for i in range(3):
        received = self.consumer.receive()
        self.assertEqual(received, {"value": i})
        self.consumer.acknowledge()
    # receive next message and request retry ten times
    received = self.consumer.receive()
    self.assertEqual(received, {"value": 3})
    for i in range(10):
        self.consumer.request_retry()
        received = self.consumer.receive()
        self.assertEqual(received, {"value": 3})
    # finally acknowledge the message
    self.consumer.acknowledge()
    # receive the next message and tidy up
    received = self.consumer.receive()
    self.assertEqual(received, {"value": 4})
    self.consumer.acknowledge()
    self.consumer.close()
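The assertions in test_can_request_retry imply a particular contract for the Consumer wrapper: receive() advances to the next message, acknowledge() commits it, and request_retry() rewinds so the same message is delivered again. The hypothetical in-memory stand-in below models that contract and replays the test's sequence. FakeConsumer and run_retry_scenario are illustrative names, and the semantics are inferred from the test rather than taken from the project's code.

```python
class FakeConsumer:
    """Hypothetical in-memory model of the Consumer semantics the tests assume."""

    def __init__(self, messages):
        self._log = list(messages)  # messages in partition order
        self._position = 0          # offset of the next message to hand out
        self._last = None           # offset of the most recently received message
        self.committed = 0          # offset committed by acknowledge()

    def receive(self):
        """Return the next message and advance, or None if nothing is left."""
        if self._position >= len(self._log):
            return None
        self._last = self._position
        self._position += 1
        return self._log[self._last]

    def acknowledge(self):
        """Commit past the most recently received message."""
        self.committed = self._position

    def request_retry(self):
        """Rewind so the most recently received message is delivered again."""
        if self._last is not None:
            self._position = self._last


def run_retry_scenario(consumer):
    """Mirror test_can_request_retry's sequence; raise AssertionError on a mismatch."""
    for i in range(3):
        received = consumer.receive()
        assert received == {"value": i}, received
        consumer.acknowledge()
    received = consumer.receive()
    assert received == {"value": 3}, received
    for _ in range(10):
        consumer.request_retry()
        received = consumer.receive()
        assert received == {"value": 3}, received
    consumer.acknowledge()
    received = consumer.receive()
    assert received == {"value": 4}, received
    consumer.acknowledge()
```

Replaying the scenario against a FakeConsumer whose log starts with a leftover {"x": "y"} fails on the very first receive(), which has the same shape as the 2.2.11 failure: the retry logic is never reached.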

However, the consumer in this test receives the message sent in the previous test and fails before receiving any further messages:

09:44:03  AssertionError: {'x': 'y'} != {'value': 0}
09:44:03  - {'x': 'y'}
09:44:03  + {'value': 0}

Test context

For completeness, these two test cases inherit from the same base class:

class KafkaTestCase(TestCase):
    """
    Typically the producer and consumer objects would be long-lived.
    Here they are closed immediately to minimise the window for test collisions.

    These would ideally be the _only_ tests to actually send and receive messages for real, with
    all other tests checking the interaction with the producer and consumer via mocks.
    """

    def setUp(self):
        self.producer = Producer(TEST_TOPIC)
        self.consumer = Consumer(TEST_TOPIC, TEST_GROUP)
        # discard up to 500 lingering messages from the test topic so that stale messages don't cause test failures
        for _ in range(500):
            received = self.consumer.receive()
            if received is None:
                break
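Because this loop stops at the first None, one way a stale message can survive setUp is if receive() returns None before that message becomes visible to the new consumer; the leftover {"x": "y"} is then handed to the next test, which is what test_can_request_retry reports. Why the first poll would come back empty under 2.2.11 is not established here (a consumer group still being assigned partitions is one possibility). A hypothetical, more tolerant variant that stops only after several consecutive empty polls is sketched below; drain and scripted_receive are illustrative names.

```python
def drain(receive, max_messages=500, empty_polls_required=3):
    """Discard lingering messages, stopping after several consecutive empty polls.

    With empty_polls_required=1 this behaves like the original setUp loop.
    Returns the number of messages discarded.
    """
    discarded = 0
    empty_streak = 0
    while discarded < max_messages and empty_streak < empty_polls_required:
        if receive() is None:
            empty_streak += 1
        else:
            discarded += 1
            empty_streak = 0  # a real message resets the streak
    return discarded


def scripted_receive(script):
    """Fake receive() that replays a fixed sequence, then returns None forever."""
    items = iter(script)
    return lambda: next(items, None)
```

With a script of [None, {"x": "y"}] (an empty first poll followed by the stale message), empty_polls_required=1 discards nothing, while the default of 3 discards the stale message. Each empty poll presumably costs the consumer's receive timeout, so a larger threshold trades setup time for robustness.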

The constants referenced in these examples are:

TEST_TOPIC = "messaging-test"
TEST_GROUP = "messaging-test-group"
TEST_KEY = b"messaging-test"  # used to guarantee strict message ordering
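TEST_KEY matters because Kafka only guarantees ordering within a single partition, and the default partitioner routes every message with the same key to the same partition: kafka-python's DefaultPartitioner, like the Java client, hashes the key with murmur2 and takes the result modulo the number of partitions. The sketch below illustrates only that property, using zlib.crc32 as a stand-in hash, so its partition numbers will not match a real cluster; partition_for_key is a hypothetical name.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index.

    Illustrative only: zlib.crc32 stands in for Kafka's murmur2 hash, so the
    numbers differ from a real cluster, but the property is the same: equal
    keys always land on the same partition.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key) % num_partitions


TEST_KEY = b"messaging-test"
# Every send with TEST_KEY targets one partition, so those messages stay in order.
```

A shared key keeps messages in order relative to each other, but it does not isolate tests: a message left on that partition by an earlier test is still delivered first, which is consistent with the {'x': 'y'} != {'value': 0} failure.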

Questions

Why is this now an issue for us, as of 2.2.11? Presumably it is a result of one of these changes:

However, we are unsure which one or why. Is there something we need to do to resolve this, or does this need to be addressed in the kafka-python repository?

Happy to provide further information as required. Thanks!
