Skip to content

Commit 1d9cc11

Browse files
committed
[kafka] YT-21805: Introduce group coordinator
* Changelog entry Type: feature Component: kafka-proxy Introduce group coordinator implementation. commit_hash:c1e12d85f566a68e2300d2a9e0395e6a9a29ed1e
1 parent a3da80a commit 1d9cc11

File tree

3 files changed

+32
-55
lines changed

3 files changed

+32
-55
lines changed

yt/yt/client/kafka/error.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ namespace NYT::NKafka {
99
DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16,
1010
((UnknownServerError) (-1))
1111
((None) (0))
12+
((NotCoordinator) (16))
13+
((IllegalGeneration) (22))
14+
((InconsistentGroupProtocol) (23))
15+
((RebalanceInProgress) (27))
1216
((TopicAuthorizationFailed) (29))
1317
((GroupAuthorizationFailed) (30))
1418
((SaslAuthenticationFailed) (31))

yt/yt/client/kafka/requests.cpp

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,8 @@ void TReqJoinGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
373373
SessionTimeoutMs = reader->ReadInt32();
374374
MemberId = reader->ReadString();
375375
ProtocolType = reader->ReadString();
376-
Protocols.resize(reader->ReadInt32());
377-
for (auto& protocol : Protocols) {
378-
protocol.Deserialize(reader, apiVersion);
379-
}
376+
377+
NKafka::Deserialize(Protocols, reader, /*isCompact*/ false, apiVersion);
380378
}
381379

382380
void TRspJoinGroupMember::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
@@ -393,10 +391,7 @@ void TRspJoinGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) cons
393391
writer->WriteString(Leader);
394392
writer->WriteString(MemberId);
395393

396-
writer->WriteInt32(Members.size());
397-
for (const auto& member : Members) {
398-
member.Serialize(writer, apiVersion);
399-
}
394+
NKafka::Serialize(Members, writer, /*isCompact*/ false, apiVersion);
400395
}
401396

402397
////////////////////////////////////////////////////////////////////////////////
@@ -410,36 +405,16 @@ void TReqSyncGroupAssignment::Deserialize(IKafkaProtocolReader* reader, int /*ap
410405
void TReqSyncGroup::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
411406
{
412407
GroupId = reader->ReadString();
413-
GenerationId = reader->ReadString();
408+
GenerationId = reader->ReadInt32();
414409
MemberId = reader->ReadString();
415-
Assignments.resize(reader->ReadInt32());
416-
for (auto& assignment : Assignments) {
417-
assignment.Deserialize(reader, apiVersion);
418-
}
419-
}
420410

421-
void TRspSyncGroupAssignment::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
422-
{
423-
writer->WriteString(Topic);
424-
writer->WriteInt32(Partitions.size());
425-
for (const auto& partition : Partitions) {
426-
writer->WriteInt32(partition);
427-
}
411+
NKafka::Deserialize(Assignments, reader, /*isCompact*/ false, apiVersion);
428412
}
429413

430-
void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
414+
void TRspSyncGroup::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
431415
{
432416
writer->WriteErrorCode(ErrorCode);
433-
434-
writer->StartBytes();
435-
writer->WriteInt16(0);
436-
writer->WriteInt32(Assignments.size());
437-
for (const auto& assignment : Assignments) {
438-
assignment.Serialize(writer, apiVersion);
439-
}
440-
// User data.
441-
writer->WriteBytes(TString{});
442-
writer->FinishBytes();
417+
writer->WriteBytes(Assignment);
443418
}
444419

445420
////////////////////////////////////////////////////////////////////////////////

yt/yt/client/kafka/requests.h

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ namespace NYT::NKafka {
1010

1111
////////////////////////////////////////////////////////////////////////////////
1212

13+
using TMemberId = TString;
14+
using TGroupId = TString;
15+
16+
////////////////////////////////////////////////////////////////////////////////
17+
1318
DEFINE_ENUM(ERequestType,
1419
((None) (-1))
1520
((Produce) (0))
@@ -20,9 +25,10 @@ DEFINE_ENUM(ERequestType,
2025
((OffsetCommit) (8))
2126
((OffsetFetch) (9))
2227
((FindCoordinator) (10))
23-
((JoinGroup) (11)) // Unimplemented.
24-
((Heartbeat) (12)) // Unimplemented.
25-
((SyncGroup) (14)) // Unimplemented.
28+
((JoinGroup) (11))
29+
((Heartbeat) (12))
30+
((LeaveGroup) (13))
31+
((SyncGroup) (14))
2632
((DescribeGroups) (15)) // Unimplemented.
2733
((SaslHandshake) (17))
2834
((ApiVersions) (18))
@@ -259,7 +265,7 @@ struct TRspFindCoordinator
259265
struct TReqJoinGroupProtocol
260266
{
261267
TString Name;
262-
TString Metadata; // TODO(nadya73): bytes.
268+
TString Metadata;
263269

264270
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
265271
};
@@ -268,9 +274,9 @@ struct TReqJoinGroup
268274
{
269275
static constexpr ERequestType RequestType = ERequestType::JoinGroup;
270276

271-
TString GroupId;
277+
TGroupId GroupId;
272278
i32 SessionTimeoutMs = 0;
273-
TString MemberId;
279+
TMemberId MemberId;
274280
TString ProtocolType;
275281
std::vector<TReqJoinGroupProtocol> Protocols;
276282

@@ -279,7 +285,7 @@ struct TReqJoinGroup
279285

280286
struct TRspJoinGroupMember
281287
{
282-
TString MemberId;
288+
TMemberId MemberId;
283289
TString Metadata; // TODO(nadya73): bytes.
284290

285291
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
@@ -291,7 +297,7 @@ struct TRspJoinGroup
291297
i32 GenerationId = 0;
292298
TString ProtocolName;
293299
TString Leader;
294-
TString MemberId;
300+
TMemberId MemberId;
295301
std::vector<TRspJoinGroupMember> Members;
296302

297303
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
@@ -301,7 +307,7 @@ struct TRspJoinGroup
301307

302308
struct TReqSyncGroupAssignment
303309
{
304-
TString MemberId;
310+
TMemberId MemberId;
305311
TString Assignment;
306312

307313
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
@@ -311,26 +317,18 @@ struct TReqSyncGroup
311317
{
312318
static constexpr ERequestType RequestType = ERequestType::SyncGroup;
313319

314-
TString GroupId;
315-
TString GenerationId;
316-
TString MemberId;
320+
TGroupId GroupId;
321+
i32 GenerationId = 0;
322+
TMemberId MemberId;
317323
std::vector<TReqSyncGroupAssignment> Assignments;
318324

319325
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
320326
};
321327

322-
struct TRspSyncGroupAssignment
323-
{
324-
TString Topic;
325-
std::vector<i32> Partitions;
326-
327-
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
328-
};
329-
330328
struct TRspSyncGroup
331329
{
332330
NKafka::EErrorCode ErrorCode = NKafka::EErrorCode::None;
333-
std::vector<TRspSyncGroupAssignment> Assignments;
331+
TString Assignment;
334332

335333
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
336334
};
@@ -341,9 +339,9 @@ struct TReqHeartbeat
341339
{
342340
static constexpr ERequestType RequestType = ERequestType::Heartbeat;
343341

344-
TString GroupId;
342+
TGroupId GroupId;
345343
i32 GenerationId = 0;
346-
TString MemberId;
344+
TMemberId MemberId;
347345

348346
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
349347
};

0 commit comments

Comments
 (0)