Consumers as k8s deployment - failing with rebalances when scaling #6440
-
I think running the clients in one consumer group as a single Deployment is fine. I don't think that alone is the problem. But I'm afraid I have never used the .NET client, and I have never seen a problem like this in the Java client.
-
So I've managed to get more verbose logging from librdkafka. The whole log of the failing consumer is below, but here are some points I've noticed, in chronological order:
Last correctly processed message.
Successful commit of that last message
The consumer notices the rebalance for the first time, during a heartbeat (there are 6 partitions and we are scaling consumers from 2 to 6)
Consumer is dumping all its partition assignments
Done
Consumer got new partition assignments
Consumer receives the first message from a newly assigned partition
Trying to commit it with the old generation ID 204. It still doesn't have the new generation ID after the rebalance!
Now the consumer receives the new generation ID 205. But this is too late, since it has already tried to commit the message.
Commit with generation ID 204 rightfully rejected
Exception bubbled up to our application. The log continues for a little while with another rebalance (in a librdkafka thread), but at this point the application is already shutting down due to the unhandled exception and is going to kill that Kafka thread in a second. So, what feels weird to me is that the consumer actually receives a message, processes it and tries to commit it even before the rebalance is complete and the JoinGroup response with the new generation ID has been received. If so, then it sounds like a problem in the rebalance implementation in librdkafka?
Whole log:
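Separately from the log, to make the suspected race concrete: a minimal sketch (not our actual code; the helper name and the decision to just log and continue are illustrative) of how the commit could tolerate the stale-generation rejection instead of letting the exception crash the pod. Since the offset is simply not committed, the message would be reprocessed by whichever consumer owns the partition after the rebalance, at the cost of a duplicate delivery.

```csharp
using System;
using Confluent.Kafka;

static class CommitHelper
{
    // Hypothetical helper: commit a processed message, but tolerate the case where a
    // rebalance bumped the group generation between Consume() and Commit().
    public static void CommitTolerantly(IConsumer<string, string> consumer,
                                        ConsumeResult<string, string> result)
    {
        try
        {
            consumer.Commit(result);
        }
        catch (KafkaException e) when (!e.Error.IsFatal)
        {
            // e.g. an ILLEGAL_GENERATION / rebalance-in-progress style error: the partition
            // has been (or is being) reassigned, so the uncommitted message will simply be
            // reprocessed by the new owner. Log and continue instead of rethrowing.
            Console.WriteLine($"Commit skipped after rebalance: {e.Error.Reason}");
        }
    }
}
```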
-
Just FYI, I've also opened an issue on Confluent's repo, since it really seems to be very specific to their .NET implementation (or maybe even librdkafka itself), in case you want to follow along: confluentinc/confluent-kafka-dotnet#1771 I'll let you know if we manage to resolve it there.
-
Hello,
we are having quite strange issues with our consumers in k8s.
We are using Strimzi for "broker side" - mostly with defaults, running 3 instances of brokers and ZKs (quay.io/strimzi/kafka:0.25.0-kafka-2.7.0).
On the client side, we have a simple .NET app using Confluent's .NET Kafka SDK (basically just a wrapper around librdkafka). We have a k8s Deployment for each consumer group, the motivation being that we can scale clients as traffic demands.
When we start the whole DP at once (let's say 3 instances), everything starts up fine. All clients join the empty consumer group, get partitions assigned and start working.
But the problem arises when we try to scale that DP. Once we increase the instance count (to 6 for example), the new instances seem to join the group correctly and start consuming messages. But the already existing instances fail with an exception:
And to make things worse, those instances are terminated and k8s instantly tries to create new ones to replace them. This puts us into an infinite failure loop of new instances causing old ones to fail, so the whole DP is useless at that moment.
All related problems I've found so far deal with various timeouts - specifically session timeout, fetch timeouts, maxBytes, auto commit handling etc. But even after various experiments with those parameters, I'm still not able to make it work.
Our settings (sketched as a ConsumerConfig after this list):
SessionTimeout - default 45 sec
AutoCommit - disabled, we are committing manually once a message is processed
FetchMaxBytes - default 52MB
HeartBeatInterval - default 3 sec
SocketTimeout - default 60 sec
Message size - around 1KB
One message process time - around 100ms (without any significant deviations)
Consumers are "way behind" at this stage - so on each start they have enough data to catch up on and are probably fetching the max amount of data all the time
We are calling consumer.Close() on exception (and we observe a rebalance starting right after the instance fails)
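To make those settings concrete, this is roughly how they map onto a Confluent.Kafka ConsumerConfig (a sketch; the bootstrap address and group ID are placeholders, and AutoOffsetReset is an assumption on my part):

```csharp
using Confluent.Kafka;

// Sketch of the configuration described above; broker address and group ID are placeholders.
var config = new ConsumerConfig
{
    BootstrapServers = "my-cluster-kafka-bootstrap:9092", // Strimzi bootstrap service (placeholder)
    GroupId = "my-consumer-group",                        // one k8s Deployment per consumer group
    EnableAutoCommit = false,                             // we commit manually after processing
    SessionTimeoutMs = 45000,                             // default 45 sec
    HeartbeatIntervalMs = 3000,                           // default 3 sec
    FetchMaxBytes = 52428800,                             // default ~52 MB
    SocketTimeoutMs = 60000,                              // default 60 sec
    AutoOffsetReset = AutoOffsetReset.Earliest            // assumption; consumers are far behind anyway
};
```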
What I've tried (separately):
I'm pretty sure there are no "long processing" issues - like some message that would take a long time and then fail. With the failing instances, I can see in the log that it "flies", processing several messages per second, and then suddenly fails with the aforementioned exception.
I'm not sure if that is enough information for somebody to see the problem right away (which would be great!).
But I'm also interested in "best practices" for running consumers in k8s. I thought that running each consumer group as a DP actually fits perfectly with a DP's purpose, since each client is kind of stateless and the whole system should be able to scale up and down (and reschedule between nodes etc.) as needed.
But this "infinite fail loop" really terrifies me. Even if we have some timeouts wrong, there would always be the possibility that one "bad message" could kill the whole group. I'm still thinking that we are doing something wrong, since this has to be the way "everybody is running it", right?
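For reference, the overall shape of one consumer instance looks roughly like this (a simplified sketch, not the actual service code; the topic name, config values and the Process step are placeholders):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Placeholder config; in reality it is the one described above.
var config = new ConsumerConfig
{
    BootstrapServers = "my-cluster-kafka-bootstrap:9092",
    GroupId = "my-consumer-group",
    EnableAutoCommit = false
};

using var cts = new CancellationTokenSource();
// Ctrl+C shown for simplicity; in the real service shutdown is wired to the host's
// SIGTERM handling so a k8s scale-down also triggers a clean close.
Console.CancelKeyPress += (s, e) => { e.Cancel = true; cts.Cancel(); };

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsAssignedHandler((c, parts) => Console.WriteLine($"Assigned: {string.Join(",", parts)}"))
    .SetPartitionsRevokedHandler((c, parts) => Console.WriteLine($"Revoked: {string.Join(",", parts)}"))
    .Build();

consumer.Subscribe("my-topic");
try
{
    while (!cts.IsCancellationRequested)
    {
        var result = consumer.Consume(cts.Token); // ~1 KB messages
        Process(result.Message.Value);            // ~100 ms of business logic per message
        consumer.Commit(result);                  // manual commit after processing
    }
}
catch (OperationCanceledException)
{
    // shutting down
}
finally
{
    consumer.Close(); // leave the group cleanly so the remaining instances rebalance quickly
}

// Placeholder for the actual processing step.
static void Process(string value) => Thread.Sleep(100);
```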
I will be very grateful for any ideas towards resolving this, or just your experience with how you run consumers in k8s.
Thanks a lot! Jakub