Description
Hi,
It seems to me that the current implementation of parallel consumer is no longer compatible with the newer Apache Kafka client version 3.9.1.
It seems that this newer version removes "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer" and replaces it with "org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer". As a result, the class-name check in the getAutoCommitEnabled method no longer matches, and we always end up in the else branch:
if ("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer".equals(delegate.getClass().getName())) {
    final boolean autoCommitEnabled = getAutoCommitEnabledFromCoordinator(delegate.getClass(), delegate);
    return Optional.of(autoCommitEnabled);
} else if ("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer".equals(delegate.getClass().getName())) {
    final Field autoCommitEnabledField = delegate.getClass().getDeclaredField("autoCommitEnabled"); //NoSuchFieldException
    autoCommitEnabledField.setAccessible(true);
    final boolean autoCommitEnabled = (boolean) autoCommitEnabledField.get(delegate); //IllegalAccessException
    return Optional.of(autoCommitEnabled);
} else {
    log.warn("Encountered unknown consumer delegate {}", consumer.getClass());
    return Optional.empty();
}
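One possible direction, as a rough sketch only: let the first branch accept both the old and the new class name. This assumes ClassicKafkaConsumer can be inspected the same way as LegacyKafkaConsumer, which I have not verified. The DelegateKinds class, its Kind enum and the classify method are hypothetical names used for illustration; they are not part of parallel consumer or of the Kafka client.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical helper: maps a consumer delegate's class name to the
// strategy needed to read its auto-commit setting.
final class DelegateKinds {

    enum Kind { COORDINATOR_BASED, ASYNC }

    private static final String PREFIX =
            "org.apache.kafka.clients.consumer.internals.";

    // Assumption: the renamed ClassicKafkaConsumer can be handled like
    // LegacyKafkaConsumer (same coordinator-based lookup).
    private static final Set<String> COORDINATOR_DELEGATES = Set.of(
            PREFIX + "LegacyKafkaConsumer",
            PREFIX + "ClassicKafkaConsumer");

    private static final String ASYNC_DELEGATE = PREFIX + "AsyncKafkaConsumer";

    private DelegateKinds() {
    }

    // Returns the matching kind, or Optional.empty() for an unknown
    // delegate (the case that currently falls into the else branch).
    static Optional<Kind> classify(String delegateClassName) {
        if (COORDINATOR_DELEGATES.contains(delegateClassName)) {
            return Optional.of(Kind.COORDINATOR_BASED);
        }
        if (ASYNC_DELEGATE.equals(delegateClassName)) {
            return Optional.of(Kind.ASYNC);
        }
        return Optional.empty();
    }
}
```

With something like this, getAutoCommitEnabled could branch on classify(delegate.getClass().getName()) instead of comparing against a single hard-coded class name, so a later rename would only require adding one entry to the set.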
Thanks a lot for your help.
Kind regards, Olivier