Kafka REST Proxy Polling Hanging During Consumer Group Rebalance #1350

@jc-sea

Description:

We are experiencing a significant issue in our Kubernetes environment: all Kafka REST Proxy consumer APIs (including the /commit and /records APIs) hang whenever any one consumer group is being rebalanced, even when the other consumer groups consume from unrelated topics. All REST Proxy consumer requests hang for 3-5 minutes, which delays processing and leads to timeouts in client applications.

Context:

  • Kafka REST Proxy is being used to poll messages regularly in our Kubernetes cluster.
  • Consumer groups are consuming from different topics (non-overlapping), so ideally, rebalancing one group shouldn't affect the others.
  • The issue arises when one consumer group undergoes a rebalance. Even though other consumer groups are consuming from their own unique topics, their REST Proxy consumer requests to /records and /commit hang for the duration of the rebalance.
  • When a client request times out during a rebalance, the client retries the request.
  • When the rebalance completes, the client receives a response to the retried request, but that response contains more recent records, effectively skipping earlier offsets.
  • The Kafka REST Proxy logs show that the first request (the one the client timed out on) eventually completes with a 200 response after 5-6 minutes, even though the client has already timed out. The records in that response never reach the client, which explains the jump in offsets in the retried request's response.
  • I would like to understand why all consumer groups are affected when only one group rebalances, and whether any REST Proxy configuration or change can reduce the impact of a rebalance on unrelated consumer groups.
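The offset jump described in the last few bullets can be reproduced with a toy model that needs no Kafka at all. This is a hypothetical simulation, not REST Proxy code. It assumes that the proxy-side consumer advances its fetch position whenever it returns a batch, even if the HTTP client has already given up. `SimulatedProxyConsumer` and `client_view` are illustrative names. `max_poll_records = 30` mirrors `max.poll.records` in the ConsumerConfig dump later in this issue.

```python
from dataclasses import dataclass


@dataclass
class SimulatedProxyConsumer:
    """Hypothetical stand-in for one REST Proxy consumer instance.

    Assumption: the proxy-side consumer advances its fetch position as soon
    as it returns a batch, whether or not the HTTP client is still waiting.
    """
    log: list                   # one partition; list index == offset
    max_poll_records: int = 30  # mirrors max.poll.records in the dump
    position: int = 0           # next offset the consumer will fetch
    committed: int = 0          # no commit has happened yet (enable.auto.commit = false)

    def fetch(self) -> list:
        batch = self.log[self.position:self.position + self.max_poll_records]
        self.position += len(batch)  # position moves on every completed fetch
        return batch


def client_view(consumer: SimulatedProxyConsumer, client_timed_out: bool) -> list:
    """What the worker actually receives from one GET /records call."""
    batch = consumer.fetch()                   # the proxy still completes the fetch...
    return [] if client_timed_out else batch  # ...but a timed-out client never reads it


consumer = SimulatedProxyConsumer(log=list(range(100)))
first = client_view(consumer, client_timed_out=True)   # times out mid-rebalance
retry = client_view(consumer, client_timed_out=False)  # the retried request
print(first, retry[0], retry[-1], consumer.committed)  # [] 30 59 0
```

Because `enable.auto.commit = false`, the committed offset in the model is still 0 after the retry, even though offsets 0 to 29 were never delivered. If the real proxy behaves the same way, the consumer's position has skipped those records but they were never committed, so they should still be recoverable.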

Reproduction Steps:

  1. Set up Kafka REST Proxy with multiple consumer groups, each consuming from different, non-overlapping topics.
  2. Poll for messages regularly through the /records API from 3 or more separate k8s deployments of Node.js Kafka consumer workers, each with its own unique consumer group.
  3. Trigger a rebalance for one of the consumer groups.
  4. Observe that all consumer groups experience a hang while the rebalance is ongoing, despite them consuming from different topics.
  5. After some time (~3 minutes), the client’s first GET /records request times out for a given consumer worker. The retried request eventually returns once the rebalance has completed, but it returns more recent records. Earlier offsets that we have not yet processed appear to have been skipped.
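Step 2 can be sketched with the REST Proxy v2 records endpoint. The v2 `GET /consumers/{group}/instances/{instance}/records` call accepts the `timeout` and `max_bytes` query parameters. `timeout` limits how long the proxy spends fetching records. The proxy documentation says it applies only when it is smaller than `consumer.request.timeout.ms` (30000 in the dump). This report does not establish whether it also limits a wait caused by a rebalance.

Several values below are assumptions:
- the host name `kafka-rest-proxy`
- the instance name `worker-a`
- the binary embedded format
- the parameter values

The workers in this report are Node.js, so this Python version only shows the shape of the request.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request

# Accept header for the v2 binary embedded format (assumed; use whichever
# format the consumer instance was created with).
V2_BINARY = "application/vnd.kafka.binary.v2+json"


def records_request(base_url: str, group: str, instance: str,
                    timeout_ms: int, max_bytes: int) -> Request:
    """Build (but do not send) a v2 GET .../records request.

    The values passed in are placeholders, not recommendations.
    """
    path = f"/consumers/{quote(group)}/instances/{quote(instance)}/records"
    query = urlencode({"timeout": timeout_ms, "max_bytes": max_bytes})
    return Request(f"{base_url.rstrip('/')}{path}?{query}",
                   headers={"Accept": V2_BINARY}, method="GET")


# Hypothetical service URL and instance name.
req = records_request("http://kafka-rest-proxy:8082", "kafka-replication-worker-1",
                      "worker-a", timeout_ms=1000, max_bytes=1048576)
print(req.full_url)
# Sending it would be: urllib.request.urlopen(req, timeout=<client-side seconds>)
```

Keep the worker's own HTTP timeout (the `timeout` argument of `urllib.request.urlopen` here) well above the proxy-side `timeout`. A healthy proxy then answers before the client gives up, which avoids leaving orphaned responses like the one in the logs.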

Observations:

  1. Kafka REST Proxy logs show the first request that timed out eventually returns a 200 response after several minutes.
  2. During the rebalance, all consumer requests (across different consumer groups) are delayed or blocked.
  3. When the rebalance completes, the Kafka REST Proxy responds with a mix of old and new records, causing clients to miss records.
  4. All of this happens despite the consumer groups having separate topics and partitions.
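The missed records in observations 1 and 3 suggest a client-side guard worth trying. Because `enable.auto.commit = false`, offsets the worker never processed are not committed. After a timed-out GET /records, the worker could therefore rewind the consumer instance to its own last committed offsets before retrying.

This sketch builds that request for the REST Proxy v2 `POST /consumers/{group}/instances/{instance}/positions` endpoint. The following are assumptions:
- the base URL
- the instance name
- the topic `orders`
- the idea that the worker tracks its committed offsets itself

If the rebalance moved partitions away from this instance, the proxy may reject the seek for those partitions.

```python
import json
from urllib.parse import quote
from urllib.request import Request

V2_JSON = "application/vnd.kafka.v2+json"


def seek_to_committed_request(base_url: str, group: str, instance: str,
                              committed: dict) -> Request:
    """Build a v2 POST .../positions request that rewinds the instance.

    `committed` maps (topic, partition) -> last committed offset, as tracked
    by the worker itself (a hypothetical bookkeeping choice for this sketch).
    """
    path = f"/consumers/{quote(group)}/instances/{quote(instance)}/positions"
    body = {"offsets": [{"topic": t, "partition": p, "offset": o}
                        for (t, p), o in sorted(committed.items())]}
    return Request(f"{base_url.rstrip('/')}{path}",
                   data=json.dumps(body).encode("utf-8"),
                   headers={"Content-Type": V2_JSON}, method="POST")


# Hypothetical values: send this before retrying a timed-out GET /records.
req = seek_to_committed_request("http://kafka-rest-proxy:8082",
                                "kafka-replication-worker-1", "worker-a",
                                {("orders", 0): 120})
print(json.loads(req.data))
# {'offsets': [{'topic': 'orders', 'partition': 0, 'offset': 120}]}
```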

Config Settings (from what I've tried so far):

[main] INFO org.apache.kafka.clients.admin.AdminClientConfig - AdminClientConfig values: 
	bootstrap.servers = [b-1.amazonaws.com:9096, b-3.kafka.us-east-1.amazonaws.com:9096, b-2.amazonaws.com:9096] 
	client.dns.lookup = use_all_dns_ips 
	client.id = 
	connections.max.idle.ms = 300000 
	default.api.timeout.ms = 60000 
	metadata.max.age.ms = 300000 
	metric.reporters = [] 
	metrics.num.samples = 2 
	metrics.recording.level = INFO 
	metrics.sample.window.ms = 30000 
	receive.buffer.bytes = 65536 
	reconnect.backoff.max.ms = 1000 
	reconnect.backoff.ms = 50 
	request.timeout.ms = 30000 
	retries = 2147483647 
	retry.backoff.ms = 100 
	sasl.client.callback.handler.class = null 
	sasl.jaas.config = [hidden] 
	sasl.kerberos.kinit.cmd = /usr/bin/kinit 
	sasl.kerberos.min.time.before.relogin = 60000 
	sasl.kerberos.service.name = null 
	sasl.kerberos.ticket.renew.jitter = 0.05 
	sasl.kerberos.ticket.renew.window.factor = 0.8 
	sasl.login.callback.handler.class = null 
	sasl.login.class = null 
	sasl.login.refresh.buffer.seconds = 300 
	sasl.login.refresh.min.period.seconds = 60 
	sasl.login.refresh.window.factor = 0.8 
	sasl.login.refresh.window.jitter = 0.05 
	sasl.mechanism = SCRAM-SHA-512 
	security.protocol = SASL_SSL 
	security.providers = null 
	send.buffer.bytes = 131072 
	socket.connection.setup.timeout.max.ms = 30000 
	socket.connection.setup.timeout.ms = 10000 
	ssl.cipher.suites = null 
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3] 
	ssl.endpoint.identification.algorithm = https 
	ssl.engine.factory.class = null 
	ssl.key.password = null 
	ssl.keymanager.algorithm = SunX509 
	ssl.keystore.certificate.chain = null 
	ssl.keystore.key = null 
	ssl.keystore.location = null 
	ssl.keystore.password = null 
	ssl.keystore.type = JKS 
	ssl.protocol = TLSv1.3 
	ssl.provider = null 
	ssl.secure.random.implementation = null 
	ssl.trustmanager.algorithm = PKIX 
	ssl.truststore.certificates = null 
	ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks 
	ssl.truststore.password = [hidden] 
	ssl.truststore.type = JKS 
[2025-02-06 04:21:39,800] INFO KafkaRestConfig values: 
	access.control.allow.headers = 
	access.control.allow.methods = 
	access.control.allow.origin = 
	access.control.skip.options = true 
	advertised.listeners = [] 
	api.endpoints.blocklist = [] 
	api.v2.enable = true 
	api.v3.enable = true 
	authentication.method = BASIC 
	authentication.realm = KafkaRest 
	authentication.roles = [user] 
	authentication.skip.paths = [] 
	bootstrap.servers = b-1.amazonaws.com:9096,b-3.amazonaws.com:9096,b-2.amazonaws.com:9096 
	client.init.timeout.ms = 60000 
	client.sasl.kerberos.kinit.cmd = /usr/bin/kinit 
	client.sasl.kerberos.min.time.before.relogin = 60000 
	client.sasl.kerberos.service.name = 
	client.sasl.kerberos.ticket.renew.jitter = 0.05 
	client.sasl.kerberos.ticket.renew.window.factor = 0.8 
	client.sasl.mechanism = SCRAM-SHA-512 
	client.security.protocol = SASL_SSL 
	client.ssl.cipher.suites = 
	client.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 
	client.ssl.endpoint.identification.algorithm = 
	client.ssl.key.password = [hidden] 
	client.ssl.keymanager.algorithm = SunX509 
	client.ssl.keystore.location = 
	client.ssl.keystore.password = [hidden] 
	client.ssl.keystore.type = JKS 
	client.ssl.protocol = TLS 
	client.ssl.provider = 
	client.ssl.trustmanager.algorithm = PKIX 
	client.ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks 
	client.ssl.truststore.password = [hidden] 
	client.ssl.truststore.type = JKS 
	client.timeout.ms = 500 
	client.zk.session.timeout.ms = 30000 
	compression.enable = true 
	confluent.resource.name.authority = 
	consumer.instance.timeout.ms = 300000 
	consumer.iterator.backoff.ms = 50 
	consumer.iterator.timeout.ms = 1 
	consumer.request.max.bytes = 67108864 
	consumer.request.timeout.ms = 30000 
	consumer.threads = 50 
	csrf.prevention.enable = false 
	csrf.prevention.token.endpoint = /csrf 
	csrf.prevention.token.expiration.minutes = 30 
	csrf.prevention.token.max.entries = 10000 
	debug = false 
	dos.filter.delay.ms = 100 
	dos.filter.enabled = false 
	dos.filter.insert.headers = true 
	dos.filter.ip.whitelist = [] 
	dos.filter.managed.attr = false 
	dos.filter.max.idle.tracker.ms = 30000 
	dos.filter.max.requests.ms = 30000 
	dos.filter.max.requests.per.sec = 25 
	dos.filter.max.wait.ms = 50 
	dos.filter.remote.port = false 
	dos.filter.throttle.ms = 30000 
	dos.filter.throttled.requests = 5 
	dos.filter.track.sessions = true 
	fetch.min.bytes = -1 
	host.name = 10.42.20.28 
	id = kafka-rest-proxy-5f8fcf8c8-4szf7 
	idle.timeout.ms = 30000 
	kafka.rest.resource.extension.class = [] 
	listeners = [http://0.0.0.0:8082] 
	metric.reporters = [] 
	metrics.jmx.prefix = kafka.rest 
	metrics.num.samples = 2 
	metrics.sample.window.ms = 30000 
	metrics.tag.map = [] 
	nosniff.prevention.enable = false 
	port = 8082 
	producer.threads = 5 
	request.logger.name = io.confluent.rest-utils.requests 
	request.queue.capacity = 2147483647 
	request.queue.capacity.growby = 64 
	request.queue.capacity.init = 128 
	resource.extension.classes = [] 
	response.http.headers.config = 
	response.mediatype.default = application/json 
	response.mediatype.preferred = [application/json, application/vnd.kafka.v2+json] 
	rest.servlet.initializor.classes = [] 
	schema.registry.url = kafka-schema-registry:8081 
	shutdown.graceful.ms = 1000 
	simpleconsumer.pool.size.max = 25 
	simpleconsumer.pool.timeout.ms = 1000 
	ssl.cipher.suites = [] 
	ssl.client.auth = false 
	ssl.client.authentication = NONE 
	ssl.enabled.protocols = [] 
	ssl.endpoint.identification.algorithm = null 
	ssl.key.password = [hidden] 
	ssl.keymanager.algorithm = 
	ssl.keystore.location = 
	ssl.keystore.password = [hidden] 
	ssl.keystore.reload = false 
	ssl.keystore.type = JKS 
	ssl.keystore.watch.location = 
	ssl.protocol = TLS 
	ssl.provider = 
	ssl.trustmanager.algorithm = 
	ssl.truststore.location = 
	ssl.truststore.password = [hidden] 
	ssl.truststore.type = JKS 
	thread.pool.max = 200 
	thread.pool.min = 8 
	websocket.path.prefix = /ws 
	websocket.servlet.initializor.classes = [] 
	zookeeper.connect = 
[2025-02-06 04:21:41,901] INFO SchemaRegistryConfig values: 
	auto.register.schemas = false 
	basic.auth.credentials.source = URL 
	basic.auth.user.info = [hidden] 
	bearer.auth.credentials.source = STATIC_TOKEN 
	bearer.auth.token = [hidden] 
	http.connect.timeout.ms = 60000 
	http.read.timeout.ms = 60000 
	key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy 
	latest.compatibility.strict = true 
	max.schemas.per.subject = 1000 
	proxy.host = 
	proxy.port = -1 
	schema.reflection = false 
	schema.registry.basic.auth.user.info = [hidden] 
	schema.registry.ssl.cipher.suites = null 
	schema.registry.ssl.enabled.protocols = [TLSv1.2, TLSv1.3] 
	schema.registry.ssl.endpoint.identification.algorithm = https 
	schema.registry.ssl.engine.factory.class = null 
	schema.registry.ssl.key.password = null 
	schema.registry.ssl.keymanager.algorithm = SunX509 
	schema.registry.ssl.keystore.certificate.chain = null 
	schema.registry.ssl.keystore.key = null 
	schema.registry.ssl.keystore.location = null 
	schema.registry.ssl.keystore.password = null 
	schema.registry.ssl.keystore.type = JKS 
	schema.registry.ssl.protocol = TLSv1.3 
	schema.registry.ssl.provider = null 
	schema.registry.ssl.secure.random.implementation = null 
	schema.registry.ssl.trustmanager.algorithm = PKIX 
	schema.registry.ssl.truststore.certificates = null 
	schema.registry.ssl.truststore.location = null 
	schema.registry.ssl.truststore.password = null 
	schema.registry.ssl.truststore.type = JKS 
	schema.registry.url = [kafka-schema-registry:8081] 
	use.latest.version = false 
[2025-02-06 16:49:16,918] INFO ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [b-1.amazonaws.com:9096, b-3.amazonaws.com:9096, b-2.amazonaws.com:9096]
	check.crcs = true
	client.dns.lookup = use_all_dns_ips
	client.id = consumer-kafka-replication-worker-1
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafka-replication-worker-1
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	internal.throw.on.fetch.stable.offset.unsupported = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 30
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = SCRAM-SHA-512
	security.protocol = SASL_SSL
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.3
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = /var/ssl/private/kafka.broker.truststore.jks
	ssl.truststore.password = [hidden]
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
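Several timeouts in the dumps above fall in the same range as the observed 3-5 minute hang.

- `max.poll.interval.ms = 300000` (5 minutes) is also the rebalance timeout for the Java consumer. The group coordinator can wait that long for members to rejoin. A REST Proxy consumer instance likely rejoins only when a client polls it, assuming the proxy calls `poll()` only in response to GET /records.
- `consumer.instance.timeout.ms = 300000` is another 5-minute value.

This may explain how long the rebalance itself takes, though not why unrelated groups stall. The small helper below pulls the long `*.ms` settings out of a dump like this one, as a triage aid rather than a diagnosis. `long_timeouts` and its 60-second threshold are arbitrary choices for this sketch. Feed it one config section at a time, because the same key can appear in several sections and later values overwrite earlier ones.

```python
import re

# Matches "key = value" lines from a Kafka / REST Proxy config dump,
# tolerating leading tabs and trailing spaces.
LINE = re.compile(r"^\s*([\w.]+)\s*=\s*(\S*)\s*$")


def long_timeouts(dump: str, min_ms: int = 60_000) -> dict:
    """Return {setting: minutes} for integer *.ms settings >= min_ms."""
    result = {}
    for line in dump.splitlines():
        m = LINE.match(line)
        if m and m.group(1).endswith(".ms") and m.group(2).lstrip("-").isdigit():
            value = int(m.group(2))
            if value >= min_ms:
                result[m.group(1)] = value / 60_000
    return result


sample = """\
\tconsumer.instance.timeout.ms = 300000 
\tconsumer.request.timeout.ms = 30000 
\tmax.poll.interval.ms = 300000
\tsession.timeout.ms = 10000
"""
print(long_timeouts(sample))
# {'consumer.instance.timeout.ms': 5.0, 'max.poll.interval.ms': 5.0}
```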

Expected Behavior:

  • Consumer groups should not interfere with each other during rebalances, especially if they are consuming from different topics.
  • The Kafka REST Proxy should be able to handle rebalances more gracefully, ensuring other consumer groups do not experience delays or timeouts.

Questions/Requests for Help:

  1. Why do all consumer groups get affected during a rebalance of just one group? Is this expected behavior due to how Kafka handles consumer group metadata updates or partition assignments during rebalancing?
  2. Are there any Kafka REST Proxy configurations that we can adjust to minimize the impact of rebalances on unrelated consumer groups? Could global metadata updates or the partition assignment process be causing delays for other unrelated consumer groups? If so, how can we isolate this?
  3. Is it possible to configure Kafka REST Proxy to time out early (or immediately) when a rebalance is detected, rather than hanging until the rebalance completes and returning a response long after the client has timed out?
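One hypothesis for question 1 is head-of-line blocking in a shared, bounded thread pool. It has not been confirmed against the kafka-rest source here. The idea is that reads for every consumer group may be scheduled on the same pool; the KafkaRestConfig dump shows `consumer.threads = 50`. If so, reads for the rebalancing group that block inside the Kafka consumer could occupy every thread, leaving reads for unrelated groups waiting in the queue.

The toy model below uses Python's `ThreadPoolExecutor` with two workers. The function names and pool size are illustrative, not REST Proxy internals.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 2  # toy value; the dump above shows consumer.threads = 50
rebalance_done = threading.Event()


def read_rebalancing_group(group: str) -> str:
    # Hypothetical: a read for the rebalancing group holds its worker
    # thread until the rebalance finishes.
    rebalance_done.wait()
    return f"{group}: records"


def read_unrelated_group(group: str) -> str:
    # A read for an unrelated group would return at once,
    # but only after it gets a thread from the shared pool.
    return f"{group}: records"


with ThreadPoolExecutor(max_workers=POOL_SIZE) as pool:
    # Fill every worker with reads that are stuck behind the rebalance.
    blocked = [pool.submit(read_rebalancing_group, "group-a") for _ in range(POOL_SIZE)]
    other = pool.submit(read_unrelated_group, "group-b")
    stalled_during_rebalance = not other.done()  # queued: no free worker
    rebalance_done.set()                         # the rebalance completes
    other_result = other.result(timeout=5)

print(stalled_during_rebalance, other_result)  # True group-b: records
```

If this is the mechanism, a thread dump of the proxy (for example with `jstack`) taken during a hang should show whether the consumer worker threads are all blocked on the rebalancing group's consumers. Raising `consumer.threads`, or serving the groups from separate proxy deployments, would then be the changes to test.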

Environment Details:

If there are any logs, stack traces, or specific configurations (e.g., broker settings) that may help debug this issue, I can provide those upon request.

Thank you for any assistance you can provide, and I’m happy to help clarify further if needed.
