Skip to content

Commit f8a5d3a

Browse files
committed
few fixes and logs
1 parent a63de6a commit f8a5d3a

File tree

3 files changed

+25
-21
lines changed

3 files changed

+25
-21
lines changed

ydb/core/kafka_proxy/actors/kafka_balance_actor_sql.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ const TString CHECK_GROUP_STATE = R"sql(
151151
DECLARE $ConsumerGroup AS Utf8;
152152
DECLARE $Database AS Utf8;
153153
154-
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol
154+
SELECT state, generation, master, last_heartbeat_time, consumer_group, database, protocol, protocol_type
155155
FROM `%s`
156156
WHERE consumer_group = $ConsumerGroup
157157
AND database = $Database;

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ void TKafkaBalancerActor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&
4343
}
4444

4545
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
46-
Cookie = 0; // savnik
47-
4846
if (!Kqp->HandleCreateSessionResponse(ev, ctx)) {
4947
SendResponseFail(ctx, EKafkaErrors::UNKNOWN_SERVER_ERROR, "Failed to create KQP session");
5048
PassAway();
@@ -64,7 +62,7 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
6462
auto status = record.GetYdbStatus();
6563
if (status == ::Ydb::StatusIds_StatusCode::StatusIds_StatusCode_ABORTED && CurrentRetryNumber < TX_ABORT_MAX_RETRY_COUNT) {
6664
CurrentRetryNumber++;
67-
KAFKA_LOG_I(TStringBuilder() << "Retry after tx aborted. Num of retry# " << CurrentRetryNumber);
65+
KAFKA_LOG_I(TStringBuilder() << "Retry after tx aborted. Num of retry# " << static_cast<int>(CurrentRetryNumber));
6866
switch (RequestType) {
6967
case JOIN_GROUP:
7068
Register(new TKafkaBalancerActor(Context, Cookie, CorrelationId, JoinGroupRequestData, CurrentRetryNumber));
@@ -102,7 +100,7 @@ void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const
102100
}
103101

104102
void TKafkaBalancerActor::HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr ev, const TActorContext& ctx) {
105-
KAFKA_LOG_I(TStringBuilder() << "Handle kqp response. CurrentStep# " << (ui8)CurrentStep);
103+
KAFKA_LOG_I(TStringBuilder() << "Handle kqp response. CurrentStep# " << static_cast<int>(CurrentStep) << ", MemberId# " << MemberId);
106104
switch (RequestType) {
107105
case JOIN_GROUP:
108106
HandleJoinGroupResponse(ev, ctx);
@@ -169,6 +167,7 @@ std::optional<TGroupStatus> TKafkaBalancerActor::ParseCheckStateAndGeneration(
169167
result.MasterId = parser.ColumnParser("master").GetOptionalUtf8().value_or("");
170168
result.LastHeartbeat = parser.ColumnParser("last_heartbeat_time").GetOptionalDatetime().value_or(TInstant::Zero());
171169
result.ProtocolName = parser.ColumnParser("protocol").GetOptionalUtf8().value_or("");
170+
result.ProtocolType = parser.ColumnParser("protocol_type").GetOptionalUtf8().value_or("");
172171
result.Exists = true;
173172

174173
if (parser.TryNextRow()) {
@@ -489,7 +488,6 @@ void TKafkaBalancerActor::HandleJoinGroupResponse(
489488
CurrentStep = JOIN_TX0_1_CHECK_STATE_AND_GENERATION;
490489
KqpReqCookie++;
491490

492-
// savnik yql compile?
493491
NYdb::TParamsBuilder params = BuildCheckGroupStateParams();
494492

495493
Kqp->SendYqlRequest(Sprintf(CHECK_GROUP_STATE.c_str(), TKafkaConsumerGroupsMetaInitManager::GetInstant()->GetStorageTablePath().c_str()), params.Build(), KqpReqCookie, ctx);
@@ -634,7 +632,7 @@ void TKafkaBalancerActor::HandleJoinGroupResponse(
634632

635633
if (groupStatus->State < GROUP_STATE_SYNC) {
636634
if (WaitingWorkingStateRetries == WAIT_MASTER_MAX_RETRY_COUNT) {
637-
SendJoinGroupResponseFail(ctx, CorrelationId, REBALANCE_IN_PROGRESS); //savnik какой статус отдать, чтобы клиент поретраил?
635+
SendJoinGroupResponseFail(ctx, CorrelationId, REBALANCE_IN_PROGRESS);
638636
PassAway();
639637
return;
640638
}
@@ -650,6 +648,7 @@ void TKafkaBalancerActor::HandleJoinGroupResponse(
650648
}
651649

652650
CurrentStep = JOIN_TX1_2_GET_MEMBERS_AND_SET_STATE_SYNC;
651+
Protocol = groupStatus->ProtocolName;
653652
HandleJoinGroupResponse(ev, ctx);
654653
return;
655654
}
@@ -750,6 +749,7 @@ void TKafkaBalancerActor::HandleSyncGroupResponse(
750749
return;
751750
}
752751

752+
ProtocolType = groupStatus->ProtocolType;
753753
Master = groupStatus->MasterId;
754754

755755
if (MemberId == groupStatus->MasterId) {
@@ -1012,6 +1012,7 @@ void TKafkaBalancerActor::HandleHeartbeatResponse(
10121012
return;
10131013
}
10141014

1015+
ProtocolType = groupStatus->ProtocolType;
10151016
IsMaster = (groupStatus->MasterId == MemberId);
10161017
CurrentStep = HEARTBEAT_TX1_2_UPDATE_TTL;
10171018
KqpReqCookie++;
@@ -1044,6 +1045,7 @@ void TKafkaBalancerActor::HandleHeartbeatResponse(
10441045
}
10451046

10461047
void TKafkaBalancerActor::SendJoinGroupResponseOk(const TActorContext& /*ctx*/, ui64 correlationId) {
1048+
KAFKA_LOG_I(TStringBuilder() << "JOIN_GROUP success. MemberId# " << MemberId);
10471049
auto response = std::make_shared<TJoinGroupResponseData>();
10481050

10491051
response->ProtocolType = ProtocolType;
@@ -1069,6 +1071,7 @@ void TKafkaBalancerActor::SendJoinGroupResponseOk(const TActorContext& /*ctx*/,
10691071
}
10701072

10711073
void TKafkaBalancerActor::SendSyncGroupResponseOk(const TActorContext&, ui64 correlationId) {
1074+
KAFKA_LOG_I(TStringBuilder() << "SYNC_GROUP success. MemberId# " << MemberId);
10721075
auto response = std::make_shared<TSyncGroupResponseData>();
10731076
response->ProtocolType = ProtocolType;
10741077
response->ProtocolName = Protocol;
@@ -1079,6 +1082,7 @@ void TKafkaBalancerActor::SendSyncGroupResponseOk(const TActorContext&, ui64 cor
10791082
}
10801083

10811084
void TKafkaBalancerActor::SendLeaveGroupResponseOk(const TActorContext&, ui64 corellationId) {
1085+
KAFKA_LOG_I(TStringBuilder() << "LEAVE_GROUP success. MemberId# " << MemberId);
10821086
auto response = std::make_shared<TLeaveGroupResponseData>();
10831087
response->ErrorCode = EKafkaErrors::NONE_ERROR;
10841088
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, EKafkaErrors::NONE_ERROR));
@@ -1087,6 +1091,7 @@ void TKafkaBalancerActor::SendLeaveGroupResponseOk(const TActorContext&, ui64 co
10871091
void TKafkaBalancerActor::SendHeartbeatResponseOk(const TActorContext&,
10881092
ui64 corellationId,
10891093
EKafkaErrors error) {
1094+
KAFKA_LOG_I(TStringBuilder() << "HEARTBEAT success. MemberId# " << MemberId);
10901095
auto response = std::make_shared<THeartbeatResponseData>();
10911096
response->ErrorCode = error;
10921097
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
@@ -1097,8 +1102,7 @@ void TKafkaBalancerActor::SendJoinGroupResponseFail(const TActorContext&,
10971102
ui64 corellationId,
10981103
EKafkaErrors error,
10991104
TString message) {
1100-
1101-
KAFKA_LOG_ERROR("JOIN_GROUP failed. reason# " << message);
1105+
KAFKA_LOG_ERROR(TStringBuilder() << "JOIN_GROUP failed. reason# " << message << ", MemberId# " << MemberId);
11021106
auto response = std::make_shared<TJoinGroupResponseData>();
11031107
response->ErrorCode = error;
11041108
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
@@ -1108,7 +1112,7 @@ void TKafkaBalancerActor::SendSyncGroupResponseFail(const TActorContext&,
11081112
ui64 corellationId,
11091113
EKafkaErrors error,
11101114
TString message) {
1111-
KAFKA_LOG_ERROR("SYNC_GROUP failed. reason# " << message);
1115+
KAFKA_LOG_ERROR(TStringBuilder() << "SYNC_GROUP failed. reason# " << message << ", MemberId# " << MemberId);
11121116
auto response = std::make_shared<TSyncGroupResponseData>();
11131117
response->ErrorCode = error;
11141118
response->Assignment = "";
@@ -1119,7 +1123,7 @@ void TKafkaBalancerActor::SendLeaveGroupResponseFail(const TActorContext&,
11191123
ui64 corellationId,
11201124
EKafkaErrors error,
11211125
TString message) {
1122-
KAFKA_LOG_ERROR("LEAVE_GROUP failed. reason# " << message);
1126+
KAFKA_LOG_ERROR(TStringBuilder() << "LEAVE_GROUP failed. reason# " << message << ", MemberId# " << MemberId);
11231127
auto response = std::make_shared<TLeaveGroupResponseData>();
11241128
response->ErrorCode = error;
11251129
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));
@@ -1129,7 +1133,7 @@ void TKafkaBalancerActor::SendHeartbeatResponseFail(const TActorContext&,
11291133
ui64 corellationId,
11301134
EKafkaErrors error,
11311135
TString message) {
1132-
KAFKA_LOG_ERROR("HEARTBEAT failed. reason# " << message);
1136+
KAFKA_LOG_ERROR(TStringBuilder() << "HEARTBEAT failed. reason# " << message << ", MemberId# " << MemberId);
11331137
auto response = std::make_shared<THeartbeatResponseData>();
11341138
response->ErrorCode = error;
11351139
Send(Context->ConnectionId, new TEvKafka::TEvResponse(corellationId, response, error));

ydb/core/kafka_proxy/actors/kafka_balancer_actor.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ struct TGroupStatus {
4242
TString MasterId;
4343
TInstant LastHeartbeat;
4444
TString ProtocolName;
45+
TString ProtocolType;
4546
};
4647

4748
class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerActor> {
@@ -111,8 +112,8 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
111112
RequestType = JOIN_GROUP;
112113
CurrentStep = STEP_NONE;
113114

114-
GroupId = JoinGroupRequestData->GroupId.value();
115-
ProtocolType = JoinGroupRequestData->ProtocolType.value();
115+
GroupId = JoinGroupRequestData->GroupId.value_or("");
116+
ProtocolType = JoinGroupRequestData->ProtocolType.value_or("");
116117
}
117118

118119
TKafkaBalancerActor(const TContext::TPtr context, ui64 cookie, ui64 corellationId, TMessagePtr<TSyncGroupRequestData> message, ui8 retryNum = 0)
@@ -136,10 +137,9 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
136137
RequestType = SYNC_GROUP;
137138
CurrentStep = STEP_NONE;
138139

139-
GroupId = SyncGroupRequestData->GroupId.value();
140-
MemberId = SyncGroupRequestData->MemberId.value();
140+
GroupId = SyncGroupRequestData->GroupId.value_or("");
141+
MemberId = SyncGroupRequestData->MemberId.value_or("");
141142
GenerationId = SyncGroupRequestData->GenerationId;
142-
ProtocolType = SyncGroupRequestData->ProtocolType.value();
143143
}
144144

145145
TKafkaBalancerActor(const TContext::TPtr context, ui64 cookie, ui64 corellationId, TMessagePtr<THeartbeatRequestData> message, ui8 retryNum = 0)
@@ -163,8 +163,8 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
163163
RequestType = HEARTBEAT;
164164
CurrentStep = STEP_NONE;
165165

166-
GroupId = HeartbeatGroupRequestData->GroupId.value();
167-
MemberId = HeartbeatGroupRequestData->MemberId.value();
166+
GroupId = HeartbeatGroupRequestData->GroupId.value_or("");
167+
MemberId = HeartbeatGroupRequestData->MemberId.value_or("");
168168
GenerationId = HeartbeatGroupRequestData->GenerationId;
169169
}
170170

@@ -188,8 +188,8 @@ class TKafkaBalancerActor : public NActors::TActorBootstrapped<TKafkaBalancerAct
188188
RequestType = LEAVE_GROUP;
189189
CurrentStep = STEP_NONE;
190190

191-
GroupId = LeaveGroupRequestData->GroupId.value();
192-
MemberId = LeaveGroupRequestData->MemberId.value();
191+
GroupId = LeaveGroupRequestData->GroupId.value_or("");
192+
MemberId = LeaveGroupRequestData->MemberId.value_or("");
193193
}
194194

195195
void Bootstrap(const NActors::TActorContext& ctx);

0 commit comments

Comments
 (0)