Skip to content

Commit 7d4f0b7

Browse files
nerhneiro and Irina Skvortsova committed
Fix bug with long timeout when consumer left ungracefully (#19503)
Co-authored-by: Irina Skvortsova <nerhneiro@yandex-team.ru>
1 parent 9a56e86 commit 7d4f0b7

File tree

3 files changed, with 20 additions and 8 deletions

3 files changed, with 20 additions and 8 deletions

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,7 @@ const TString SELECT_ALIVE_MEMBERS = R"sql(
157157
DECLARE $PaginationMemberId AS Utf8;
158158
DECLARE $Limit AS Uint64;
159159
160-
SELECT member_id, instance_id, rebalance_timeout_ms
160+
SELECT member_id, instance_id, rebalance_timeout_ms, session_timeout_ms, heartbeat_deadline
161161
FROM `%s`
162162
VIEW PRIMARY KEY
163163
WHERE database = $Database

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -602,11 +602,16 @@ void TKafkaBalancerActor::JoinStepWaitMembersAndChooseProtocol(NKqp::TEvKqp::TEv
602602

603603
// check all clients have joined or their timeout has expired
604604
for (auto prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.begin(); prevGenerationMembersAndTimeoutsIt != WaitedMemberIdsAndTimeouts.end();) {
605+
ui32 memberRebalanceTimeoutMs = prevGenerationMembersAndTimeoutsIt->second.RebalanceTimeoutMs;
606+
const TInstant& memberHeartbeatDeadline = prevGenerationMembersAndTimeoutsIt->second.HeartbeatDeadline;
605607
if (AllWorkerStates.count(prevGenerationMembersAndTimeoutsIt->first) == 1) {
606608
KAFKA_LOG_D(TStringBuilder() << "Waited member connected: " << prevGenerationMembersAndTimeoutsIt->first);
607609
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
608-
} else if ((RebalanceStartTime + TDuration::MilliSeconds(prevGenerationMembersAndTimeoutsIt->second)) < now) {
609-
KAFKA_LOG_D(TStringBuilder() << "Waited member connect deadline: " << prevGenerationMembersAndTimeoutsIt->first);
610+
} else if ((RebalanceStartTime + TDuration::MilliSeconds(memberRebalanceTimeoutMs)) < now) {
611+
KAFKA_LOG_D(TStringBuilder() << "Rebalance deadline: " << prevGenerationMembersAndTimeoutsIt->first);
612+
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
613+
} else if (memberHeartbeatDeadline < now) {
614+
KAFKA_LOG_D(TStringBuilder() << "Waited member connect session deadline: " << prevGenerationMembersAndTimeoutsIt->first);
610615
prevGenerationMembersAndTimeoutsIt = WaitedMemberIdsAndTimeouts.erase(prevGenerationMembersAndTimeoutsIt);
611616
} else {
612617
++prevGenerationMembersAndTimeoutsIt;
@@ -938,7 +943,7 @@ bool TKafkaBalancerActor::ParseAssignments(
938943

939944
bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts(
940945
NKqp::TEvKqp::TEvQueryResponse::TPtr ev,
941-
std::unordered_map<TString, ui32>& membersAndRebalanceTimeouts,
946+
std::unordered_map<TString, MemberTimeoutsMs>& membersAndTimeouts,
942947
TString& lastMemberId)
943948
{
944949
if (!ev) {
@@ -956,7 +961,9 @@ bool TKafkaBalancerActor::ParseMembersAndRebalanceTimeouts(
956961
TString memberId = TString(parser.ColumnParser("member_id").GetUtf8());
957962
TString instanceId = parser.ColumnParser("instance_id").GetOptionalUtf8().value_or("");
958963
ui32 rebalanceTimeoutMs = parser.ColumnParser("rebalance_timeout_ms").GetOptionalUint32().value_or(DEFAULT_REBALANCE_TIMEOUT_MS);
959-
membersAndRebalanceTimeouts[memberId] = rebalanceTimeoutMs;
964+
ui32 sessionTimeoutMs = parser.ColumnParser("session_timeout_ms").GetOptionalUint32().value_or(DEFAULT_SESSION_TIMEOUT_MS);
965+
TInstant heartbeatDeadline = parser.ColumnParser("heartbeat_deadline").GetOptionalDatetime().value_or(TInstant::Now() + TDuration::MilliSeconds(sessionTimeoutMs));
966+
membersAndTimeouts[memberId] = {rebalanceTimeoutMs, heartbeatDeadline};
960967

961968
lastMemberId = memberId;
962969
}
@@ -1224,7 +1231,7 @@ NYdb::TParamsBuilder TKafkaBalancerActor::BuildInsertMemberParams() {
12241231
params.AddParam("$MemberId").Utf8(MemberId).Build();
12251232
params.AddParam("$InstanceId").Utf8(InstanceId).Build();
12261233
params.AddParam("$Database").Utf8(Kqp->DataBase).Build();
1227-
params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(RebalanceTimeoutMs + SessionTimeoutMs)).Build();
1234+
params.AddParam("$HeartbeatDeadline").Datetime(TInstant::Now() + TDuration::MilliSeconds(SessionTimeoutMs)).Build();
12281235
params.AddParam("$SessionTimeoutMs").Uint32(SessionTimeoutMs).Build();
12291236
params.AddParam("$RebalanceTimeoutMs").Uint32(RebalanceTimeoutMs).Build();
12301237

ydb/core/kafka_proxy/actors/kafka_balancer_actor.h

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,6 +114,11 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
114114
LEAVE_SET_DEAD
115115
};
116116

117+
struct MemberTimeoutsMs {
118+
ui32 RebalanceTimeoutMs;
119+
TInstant HeartbeatDeadline;
120+
};
121+
117122
TKafkaBalancerActor(const TContext::TPtr context, ui64 cookie, ui64 corellationId, TMessagePtr<TJoinGroupRequestData> message, ui8 retryNum = 0)
118123
: Context(context)
119124
, CorrelationId(corellationId)
@@ -309,7 +314,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
309314
std::optional<TGroupStatus> ParseGroupState(NKqp::TEvKqp::TEvQueryResponse::TPtr ev);
310315
bool ParseAssignments(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, TString& assignments);
311316
bool ParseWorkerStates(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, NKafka::TWorkerState>& workerStates, TString& outLastMemberId);
312-
bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, ui32>& membersAndRebalanceTimeouts, TString& lastMemberId);
317+
bool ParseMembersAndRebalanceTimeouts(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, std::unordered_map<TString, MemberTimeoutsMs>& membersAndRebalanceTimeouts, TString& lastMemberId);
313318
bool ParseDeadsAndSessionTimeout(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& deadsCount, ui32& outSessionTimeoutMs);
314319
bool ParseGroupsCount(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& groupsCount);
315320
bool ParseMemberGeneration(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, ui64& generation);
@@ -363,7 +368,7 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
363368
TString Assignments;
364369
std::unordered_map<TString, TString> WorkerStates;
365370
std::unordered_map<TString, NKafka::TWorkerState> AllWorkerStates;
366-
std::unordered_map<TString, ui32> WaitedMemberIdsAndTimeouts;
371+
std::unordered_map<TString, MemberTimeoutsMs> WaitedMemberIdsAndTimeouts;
367372
TInstant RebalanceStartTime = TInstant::Now();
368373
TString Protocol;
369374
TString ProtocolType;

0 commit comments

Comments
 (0)