Commit 5bf7adc

[kafka] YT-25055: Support topic creation in Metadata handler
* Changelog entry
  Type: feature
  Component: kafka-proxy

Enable topic creation in the Metadata handler and improve handling of an empty topic list. Allow null values and null keys in the Records format.

commit_hash:77b418598177c1b38973823943731257a98d1e55
1 parent 5d7b204 commit 5bf7adc

2 files changed (+103, -50 lines)

yt/yt/client/kafka/requests.cpp

Lines changed: 98 additions & 45 deletions
@@ -109,11 +109,19 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int version) const
         WRITE_KAFKA_FIELD(recordWriter, WriteVarLong, TimestampDelta)
         WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, OffsetDelta)

-        WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, Key.size())
-        WRITE_KAFKA_FIELD(recordWriter, WriteData, Key)
+        if (Key) {
+            WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, Key->size())
+            WRITE_KAFKA_FIELD(recordWriter, WriteData, *Key)
+        } else {
+            WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, -1)
+        }

-        WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, Value.size())
-        WRITE_KAFKA_FIELD(recordWriter, WriteData, Value)
+        if (Value) {
+            WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, Value->size())
+            WRITE_KAFKA_FIELD(recordWriter, WriteData, *Value)
+        } else {
+            WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, -1)
+        }

         WRITE_KAFKA_FIELD(recordWriter, WriteVarInt, Headers.size())
         for (const auto& header : Headers) {
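For context: in the record format v2 branch above, key and value lengths are zigzag-encoded varints, and null is written as a length of -1, so a null key or value costs a single byte on the wire. A minimal standalone sketch of that encoding; the helper names (WriteZigZagVarInt, WriteNullableBytes) are illustrative, not from this repo:

#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// ZigZag-encode so small negative lengths (like -1 for null) stay short.
void WriteZigZagVarInt(std::vector<uint8_t>& out, int32_t value)
{
    uint32_t encoded = (static_cast<uint32_t>(value) << 1) ^ static_cast<uint32_t>(value >> 31);
    while (encoded >= 0x80) {
        out.push_back(static_cast<uint8_t>(encoded) | 0x80);
        encoded >>= 7;
    }
    out.push_back(static_cast<uint8_t>(encoded));
}

void WriteNullableBytes(std::vector<uint8_t>& out, const std::optional<std::string>& data)
{
    if (!data) {
        WriteZigZagVarInt(out, -1); // Null: length -1 and no payload bytes.
        return;
    }
    WriteZigZagVarInt(out, static_cast<int32_t>(data->size()));
    out.insert(out.end(), data->begin(), data->end());
}

With this encoding, a null field serializes as the single byte 0x01 (zigzag of -1), which is what the -1 branches above rely on.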
@@ -131,8 +139,16 @@ void TRecord::Serialize(IKafkaProtocolWriter* writer, int version) const
            writer->WriteInt64(TimestampDelta);
        }

-        writer->WriteBytes(Key);
-        writer->WriteBytes(Value);
+        if (Key) {
+            writer->WriteBytes(*Key);
+        } else {
+            writer->WriteInt32(-1);
+        }
+        if (Value) {
+            writer->WriteBytes(*Value);
+        } else {
+            writer->WriteInt32(-1);
+        }
     } else {
         THROW_ERROR_EXCEPTION("Unsupported Record version %v in serialization", version);
     }
@@ -153,14 +169,18 @@ void TRecord::Deserialize(IKafkaProtocolReader* reader, int version)

    auto keySize = reader->ReadVarInt();
    YT_LOG_TRACE("Parsing Record (KeySize: %v)", keySize);
-    reader->ReadString(&Key, keySize);
+    if (keySize > 0) {
+        Key = TString{};
+        reader->ReadString(&(*Key), keySize);
+    }

    i32 valueSize;
    READ_KAFKA_FIELD(valueSize, ReadVarInt);

    if (valueSize > 0) {
        YT_LOG_TRACE("Parsing Record (ValueSize: %v)", valueSize);
-        reader->ReadString(&Value, valueSize);
+        Value = TString{};
+        reader->ReadString(&(*Value), valueSize);
    }

    i32 headerCount;
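On the read side, the wire format distinguishes an empty value (length 0) from a null one (length -1); the keySize > 0 check above folds the empty case into the unset std::optional as well. A standalone sketch that keeps all three cases apart; ReadZigZagVarInt and ReadNullableBytes are illustrative helpers, not this repo's API:

#include <cstddef>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>

int32_t ReadZigZagVarInt(const uint8_t* buf, size_t size, size_t& pos)
{
    uint32_t raw = 0;
    int shift = 0;
    while (true) {
        if (pos >= size) {
            throw std::runtime_error("Truncated varint");
        }
        uint8_t byte = buf[pos++];
        raw |= static_cast<uint32_t>(byte & 0x7F) << shift;
        if ((byte & 0x80) == 0) {
            break;
        }
        shift += 7;
    }
    return static_cast<int32_t>((raw >> 1) ^ (0u - (raw & 1u))); // Undo zigzag.
}

std::optional<std::string> ReadNullableBytes(const uint8_t* buf, size_t size, size_t& pos)
{
    int32_t length = ReadZigZagVarInt(buf, size, pos);
    if (length < 0) {
        return std::nullopt; // -1: null key/value.
    }
    if (pos + static_cast<size_t>(length) > size) {
        throw std::runtime_error("Truncated payload");
    }
    std::string result(reinterpret_cast<const char*>(buf + pos), static_cast<size_t>(length));
    pos += static_cast<size_t>(length); // length == 0 yields an empty, non-null string.
    return result;
}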
@@ -329,13 +349,13 @@ void TRspApiVersions::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 void TReqMetadataTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
 {
     if (apiVersion >= 10) {
-        TopicId = reader->ReadUuid();
+        READ_KAFKA_FIELD(TopicId, ReadUuid)
     }

     if (apiVersion < 9) {
-        Topic = reader->ReadString();
+        READ_KAFKA_FIELD(Name, ReadString)
     } else {
-        Topic = reader->ReadCompactString();
+        READ_KAFKA_FIELD(Name, ReadCompactString)
     }
     if (apiVersion >= 9) {
         NKafka::Deserialize(TagBuffer, reader, /*isCompact*/ true);
@@ -347,14 +367,14 @@ void TReqMetadata::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
     NKafka::Deserialize(Topics, reader, apiVersion >= 9, apiVersion);

     if (apiVersion >= 4) {
-        AllowAutoTopicCreation = reader->ReadBool();
+        READ_KAFKA_FIELD(AllowAutoTopicCreation, ReadBool)
     }

     if (apiVersion >= 8) {
         if (apiVersion <= 10) {
-            IncludeClusterAuthorizedOperations = reader->ReadBool();
+            READ_KAFKA_FIELD(IncludeClusterAuthorizedOperations, ReadBool)
         }
-        IncludeTopicAuthorizedOperations = reader->ReadBool();
+        READ_KAFKA_FIELD(IncludeTopicAuthorizedOperations, ReadBool)
     }

     if (apiVersion >= 9) {
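For the headline feature, AllowAutoTopicCreation is the request-side switch: when a Metadata request names an unknown topic and the flag is set, the handler may create the topic instead of failing the lookup. A hypothetical sketch of that decision flow; the proxy's actual Metadata handler is not part of this diff, and all names below are made up for illustration:

#include <functional>
#include <string>

enum class EMetadataError
{
    None,
    UnknownTopicOrPartition,
};

struct TTopicReply
{
    std::string Name;
    EMetadataError Error = EMetadataError::None;
};

TTopicReply ResolveTopic(
    const std::string& name,
    bool allowAutoTopicCreation,
    const std::function<bool(const std::string&)>& topicExists,
    const std::function<void(const std::string&)>& createTopic)
{
    if (topicExists(name)) {
        return {name, EMetadataError::None};
    }
    if (allowAutoTopicCreation) {
        createTopic(name); // Create on first Metadata hit instead of erroring.
        return {name, EMetadataError::None};
    }
    return {name, EMetadataError::UnknownTopicOrPartition};
}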
@@ -364,11 +384,19 @@ void TReqMetadata::Deserialize(IKafkaProtocolReader* reader, int apiVersion)

 void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
-    writer->WriteInt32(NodeId);
-    writer->WriteString(Host);
-    writer->WriteInt32(Port);
+    WRITE_KAFKA_FIELD(writer, WriteInt32, NodeId)
+    if (apiVersion < 9) {
+        WRITE_KAFKA_FIELD(writer, WriteString, Host)
+    } else {
+        WRITE_KAFKA_FIELD(writer, WriteCompactString, Host)
+    }
+    WRITE_KAFKA_FIELD(writer, WriteInt32, Port)
     if (apiVersion >= 1) {
-        writer->WriteNullableString(Rack);
+        if (apiVersion < 9) {
+            WRITE_KAFKA_FIELD(writer, WriteNullableString, Rack)
+        } else {
+            WRITE_KAFKA_FIELD(writer, WriteCompactNullableString, Rack)
+        }
     }
     if (apiVersion >= 9) {
         NKafka::Serialize(TagBuffer, writer, /*isCompact*/ true);
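The apiVersion < 9 / >= 9 branches select between the classic and the compact (KIP-482 "flexible versions") string encodings: a classic STRING carries a big-endian int16 length, while a COMPACT_NULLABLE_STRING carries an unsigned varint of length + 1, with 0 standing for null. A standalone sketch with illustrative helper names:

#include <cstdint>
#include <optional>
#include <string>
#include <vector>

void WriteUnsignedVarInt(std::vector<uint8_t>& out, uint32_t value)
{
    while (value >= 0x80) {
        out.push_back(static_cast<uint8_t>(value) | 0x80);
        value >>= 7;
    }
    out.push_back(static_cast<uint8_t>(value));
}

// Classic STRING: big-endian int16 length, then the bytes (length <= 32767).
void WriteClassicString(std::vector<uint8_t>& out, const std::string& s)
{
    auto length = static_cast<uint16_t>(s.size());
    out.push_back(static_cast<uint8_t>(length >> 8));
    out.push_back(static_cast<uint8_t>(length & 0xFF));
    out.insert(out.end(), s.begin(), s.end());
}

// COMPACT_NULLABLE_STRING: unsigned varint of length + 1; 0 encodes null.
void WriteCompactNullableString(std::vector<uint8_t>& out, const std::optional<std::string>& s)
{
    if (!s) {
        WriteUnsignedVarInt(out, 0);
        return;
    }
    WriteUnsignedVarInt(out, static_cast<uint32_t>(s->size()) + 1);
    out.insert(out.end(), s->begin(), s->end());
}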
@@ -377,45 +405,70 @@ void TRspMetadataBroker::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const

 void TRspMetadataTopicPartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
-    writer->WriteErrorCode(ErrorCode);
-    writer->WriteInt32(PartitionIndex);
-    writer->WriteInt32(LeaderId);
+    WRITE_KAFKA_FIELD(writer, WriteErrorCode, ErrorCode)
+    WRITE_KAFKA_FIELD(writer, WriteInt32, PartitionIndex)
+    WRITE_KAFKA_FIELD(writer, WriteInt32, LeaderId)
+
     // TODO(nadya73): check version.
     writer->WriteInt32(ReplicaNodes.size());
     for (auto replicaNode : ReplicaNodes) {
         writer->WriteInt32(replicaNode);
     }
-    // TODO(nadya73): check version.
+    // TODO(nadya73): check version.
     writer->WriteInt32(IsrNodes.size());
     for (auto isrNode : IsrNodes) {
         writer->WriteInt32(isrNode);
     }
+    if (apiVersion >= 5) {
+        // TODO(nadya73): check version.
+        writer->WriteInt32(OfflineReplicas.size());
+        for (auto offlineReplica : OfflineReplicas) {
+            writer->WriteInt32(offlineReplica);
+        }
+    }
     if (apiVersion >= 9) {
         NKafka::Serialize(TagBuffer, writer, /*isCompact*/ true);
     }
 }

 void TRspMetadataTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
-    writer->WriteErrorCode(ErrorCode);
-    writer->WriteString(Name);
+    WRITE_KAFKA_FIELD(writer, WriteErrorCode, ErrorCode)
+    if (apiVersion < 9) {
+        WRITE_KAFKA_FIELD(writer, WriteString, Name)
+    } else {
+        WRITE_KAFKA_FIELD(writer, WriteCompactString, Name)
+    }
+    if (apiVersion >= 10) {
+        WRITE_KAFKA_FIELD(writer, WriteUuid, TopicId)
+    }
     if (apiVersion >= 1) {
-        writer->WriteBool(IsInternal);
+        WRITE_KAFKA_FIELD(writer, WriteBool, IsInternal)
     }
     NKafka::Serialize(Partitions, writer, apiVersion >= 9, apiVersion);
+    if (apiVersion >= 8) {
+        WRITE_KAFKA_FIELD(writer, WriteInt32, TopicAuthorizedOperations)
+    }
     if (apiVersion >= 9) {
         NKafka::Serialize(TagBuffer, writer, /*isCompact*/ true);
     }
 }

 void TRspMetadata::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
+    if (apiVersion >= 3) {
+        WRITE_KAFKA_FIELD(writer, WriteInt32, ThrottleTimeMs)
+    }
     NKafka::Serialize(Brokers, writer, apiVersion >= 9, apiVersion);
     if (apiVersion >= 2) {
-        writer->WriteNullableString(ClusterId);
+        if (apiVersion < 9) {
+            WRITE_KAFKA_FIELD(writer, WriteNullableString, ClusterId)
+        } else {
+            WRITE_KAFKA_FIELD(writer, WriteCompactNullableString, ClusterId)
+        }
     }
     if (apiVersion >= 1) {
-        writer->WriteInt32(ControllerId);
+        WRITE_KAFKA_FIELD(writer, WriteInt32, ControllerId)
     }
     NKafka::Serialize(Topics, writer, apiVersion >= 9, apiVersion);
     if (apiVersion >= 9) {
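Taken together, the gates above track the upstream MetadataResponse schema. A quick field-to-first-version reference matching the checks in this diff (the -2147483648 default for TopicAuthorizedOperations comes from the public Kafka schema, not from this code):

#include <map>
#include <string>

// MetadataResponse field -> first apiVersion that carries it.
const std::map<std::string, int> MetadataResponseFieldSince = {
    {"Rack", 1},                       // Nullable broker rack.
    {"IsInternal", 1},
    {"ControllerId", 1},
    {"ClusterId", 2},                  // Nullable; compact encoding from v9.
    {"ThrottleTimeMs", 3},             // Prepended to the response body.
    {"OfflineReplicas", 5},
    {"TopicAuthorizedOperations", 8},  // int32 bit field; -2147483648 = not requested.
    {"TagBuffer", 9},                  // KIP-482 flexible versions.
    {"TopicId", 10},                   // Per-topic UUID.
};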
@@ -841,52 +894,52 @@ void TRspProduce::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const

 void TReqListOffsetsTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
 {
-    PartitionIndex = reader->ReadInt32();
-    Timestamp = reader->ReadInt64(); // TODO: use timestamp?
-    MaxNumOffsets = reader->ReadInt32();
+    READ_KAFKA_FIELD(PartitionIndex, ReadInt32)
+    READ_KAFKA_FIELD(Timestamp, ReadInt64) // TODO: use timestamp?
 }

 void TReqListOffsetsTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
 {
-    Name = reader->ReadString();
-    Partitions.resize(reader->ReadInt32());
+    READ_KAFKA_FIELD(Name, ReadString)
+    i32 partitionCount;
+    READ_KAFKA_FIELD(partitionCount, ReadInt32)
+    Partitions.resize(partitionCount);
     for (auto& partition : Partitions) {
         partition.Deserialize(reader, apiVersion);
     }
 }

 void TReqListOffsets::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
 {
-    ReplicaId = reader->ReadInt32();
-    Topics.resize(reader->ReadInt32());
+    READ_KAFKA_FIELD(ReplicaId, ReadInt32)
+    i32 topicCount;
+    READ_KAFKA_FIELD(topicCount, ReadInt32)
+    Topics.resize(topicCount);
     for (auto& topic : Topics) {
         topic.Deserialize(reader, apiVersion);
     }
 }

-void TRspListOffsetsTopicPartition::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
+void TRspListOffsetsTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
 {
-    writer->WriteInt32(PartitionIndex);
-    writer->WriteErrorCode(ErrorCode);
-
-    if (apiVersion <= 0) {
-        writer->WriteInt32(1); // Size of 'old_style_offsets'.
-    }
-    writer->WriteInt64(Offset);
+    WRITE_KAFKA_FIELD(writer, WriteInt32, PartitionIndex)
+    WRITE_KAFKA_FIELD(writer, WriteErrorCode, ErrorCode)
+    WRITE_KAFKA_FIELD(writer, WriteInt64, Timestamp)
+    WRITE_KAFKA_FIELD(writer, WriteInt64, Offset)
 }

 void TRspListOffsetsTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
-    writer->WriteString(Name);
-    writer->WriteInt32(Partitions.size());
+    WRITE_KAFKA_FIELD(writer, WriteString, Name)
+    WRITE_KAFKA_FIELD(writer, WriteInt32, Partitions.size())
     for (const auto& partition : Partitions) {
         partition.Serialize(writer, apiVersion);
     }
 }

 void TRspListOffsets::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
 {
-    writer->WriteInt32(Topics.size());
+    WRITE_KAFKA_FIELD(writer, WriteInt32, Topics.size())
     for (const auto& topic : Topics) {
         topic.Serialize(writer, apiVersion);
     }
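The rewritten TRspListOffsetsTopicPartition::Serialize always emits the v1+ shape (timestamp, then offset) and drops the removed v0 old_style_offsets branch, which is consistent with the now-unused apiVersion parameter. A standalone sketch contrasting the two shapes; TWireWriter is a stand-in for the real IKafkaProtocolWriter:

#include <cstdint>
#include <vector>

// Minimal big-endian writer (assumed interface, for illustration only).
struct TWireWriter
{
    std::vector<uint8_t> Buffer;

    void WriteInt16(int16_t v) { Push(static_cast<uint16_t>(v), 2); }
    void WriteInt32(int32_t v) { Push(static_cast<uint32_t>(v), 4); }
    void WriteInt64(int64_t v) { Push(static_cast<uint64_t>(v), 8); }

private:
    void Push(uint64_t v, int bytes)
    {
        for (int shift = (bytes - 1) * 8; shift >= 0; shift -= 8) {
            Buffer.push_back(static_cast<uint8_t>(v >> shift));
        }
    }
};

void WriteListOffsetsPartition(
    TWireWriter& w, int apiVersion,
    int32_t partitionIndex, int16_t errorCode, int64_t timestamp, int64_t offset)
{
    w.WriteInt32(partitionIndex);
    w.WriteInt16(errorCode);
    if (apiVersion == 0) {
        w.WriteInt32(1);         // v0: old_style_offsets array of length 1...
        w.WriteInt64(offset);    // ...holding just the offset.
    } else {
        w.WriteInt64(timestamp); // v1+: single (timestamp, offset) pair.
        w.WriteInt64(offset);
    }
}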

yt/yt/client/kafka/requests.h

Lines changed: 5 additions & 5 deletions
@@ -98,8 +98,8 @@ struct TRecord
     i32 OffsetDelta = 0;

     // Present in v1 and v2.
-    TString Key;
-    TString Value;
+    std::optional<TString> Key;
+    std::optional<TString> Value;

     std::vector<TRecordHeader> Headers;

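Usage-wise, the std::optional fields let records express the two null cases Kafka relies on: a null value (a tombstone on compacted topics) and a null key (no partition affinity). A small sketch with a stand-in struct; the real TRecord has more members:

#include <optional>
#include <string>

struct TRecordSketch
{
    std::optional<std::string> Key;   // std::nullopt serializes as length -1.
    std::optional<std::string> Value;
};

int main()
{
    TRecordSketch tombstone;
    tombstone.Key = "user:42";        // Keyed record...
    tombstone.Value = std::nullopt;   // ...with a null value: a deletion marker.

    TRecordSketch unkeyed;
    unkeyed.Key = std::nullopt;       // Null key: producer-side round-robin/sticky partitioning.
    unkeyed.Value = "payload";
    return 0;
}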
@@ -189,7 +189,7 @@ struct TRspApiVersions
 struct TReqMetadataTopic
 {
     TGuid TopicId;
-    TString Topic;
+    TString Name;
     std::vector<TTaggedField> TagBuffer;

     void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
@@ -251,7 +251,7 @@ struct TRspMetadata
 {
     i32 ThrottleTimeMs = 0;
     std::vector<TRspMetadataBroker> Brokers;
-    std::optional<std::string> ClusterId;
+    std::optional<TString> ClusterId;
     i32 ControllerId = 0;
     std::vector<TRspMetadataTopic> Topics;
     std::vector<TTaggedField> TagBuffer;
@@ -668,7 +668,6 @@ struct TReqListOffsetsTopicPartition
 {
     i32 PartitionIndex = 0;
     i64 Timestamp = 0;
-    i32 MaxNumOffsets = 0;

     std::vector<TTaggedField> TagBuffer;
@@ -701,6 +700,7 @@ struct TRspListOffsetsTopicPartition
 {
     i32 PartitionIndex = 0;
     NKafka::EErrorCode ErrorCode = EErrorCode::None;
+    i64 Timestamp = 0;
     i64 Offset = 0;

     std::vector<TTaggedField> TagBuffer;
