Skip to content

Commit c7b3460

Browse files
committed
fix
1 parent b2cb6b0 commit c7b3460

File tree

2 files changed

+9
-25
lines changed

2 files changed

+9
-25
lines changed

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,11 @@ bool FillBatchedData(
419419
hasOffset = true;
420420

421421
auto proto(GetDeserializedData(r.GetData()));
422+
423+
if (!proto.has_codec()) {
424+
proto.set_codec(NPersQueueCommon::RAW);
425+
}
426+
422427
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
423428
continue; //TODO - no such chunks must be on prod
424429
}

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,35 +1720,16 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr&
17201720
ProcessReads(ctx);
17211721
}
17221722

1723-
17241723
template <typename TServerMessage>
17251724
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17261725
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;
17271726

17281727
if constexpr (UseMigrationProtocol) {
1729-
auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0);
1730-
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);
1731-
1732-
for (auto& batch : *partition_data->mutable_batches()) {
1733-
for (auto& message_data : *batch.mutable_message_data()) {
1734-
if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
1735-
message_data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
1736-
}
1737-
}
1738-
}
1739-
1740-
Response.mutable_data_batch()->add_partition_data()->Swap(partition_data);
1728+
Y_ABORT_UNLESS(resp.data_batch().partition_data_size() == 1);
1729+
Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
17411730
} else {
1742-
auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0);
1743-
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);
1744-
1745-
for (auto& batch : *partition_data->mutable_batches()) {
1746-
if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) {
1747-
batch.set_codec(Ydb::Topic::CODEC_RAW);
1748-
}
1749-
}
1750-
1751-
Response.mutable_read_response()->add_partition_data()->Swap(partition_data);
1731+
Y_ABORT_UNLESS(resp.read_response().partition_data_size() == 1);
1732+
Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0));
17521733
}
17531734

17541735
Response.set_status(Ydb::StatusIds::SUCCESS);
@@ -1759,7 +1740,6 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17591740
}
17601741

17611742

1762-
17631743
template <typename TServerMessage>
17641744
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {
17651745

@@ -1776,7 +1756,6 @@ i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEv
17761756
return diff;
17771757
}
17781758

1779-
17801759
template <bool UseMigrationProtocol>
17811760
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) {
17821761
if (!ActualPartitionActors.contains(ev->Sender)) {

0 commit comments

Comments
 (0)