@@ -1724,12 +1724,22 @@ template <typename TServerMessage>
1724
1724
i64 TFormedReadResponse<TServerMessage>::ApplyResponse (TServerMessage&& resp) {
1725
1725
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;
1726
1726
1727
+ auto * partition_data = UseMigrationProtocol
1728
+ ? resp.mutable_data_batch ()->mutable_partition_data (0 )
1729
+ : resp.mutable_read_response ()->mutable_partition_data (0 );
1730
+
1731
+ Y_ABORT_UNLESS (partition_data != nullptr && partition_data->batches_size () > 0 );
1732
+
1733
+ for (auto & batch : *partition_data->mutable_batches ()) {
1734
+ if (batch.codec () == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
1735
+ batch.set_codec (Ydb::PersQueue::V1::CODEC_RAW);
1736
+ }
1737
+ }
1738
+
1727
1739
if constexpr (UseMigrationProtocol) {
1728
- Y_ABORT_UNLESS (resp.data_batch ().partition_data_size () == 1 );
1729
- Response.mutable_data_batch ()->add_partition_data ()->Swap (resp.mutable_data_batch ()->mutable_partition_data (0 ));
1740
+ Response.mutable_data_batch ()->add_partition_data ()->Swap (partition_data);
1730
1741
} else {
1731
- Y_ABORT_UNLESS (resp.read_response ().partition_data_size () == 1 );
1732
- Response.mutable_read_response ()->add_partition_data ()->Swap (resp.mutable_read_response ()->mutable_partition_data (0 ));
1742
+ Response.mutable_read_response ()->add_partition_data ()->Swap (partition_data);
1733
1743
}
1734
1744
1735
1745
Response.set_status (Ydb::StatusIds::SUCCESS);
@@ -1739,6 +1749,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
1739
1749
return ByteSize - prev;
1740
1750
}
1741
1751
1752
+
1742
1753
template <typename TServerMessage>
1743
1754
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse (TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {
1744
1755
0 commit comments