Overview
The latest version at the time of writing (2.2.11) is causing our Kafka unit tests to fail:
09:41:39 test_send_and_receive (core.tests.messaging.test_kafka.KafkaBasicTest.test_send_and_receive)
09:41:40 Send and receive a message. ...
09:41:40 2025-06-09 08:41:39,907 kafka.cluster WARNING Topic messaging-test is not available during auto-create initialization
09:41:43 2025-06-09 08:41:43,134 kafka.coordinator.assignors.range WARNING No partition metadata for topic messaging-test
09:41:50 2025-06-09 08:41:49,023 kafka.coordinator.heartbeat WARNING Heartbeat thread did not fully terminate during close
09:41:50 FAIL
...
09:41:55 test_can_request_retry (core.tests.messaging.test_kafka.KafkaRequestRetryTest.test_can_request_retry)
09:41:58 Request a retry i.e. when TransientProcessingException is observed. ... FAIL
When we pin the version to 2.2.10, the tests pass (a warning message is emitted but we're not so concerned about that currently):
kafka.coordinator.heartbeat WARNING Heartbeat thread did not fully terminate during close
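For now we can work around this on our side by pinning the dependency, e.g. in a requirements file:
kafka-python==2.2.10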
It seems that the key ("messaging-test") that we use to guarantee strict message ordering is not being respected because the topic (also "messaging-test") is not available during auto-create initialization, hence the messages are arriving out of order.
We use bitnami/kafka:3.5.1 as the container image.
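For context, the ordering guarantee we rely on is the standard key-to-partition mapping: every message is sent with the same key, so they should all hash to the same partition and be consumed in send order. A minimal sketch of that pattern using kafka-python directly (the broker address below is a placeholder, not our real configuration):

import json

from kafka import KafkaProducer

# Placeholder bootstrap server; our real address comes from settings.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Same key for every message => same partition => strict ordering.
for i in range(5):
    producer.send("messaging-test", {"value": i}, key=b"messaging-test")

producer.flush()
producer.close()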
Description
Test failure 1
We mock out Kafka for most unit tests and only have a couple which test its basic functionality. test_send_and_receive is relatively simple:
def test_send_and_receive(self):
"""Send and receive a message."""
m = {"x": "y"}
# send the message, immediately closing the producer
self.producer.send(m, key=TEST_KEY)
self.producer.close()
# receive the message, immediately acknowledging and closing the consumer
received = self.consumer.receive()
self.consumer.acknowledge()
self.consumer.close()
# check the message is what we expected
self.assertEqual(m, received)
With 2.2.11 this results in AssertionError: {'x': 'y'} != None, suggesting that the consumer receives nothing.
Test failure 2
The following test, test_can_request_retry, sends five messages with data {"value": i} where i is 0-4:
def test_can_request_retry(self):
"""Request a retry i.e. when TransientProcessingException is observed."""
# send some messages, immediately closing the producer
for i in range(5):
self.producer.send({"value": i}, TEST_KEY)
self.producer.close()
# receive and acknowledge 3 messages
for i in range(3):
received = self.consumer.receive()
self.assertEqual(received, {"value": i})
self.consumer.acknowledge()
# receive next message and request retry ten times
received = self.consumer.receive()
self.assertEqual(received, {"value": 3})
for i in range(10):
self.consumer.request_retry()
received = self.consumer.receive()
self.assertEqual(received, {"value": 3})
# finally acknowledge the message
self.consumer.acknowledge()
# receive the next message and tidy up
received = self.consumer.receive()
self.assertEqual(received, {"value": 4})
self.consumer.acknowledge()
self.consumer.close()
However, the consumer in this test receives the message sent in the previous test and fails before receiving any further messages:
09:44:03 AssertionError: {'x': 'y'} != {'value': 0}
09:44:03 - {'x': 'y'}
09:44:03 + {'value': 0}
Test context
For completeness, these two test cases inherit from the same base class:
class KafkaTestCase(TestCase):
"""
Typically the producer and consumer objects would be long-lived.
Here they are closed immediately to minimise the window for test collisions.
These would ideally be the _only_ tests to actually send and receive messages for real, with
all other tests checking the interaction with the producer and sender via mocks.
"""
def setUp(self):
self.producer = Producer(TEST_TOPIC)
self.consumer = Consumer(TEST_TOPIC, TEST_GROUP)
# clean up to 500 lingering messages from the test topic to avoid stale messages causing test failure
for _ in range(500):
received = self.consumer.receive()
if received is None:
break
The constants referenced in these examples are:
TEST_TOPIC = "messaging-test"
TEST_GROUP = "messaging-test-group"
TEST_KEY = b"messaging-test" # used to guarantee strict message ordering
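To make the wrapper semantics concrete (our real Producer and Consumer classes aren't shown here), a simplified consumer along the following lines captures the behaviour the tests rely on: receive() polls with a timeout and returns None if nothing arrives, acknowledge() commits, and request_retry() seeks back to the last committed offset so the current message is re-delivered. This is an illustrative sketch only, not our exact implementation, and the broker address and consumer settings are assumptions:

import json

from kafka import KafkaConsumer


class Consumer:
    """Simplified illustration of our consumer wrapper (not the exact implementation)."""

    def __init__(self, topic, group, bootstrap_servers="localhost:9092"):
        # The configuration below is assumed for the sketch.
        self._consumer = KafkaConsumer(
            topic,
            group_id=group,
            bootstrap_servers=bootstrap_servers,
            enable_auto_commit=False,
            auto_offset_reset="earliest",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        )

    def receive(self, timeout_ms=5000):
        """Return the next message value, or None if nothing arrives within the timeout."""
        for records in self._consumer.poll(timeout_ms=timeout_ms, max_records=1).values():
            return records[0].value
        return None

    def acknowledge(self):
        """Commit the offsets of everything received so far."""
        self._consumer.commit()

    def request_retry(self):
        """Rewind each assigned partition to its last committed offset so the
        current (unacknowledged) message is delivered again by the next receive()."""
        for tp in self._consumer.assignment():
            committed = self._consumer.committed(tp)
            self._consumer.seek(tp, committed if committed is not None else 0)

    def close(self):
        self._consumer.close()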
Questions
Why is this now an issue for us, as of 2.2.11? Presumably it is a result of one of these changes:
- Add synchronized decorator; add lock to subscription state #2636
- Do not ignore metadata response for single topic with error #2640
However, we are unsure which one or why. Is there something we need to do to resolve this, or does this need to be addressed in the kafka-python repository?
Happy to provide further information as required. Thanks!