@@ -360,19 +360,6 @@ void TKafkaBalancerActor::JoinStepCheckGroupState(NKqp::TEvKqp::TEvQueryResponse
360
360
void TKafkaBalancerActor::JoinStepCreateNewOrJoinGroup (NKqp::TEvKqp::TEvQueryResponse::TPtr ev, const TActorContext& ctx) {
361
361
Kqp->SetTxId (ev->Get ()->Record .GetResponse ().GetTxMeta ().id ());
362
362
auto groupStatus = ParseGroupState (ev);
363
- LastSuccessGeneration = groupStatus->LastSuccessGeneration ;
364
-
365
- KAFKA_LOG_I (TStringBuilder () << " Check group before join status."
366
- " \n memberId: " << MemberId <<
367
- " \n instanceId: " << InstanceId <<
368
- " \n group: " << GroupId <<
369
- " \n exists: " << groupStatus->Exists <<
370
- " \n protocolType: " << groupStatus->ProtocolType <<
371
- " \n protocolName: " << groupStatus->ProtocolName <<
372
- " \n master: " << groupStatus->MasterId <<
373
- " \n generation: " << groupStatus->Generation <<
374
- " \n lastSuccessGeneration: " << groupStatus->LastSuccessGeneration <<
375
- " \n state: " << groupStatus->State );
376
363
377
364
if (!groupStatus) {
378
365
SendJoinGroupResponseFail (ctx, CorrelationId,
@@ -388,12 +375,27 @@ void TKafkaBalancerActor::JoinStepCreateNewOrJoinGroup(NKqp::TEvKqp::TEvQueryRes
388
375
return ;
389
376
}
390
377
378
+ if (groupStatus->Exists ) {
379
+ KAFKA_LOG_I (TStringBuilder () << " Check group before join status."
380
+ " \n memberId: " << MemberId <<
381
+ " \n instanceId: " << InstanceId <<
382
+ " \n group: " << GroupId <<
383
+ " \n exists: " << groupStatus->Exists <<
384
+ " \n protocolType: " << groupStatus->ProtocolType <<
385
+ " \n protocolName: " << groupStatus->ProtocolName <<
386
+ " \n master: " << groupStatus->MasterId <<
387
+ " \n generation: " << groupStatus->Generation <<
388
+ " \n lastSuccessGeneration: " << groupStatus->LastSuccessGeneration <<
389
+ " \n state: " << groupStatus->State );
390
+ }
391
+
391
392
if (!groupStatus->Exists ) {
392
393
KqpReqCookie++;
393
394
IsMaster = true ;
394
395
Master = MemberId;
395
396
GenerationId = 0 ;
396
397
CurrentStep = JOIN_CHECK_GROUPS_COUNT;
398
+ LastSuccessGeneration = std::numeric_limits<ui64>::max ();
397
399
NYdb::TParamsBuilder params = BuildCheckGroupsCountParams ();
398
400
Kqp->SendYqlRequest (Sprintf (CHECK_GROUPS_COUNT.c_str (), TKafkaConsumerGroupsMetaInitManager::GetInstant ()->GetStorageTablePath ().c_str ()), params.Build (), KqpReqCookie, ctx);
399
401
} else if (groupStatus->State != GROUP_STATE_JOIN) {
0 commit comments