Description:
We are experiencing a significant issue in our Kubernetes environment: all Kafka REST Proxy consumer APIs (including the /records and /commit endpoints) hang whenever any one consumer group is rebalancing, even though the other groups are consuming from unrelated topics. This causes delays of 3-5 minutes in which all REST Proxy consumer requests hang, which hurts processing time and leads to timeouts in client applications.
Context:
- Kafka REST Proxy is being used to poll messages regularly in our Kubernetes cluster.
- Consumer groups are consuming from different topics (non-overlapping), so ideally, rebalancing one group shouldn't affect the others.
- The issue arises when one consumer group undergoes a rebalance. Even though other consumer groups are consuming from their own unique topics, their REST Proxy consumer requests to /records and /commit hang for the duration of the rebalance.
- During a rebalance, a client's request times out and the client retries it.
- When the rebalance completes, the client receives a response to the retried request, but the response contains more recent records, effectively skipping earlier offsets.
- The Kafka REST Proxy logs show that the first request (which the client timed out on) eventually completes after 5-6 minutes with a 200 response, despite the client having given up. Those records are delivered to nobody and are effectively lost, which explains the jump in offsets for records included in the retry response.
- I am looking to understand why all consumer groups are affected during the rebalance of one group, and if there are any configurations or changes to the REST Proxy that can reduce the rebalance impact on unrelated consumer groups.
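Our working theory of the record loss, as a minimal simulation. The `ProxyConsumer` class below is an illustrative stand-in for the stateful consumer instance that REST Proxy holds server-side, not actual REST Proxy internals:

```python
# Minimal simulation of the failure mode described above. ProxyConsumer is an
# illustrative stand-in for the server-side consumer instance, NOT real code:
# its position advances every time a /records request completes server-side,
# whether or not the client is still around to receive the response.

class ProxyConsumer:
    def __init__(self):
        self.position = 0  # next offset to hand out

    def records(self, max_records=30):
        batch = list(range(self.position, self.position + max_records))
        self.position += max_records
        return batch

consumer = ProxyConsumer()

# Request 1: blocked behind the rebalance; the client times out and never sees
# this response, but server-side it still completes with offsets 0-29.
lost_batch = consumer.records()

# Request 2 (the retry): returns the NEXT offsets, so the client's first
# observed offset jumps past the lost batch.
retry_batch = consumer.records()

print(lost_batch[0], lost_batch[-1])    # 0 29  -> delivered to nobody
print(retry_batch[0], retry_batch[-1])  # 30 59 -> first records the client sees
```

This matches what we see in production: the gap in offsets is exactly the batch returned to the timed-out request.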
Reproduction Steps:
- Set up Kafka REST Proxy with multiple consumer groups, each consuming from different, non-overlapping topics.
- Initiate regular polling for messages from Kafka REST Proxy using the /records API, from 3 or more separate k8s deployments of Node.js Kafka consumer workers (each with its own unique consumer group).
- Trigger a rebalance for one of the consumer groups.
- Observe that all consumer groups experience a hang while the rebalance is ongoing, despite them consuming from different topics.
- After around 3 minutes, the client's first GET /records request times out for a given consumer worker. On retry, the second request eventually returns a response once the rebalance has completed; however, more recent records are returned, and earlier offsets we have not yet processed appear to have been skipped.
Observations:
- Kafka REST Proxy logs show the first request that timed out eventually returns a 200 response after several minutes.
- During the rebalance, all consumer requests (across different consumer groups) are delayed or blocked.
- When the rebalance completes, the Kafka REST Proxy responds with a mix of old and new records, causing clients to miss records.
- All of this happens despite the consumer groups having separate topics and partitions.
Config Settings (from what I've tried so far):
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [b-1.amazonaws.com:9096, b-3.kafka.us-east-1.amazonaws.com:9096, b-2.amazonaws.com:9096]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
[2025-02-06 04:21:39,800] INFO KafkaRestConfig values:
access.control.allow.headers =
access.control.allow.methods =
access.control.allow.origin =
access.control.skip.options = true
advertised.listeners = []
api.endpoints.blocklist = []
api.v2.enable = true
api.v3.enable = true
authentication.method = BASIC
authentication.realm = KafkaRest
authentication.roles = [user]
authentication.skip.paths = []
bootstrap.servers = b-1.amazonaws.com:9096,b-3.amazonaws.com:9096,b-2.amazonaws.com:9096
client.init.timeout.ms = 60000
client.sasl.kerberos.kinit.cmd = /usr/bin/kinit
client.sasl.kerberos.min.time.before.relogin = 60000
client.sasl.kerberos.service.name =
client.sasl.kerberos.ticket.renew.jitter = 0.05
client.sasl.kerberos.ticket.renew.window.factor = 0.8
client.sasl.mechanism = SCRAM-SHA-512
client.security.protocol = SASL_SSL
client.ssl.cipher.suites =
client.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
client.ssl.endpoint.identification.algorithm =
client.ssl.key.password = [hidden]
client.ssl.keymanager.algorithm = SunX509
client.ssl.keystore.location =
client.ssl.keystore.password = [hidden]
client.ssl.keystore.type = JKS
client.ssl.protocol = TLS
client.ssl.provider =
client.ssl.trustmanager.algorithm = PKIX
client.ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks
client.ssl.truststore.password = [hidden]
client.ssl.truststore.type = JKS
client.timeout.ms = 500
client.zk.session.timeout.ms = 30000
compression.enable = true
confluent.resource.name.authority =
consumer.instance.timeout.ms = 300000
consumer.iterator.backoff.ms = 50
consumer.iterator.timeout.ms = 1
consumer.request.max.bytes = 67108864
consumer.request.timeout.ms = 30000
consumer.threads = 50
csrf.prevention.enable = false
csrf.prevention.token.endpoint = /csrf
csrf.prevention.token.expiration.minutes = 30
csrf.prevention.token.max.entries = 10000
debug = false
dos.filter.delay.ms = 100
dos.filter.enabled = false
dos.filter.insert.headers = true
dos.filter.ip.whitelist = []
dos.filter.managed.attr = false
dos.filter.max.idle.tracker.ms = 30000
dos.filter.max.requests.ms = 30000
dos.filter.max.requests.per.sec = 25
dos.filter.max.wait.ms = 50
dos.filter.remote.port = false
dos.filter.throttle.ms = 30000
dos.filter.throttled.requests = 5
dos.filter.track.sessions = true
fetch.min.bytes = -1
host.name = 10.42.20.28
id = kafka-rest-proxy-5f8fcf8c8-4szf7
idle.timeout.ms = 30000
kafka.rest.resource.extension.class = []
listeners = [http://0.0.0.0:8082]
metric.reporters = []
metrics.jmx.prefix = kafka.rest
metrics.num.samples = 2
metrics.sample.window.ms = 30000
metrics.tag.map = []
nosniff.prevention.enable = false
port = 8082
producer.threads = 5
request.logger.name = io.confluent.rest-utils.requests
request.queue.capacity = 2147483647
request.queue.capacity.growby = 64
request.queue.capacity.init = 128
resource.extension.classes = []
response.http.headers.config =
response.mediatype.default = application/json
response.mediatype.preferred = [application/json, application/vnd.kafka.v2+json]
rest.servlet.initializor.classes = []
schema.registry.url = kafka-schema-registry:8081
shutdown.graceful.ms = 1000
simpleconsumer.pool.size.max = 25
simpleconsumer.pool.timeout.ms = 1000
ssl.cipher.suites = []
ssl.client.auth = false
ssl.client.authentication = NONE
ssl.enabled.protocols = []
ssl.endpoint.identification.algorithm = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm =
ssl.keystore.location =
ssl.keystore.password = [hidden]
ssl.keystore.reload = false
ssl.keystore.type = JKS
ssl.keystore.watch.location =
ssl.protocol = TLS
ssl.provider =
ssl.trustmanager.algorithm =
ssl.truststore.location =
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
thread.pool.max = 200
thread.pool.min = 8
websocket.path.prefix = /ws
websocket.servlet.initializor.classes = []
zookeeper.connect =
[2025-02-06 04:21:41,901] INFO SchemaRegistryConfig values:
auto.register.schemas = false
basic.auth.credentials.source = URL
basic.auth.user.info = [hidden]
bearer.auth.credentials.source = STATIC_TOKEN
bearer.auth.token = [hidden]
http.connect.timeout.ms = 60000
http.read.timeout.ms = 60000
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
latest.compatibility.strict = true
max.schemas.per.subject = 1000
proxy.host =
proxy.port = -1
schema.reflection = false
schema.registry.basic.auth.user.info = [hidden]
schema.registry.ssl.cipher.suites = null
schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
schema.registry.ssl.endpoint.identification.algorithm = https
schema.registry.ssl.engine.factory.class = null
schema.registry.ssl.key.password = null
schema.registry.ssl.keymanager.algorithm = SunX509
schema.registry.ssl.keystore.certificate.chain = null
schema.registry.ssl.keystore.key = null
schema.registry.ssl.keystore.location = null
schema.registry.ssl.keystore.password = null
schema.registry.ssl.keystore.type = JKS
schema.registry.ssl.protocol = TLSv1.3
schema.registry.ssl.provider = null
schema.registry.ssl.secure.random.implementation = null
schema.registry.ssl.trustmanager.algorithm = PKIX
schema.registry.ssl.truststore.certificates = null
schema.registry.ssl.truststore.location = null
schema.registry.ssl.truststore.password = null
schema.registry.ssl.truststore.type = JKS
schema.registry.url = [kafka-schema-registry:8081]
use.latest.version = false
[2025-02-06 16:49:16,918] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [b-1.amazonaws.com:9096, b-3.amazonaws.com:9096, b-2.amazonaws.com:9096]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-kafka-replication-worker-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = kafka-replication-worker-1
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 30
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
Expected Behavior:
- Consumer groups should not interfere with each other during rebalances, especially if they are consuming from different topics.
- The Kafka REST Proxy should be able to handle rebalances more gracefully, ensuring other consumer groups do not experience delays or timeouts.
Questions/Requests for Help:
- Why do all consumer groups get affected during a rebalance of just one group? Is this expected behavior due to how Kafka handles consumer group metadata updates or partition assignments during rebalancing?
- Are there any Kafka REST Proxy configurations that we can adjust to minimize the impact of rebalances on unrelated consumer groups? Could global metadata updates or the partition assignment process be causing delays for other unrelated consumer groups? If so, how can we isolate this?
- Is it possible to configure Kafka REST Proxy to time out early (or immediately) when a rebalance is detected, rather than hanging until the rebalance completes and returning a response long after the client has timed out?
Environment Details:
- Kafka REST Proxy version: cp-kafka-rest:6.2.13
- Kafka version: 2.8.1
- Kubernetes version: 1.17.6
- Consumer client: https://www.npmjs.com/package/got
- Kafka setup: Amazon Managed Streaming for Apache Kafka (MSK)
If there are any logs, stack traces, or specific configurations (e.g., broker settings) that may help debug this issue, I can provide those upon request.
Thank you for any assistance you can provide, and I’m happy to help clarify further if needed.