Skip to content

Commit ae85998

Browse files
committed
fix
1 parent 607a6d6 commit ae85998

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,25 +1720,36 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr&
17201720
ProcessReads(ctx);
17211721
}
17221722

1723+
17231724
template <typename TServerMessage>
17241725
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17251726
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;
17261727

1727-
auto* partition_data = UseMigrationProtocol
1728-
? resp.mutable_data_batch()->mutable_partition_data(0)
1729-
: resp.mutable_read_response()->mutable_partition_data(0);
1730-
1731-
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);
1732-
1733-
for (auto& batch : *partition_data->mutable_batches()) {
1734-
if (batch.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
1735-
batch.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
1728+
if constexpr (UseMigrationProtocol) {
1729+
auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0);
1730+
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);
1731+
1732+
// Проходим по всем батчам и устанавливаем codec в 0, если он равен -1
1733+
for (auto& batch : *partition_data->mutable_batches()) {
1734+
for (auto& message_data : *batch.mutable_message_data()) {
1735+
if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) {
1736+
message_data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
1737+
}
1738+
}
17361739
}
1737-
}
17381740

1739-
if constexpr (UseMigrationProtocol) {
17401741
Response.mutable_data_batch()->add_partition_data()->Swap(partition_data);
17411742
} else {
1743+
auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0);
1744+
Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0);
1745+
1746+
// Проходим по всем батчам и устанавливаем codec в 0, если он равен -1
1747+
for (auto& batch : *partition_data->mutable_batches()) {
1748+
if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) {
1749+
batch.set_codec(Ydb::Topic::CODEC_RAW);
1750+
}
1751+
}
1752+
17421753
Response.mutable_read_response()->add_partition_data()->Swap(partition_data);
17431754
}
17441755

@@ -1750,6 +1761,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17501761
}
17511762

17521763

1764+
17531765
template <typename TServerMessage>
17541766
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {
17551767

0 commit comments

Comments
 (0)