Skip to content

Commit 77d2782

Browse files
committed
[kafka] YT-24188: Support ListOffsets method
* Changelog entry Type: feature Component: kafka-proxy Partially support of `ListOffsets` method commit_hash:15a3207d5cb3e4cad7205af4d51fda4a937db912
1 parent 23474d6 commit 77d2782

File tree

4 files changed

+124
-1
lines changed

4 files changed

+124
-1
lines changed

yt/yt/client/kafka/error.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ DEFINE_ENUM_WITH_UNDERLYING_TYPE(EErrorCode, i16,
1212
((TopicAuthorizationFailed) (29))
1313
((GroupAuthorizationFailed) (30))
1414
((SaslAuthenticationFailed) (31))
15+
((InvalidTimestamp) (32))
1516
((UnsupportedSaslMechanism) (33))
1617
);
1718

yt/yt/client/kafka/requests-inl.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,11 @@ void Deserialize(std::vector<T>& data, IKafkaProtocolReader* reader, bool isComp
3939
}
4040
data.resize(size);
4141
} else {
42-
data.resize(reader->ReadInt32());
42+
auto size = reader->ReadInt32();
43+
if (size < 0) {
44+
return;
45+
}
46+
data.resize(size);
4347
}
4448
for (auto& item : data) {
4549
item.Deserialize(reader, args...);

yt/yt/client/kafka/requests.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,4 +757,57 @@ void TRspProduce::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
757757

758758
////////////////////////////////////////////////////////////////////////////////
759759

760+
void TReqListOffsetsTopicPartition::Deserialize(IKafkaProtocolReader* reader, int /*apiVersion*/)
761+
{
762+
PartitionIndex = reader->ReadInt32();
763+
Timestamp = reader->ReadInt64(); // TODO: use timestamp?
764+
MaxNumOffsets = reader->ReadInt32();
765+
}
766+
767+
void TReqListOffsetsTopic::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
768+
{
769+
Name = reader->ReadString();
770+
Partitions.resize(reader->ReadInt32());
771+
for (auto& partition : Partitions) {
772+
partition.Deserialize(reader, apiVersion);
773+
}
774+
}
775+
776+
void TReqListOffsets::Deserialize(IKafkaProtocolReader* reader, int apiVersion)
777+
{
778+
ReplicaId = reader->ReadInt32();
779+
Topics.resize(reader->ReadInt32());
780+
for (auto& topic : Topics) {
781+
topic.Deserialize(reader, apiVersion);
782+
}
783+
}
784+
785+
void TRspListOffsetsTopicPartition::Serialize(IKafkaProtocolWriter* writer, int /*apiVersion*/) const
786+
{
787+
writer->WriteInt32(PartitionIndex);
788+
writer->WriteErrorCode(ErrorCode);
789+
writer->WriteInt64(Offset);
790+
}
791+
792+
void TRspListOffsetsTopic::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
793+
{
794+
writer->WriteString(Name);
795+
writer->WriteInt32(Partitions.size());
796+
for (const auto& partition : Partitions) {
797+
partition.Serialize(writer, apiVersion);
798+
}
799+
}
800+
801+
void TRspListOffsets::Serialize(IKafkaProtocolWriter* writer, int apiVersion) const
802+
{
803+
writer->WriteInt32(Topics.size());
804+
for (const auto& topic : Topics) {
805+
topic.Serialize(writer, apiVersion);
806+
}
807+
}
808+
809+
////////////////////////////////////////////////////////////////////////////////
810+
811+
812+
760813
} // namespace NYT::NKafka

yt/yt/client/kafka/requests.h

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,71 @@ struct TRspProduce
624624

625625
////////////////////////////////////////////////////////////////////////////////
626626

627+
struct TReqListOffsetsTopicPartition
628+
{
629+
i32 PartitionIndex = 0;
630+
i64 Timestamp = 0;
631+
i32 MaxNumOffsets = 0;
632+
633+
std::vector<TTaggedField> TagBuffer;
634+
635+
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
636+
};
637+
638+
struct TReqListOffsetsTopic
639+
{
640+
TString Name;
641+
std::vector<TReqListOffsetsTopicPartition> Partitions;
642+
643+
std::vector<TTaggedField> TagBuffer;
644+
645+
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
646+
};
647+
648+
struct TReqListOffsets
649+
{
650+
static constexpr ERequestType RequestType = ERequestType::ListOffsets;
651+
652+
i32 ReplicaId = 0;
653+
std::vector<TReqListOffsetsTopic> Topics;
654+
655+
std::vector<TTaggedField> TagBuffer;
656+
657+
void Deserialize(IKafkaProtocolReader* reader, int apiVersion);
658+
};
659+
660+
struct TRspListOffsetsTopicPartition
661+
{
662+
i32 PartitionIndex = 0;
663+
NKafka::EErrorCode ErrorCode = EErrorCode::None;
664+
i64 Offset = 0;
665+
666+
std::vector<TTaggedField> TagBuffer;
667+
668+
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
669+
};
670+
671+
struct TRspListOffsetsTopic
672+
{
673+
TString Name;
674+
std::vector<TRspListOffsetsTopicPartition> Partitions;
675+
676+
std::vector<TTaggedField> TagBuffer;
677+
678+
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
679+
};
680+
681+
struct TRspListOffsets
682+
{
683+
std::vector<TRspListOffsetsTopic> Topics;
684+
685+
std::vector<TTaggedField> TagBuffer;
686+
687+
void Serialize(IKafkaProtocolWriter* writer, int apiVersion) const;
688+
};
689+
690+
////////////////////////////////////////////////////////////////////////////////
691+
627692
} // namespace NYT::NKafka
628693

629694
#define REQUESTS_INL_H_

0 commit comments

Comments
 (0)