Open
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.7
Describe the bug
KafkaMessageListenerContainer tries to send offsets to a transaction when a record is filtered out by an early record interceptor, and fails because no transaction is in progress (the interceptor is called before the transaction is started).
o.s.k.l.KafkaMessageListenerContainer 149 - Consumer exception
java.lang.IllegalStateException: Cannot send offsets if a transaction is not in progress (currentState= READY)
at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:367)
at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:785)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.sendOffsetsToTransaction(DefaultKafkaProducerFactory.java:1169)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doSendOffsets(KafkaMessageListenerContainer.java:3080)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.sendOffsetsToTransaction(KafkaMessageListenerContainer.java:3073)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackCurrent(KafkaMessageListenerContainer.java:3062)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackCurrent(KafkaMessageListenerContainer.java:3051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkEarlyIntercept(KafkaMessageListenerContainer.java:2673)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:2517)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2500)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1528)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1466)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1335)
To Reproduce
- Set up a transactional Kafka listener
- Add an early (interceptBeforeTx is true) filtering RecordInterceptor
- Send a record that is not filtered out by the interceptor (this step sets KafkaMessageListenerContainer.ListenerConsumer#producer)
- Send a record that is filtered out by the interceptor
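To make the sequence easier to follow, here is a minimal, self-contained sketch of what appears to happen, inferred from the stack trace and the steps above. It is a simplified model and not the Spring for Apache Kafka code: `ModelProducer`, `ModelContainer`, and `reproduce()` are hypothetical names, and the `producer != null` check is an assumption about why the retained producer is used for the filtered record.

```java
import java.util.function.Predicate;

// Simplified model of the reported sequence; NOT the real Spring Kafka classes.
final class EarlyInterceptModel {

    /** Hypothetical stand-in for a transactional producer; models only the tx state check. */
    static final class ModelProducer {
        private boolean inTransaction;

        void beginTransaction() { inTransaction = true; }

        void commitTransaction() { inTransaction = false; }

        void sendOffsetsToTransaction(long offset) {
            if (!inTransaction) {
                throw new IllegalStateException(
                        "Cannot send offsets if a transaction is not in progress");
            }
        }
    }

    /** Hypothetical stand-in for the container's per-record handling. */
    static final class ModelContainer {
        private final Predicate<String> keepRecord; // false means "filtered out"
        private ModelProducer producer;             // assumed to be retained after the first tx

        ModelContainer(Predicate<String> keepRecord) {
            this.keepRecord = keepRecord;
        }

        void handle(String value, long offset) {
            // The early interceptor runs before any transaction is started.
            if (!keepRecord.test(value)) {
                // Assumption: a retained producer routes the ack of the
                // filtered record through the transactional path.
                if (producer != null) {
                    producer.sendOffsetsToTransaction(offset + 1);
                }
                return;
            }
            if (producer == null) {
                producer = new ModelProducer();
            }
            producer.beginTransaction();
            // ... the listener would be invoked here ...
            producer.sendOffsetsToTransaction(offset + 1);
            producer.commitTransaction();
        }
    }

    /** Mirrors steps 3 and 4: one accepted record, then one filtered record. */
    static String reproduce() {
        ModelContainer container = new ModelContainer(v -> !v.startsWith("skip"));
        container.handle("keep-1", 0L); // accepted: sets the producer, runs a transaction
        try {
            container.handle("skip-2", 1L); // filtered: no transaction is in progress
            return "no error";
        }
        catch (IllegalStateException ex) {
            return ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

In this model, sending only filtered records never fails because the producer is never set, which matches the note in step 3 that an accepted record has to be processed first.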
Expected behavior
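The container should not attempt to send offsets to a transaction for a record that an early interceptor filtered out, since no transaction is in progress at that point.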