Skip to content

Commit 32c4fe1

Browse files
committed
fix
1 parent 4c6b51d commit 32c4fe1

File tree

2 files changed

+9
-17
lines changed

2 files changed

+9
-17
lines changed

ydb/core/kafka_proxy/actors/kafka_balancer_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static EKafkaErrors KqpStatusToKafkaError(Ydb::StatusIds::StatusCode status) {
5050
} else if (status == Ydb::StatusIds::PRECONDITION_FAILED) {
5151
return EKafkaErrors::REBALANCE_IN_PROGRESS;
5252
}
53-
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
53+
return EKafkaErrors::INVALID_REQUEST;
5454
}
5555

5656
void TKafkaBalancerActor::Handle(TEvents::TEvWakeup::TPtr&, const TActorContext& ctx) {

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
244244
HandleKillReadSession();
245245
Register(new TKafkaBalancerActor(Context, 0, header->CorrelationId, message));
246246
} else {
247-
if (ReadSessionActorId) {
248-
Send(ReadSessionActorId, new TEvKafka::TEvJoinGroupRequest(header->CorrelationId, message));
249-
} else {
247+
if (!ReadSessionActorId) {
250248
ReadSessionActorId = RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
251-
Send(ReadSessionActorId, new TEvKafka::TEvJoinGroupRequest(header->CorrelationId, message));
252249
}
250+
Send(ReadSessionActorId, new TEvKafka::TEvJoinGroupRequest(header->CorrelationId, message));
253251
}
254252
}
255253

@@ -258,12 +256,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
258256
HandleKillReadSession();
259257
Register(new TKafkaBalancerActor(Context, 0, header->CorrelationId, message));
260258
} else {
261-
if (ReadSessionActorId) {
262-
Send(ReadSessionActorId, new TEvKafka::TEvSyncGroupRequest(header->CorrelationId, message));
263-
} else {
259+
if (!ReadSessionActorId) {
264260
ReadSessionActorId = RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
265-
Send(ReadSessionActorId, new TEvKafka::TEvSyncGroupRequest(header->CorrelationId, message));
266261
}
262+
Send(ReadSessionActorId, new TEvKafka::TEvSyncGroupRequest(header->CorrelationId, message));
267263
}
268264
}
269265

@@ -272,12 +268,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
272268
HandleKillReadSession();
273269
Register(new TKafkaBalancerActor(Context, 0, header->CorrelationId, message));
274270
} else {
275-
if (ReadSessionActorId) {
276-
Send(ReadSessionActorId, new TEvKafka::TEvHeartbeatRequest(header->CorrelationId, message));
277-
} else {
271+
if (!ReadSessionActorId) {
278272
ReadSessionActorId = RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
279-
Send(ReadSessionActorId, new TEvKafka::TEvHeartbeatRequest(header->CorrelationId, message));
280273
}
274+
Send(ReadSessionActorId, new TEvKafka::TEvHeartbeatRequest(header->CorrelationId, message));
281275
}
282276
}
283277

@@ -286,12 +280,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
286280
HandleKillReadSession();
287281
Register(new TKafkaBalancerActor(Context, 0, header->CorrelationId, message));
288282
} else {
289-
if (ReadSessionActorId) {
290-
Send(ReadSessionActorId, new TEvKafka::TEvLeaveGroupRequest(header->CorrelationId, message));
291-
} else {
283+
if (!ReadSessionActorId) {
292284
ReadSessionActorId = RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));
293-
Send(ReadSessionActorId, new TEvKafka::TEvLeaveGroupRequest(header->CorrelationId, message));
294285
}
286+
Send(ReadSessionActorId, new TEvKafka::TEvLeaveGroupRequest(header->CorrelationId, message));
295287
}
296288
}
297289

0 commit comments

Comments
 (0)