@@ -33,9 +33,9 @@ void TKafkaListOffsetsActor::SendOffsetsRequests(const NActors::TActorContext& c
33
33
for (size_t i = 0 ; i < ListOffsetsRequestData->Topics .size (); ++i) {
34
34
auto &requestTopic = ListOffsetsRequestData->Topics [i];
35
35
auto &responseTopic = ListOffsetsResponseData->Topics [i];
36
-
36
+
37
37
responseTopic = TListOffsetsResponseData::TListOffsetsTopicResponse{};
38
-
38
+
39
39
if (!requestTopic.Name .has_value ()) {
40
40
HandleMissingTopicName (requestTopic, responseTopic);
41
41
continue ;
@@ -44,7 +44,7 @@ void TKafkaListOffsetsActor::SendOffsetsRequests(const NActors::TActorContext& c
44
44
responseTopic.Name = requestTopic.Name ;
45
45
TTopicRequestInfo topicRequestInfo;
46
46
topicRequestInfo.TopicIndex = i;
47
-
47
+
48
48
for (auto & partition: requestTopic.Partitions ) {
49
49
topicRequestInfo.Partitions .push_back (TPartitionRequestInfo{.PartitionId = partition.PartitionIndex , .Timestamp = partition.Timestamp });
50
50
}
@@ -65,7 +65,7 @@ void TKafkaListOffsetsActor::HandleMissingTopicName(const TListOffsetsRequestDat
65
65
66
66
TActorId TKafkaListOffsetsActor::SendOffsetsRequest (const TListOffsetsRequestData::TListOffsetsTopic& topic, const NActors::TActorContext&) {
67
67
KAFKA_LOG_D (" ListOffsets actor: Get offsets for topic '" << topic.Name << " ' for user '" << Context->UserToken ->GetUserSID () << " '" );
68
-
68
+
69
69
TEvKafka::TGetOffsetsRequest offsetsRequest;
70
70
offsetsRequest.Topic = NormalizePath (Context->DatabasePath , topic.Name .value ());
71
71
offsetsRequest.Token = Context->UserToken ->GetSerializedToken ();
@@ -83,7 +83,7 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev,
83
83
--PendingResponses;
84
84
auto it = TopicsRequestsInfo.find (ev->Sender );
85
85
86
- Y_DEBUG_ABORT_UNLESS (it != TopicsRequestsInfo.end ());
86
+ Y_DEBUG_ABORT_UNLESS (it != TopicsRequestsInfo.end ());
87
87
if (it == TopicsRequestsInfo.end ()) {
88
88
KAFKA_LOG_CRIT (" ListOffsets actor: received unexpected TEvTopicOffsetsResponse. Ignoring." );
89
89
return RespondIfRequired (ctx);
@@ -110,15 +110,15 @@ void TKafkaListOffsetsActor::Handle(TEvKafka::TEvTopicOffsetsResponse::TPtr& ev,
110
110
auto & responseFromPQPartition = it->second ;
111
111
responsePartition.LeaderEpoch = responseFromPQPartition.Generation ;
112
112
responsePartition.Timestamp = TIMESTAMP_DEFAULT_RESPONSE_VALUE;
113
-
113
+
114
114
if (partitionRequestInfo.Timestamp == TIMESTAMP_START_OFFSET) {
115
115
responsePartition.Offset = responseFromPQPartition.StartOffset ;
116
116
responsePartition.ErrorCode = NONE_ERROR;
117
117
} else if (partitionRequestInfo.Timestamp == TIMESTAMP_END_OFFSET) {
118
118
responsePartition.Offset = responseFromPQPartition.EndOffset ;
119
119
responsePartition.ErrorCode = NONE_ERROR;
120
120
} else {
121
- responsePartition.ErrorCode = INVALID_REQUEST; // FIXME(savnik): handle it
121
+ responsePartition.ErrorCode = INVALID_REQUEST; // FIXME(savnik): handle
122
122
ErrorCode = INVALID_REQUEST;
123
123
}
124
124
} else {
0 commit comments