Skip to content

Commit 8de431e

Browse files
committed
fix
1 parent c7b3460 commit 8de431e

File tree

3 files changed

+14
-2
lines changed

3 files changed

+14
-2
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
475475
continue; //TODO - no such chunks must be on prod
476476
}
477477

478+
if (!proto.has_codec()) {
479+
proto.set_codec(NPersQueueCommon::RAW);
480+
}
481+
478482
TString sourceId;
479483
if (!r.GetSourceId().empty()) {
480484
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,9 +1664,12 @@ namespace NKikimr::NDataStreams::V1 {
16641664
record->set_approximate_arrival_timestamp(r.GetCreateTimestampMS());
16651665
record->set_partition_key(r.GetPartitionKey());
16661666
record->set_sequence_number(std::to_string(r.GetOffset()).c_str());
1667-
if (proto.GetCodec() > 0) {
1668-
record->set_codec(proto.GetCodec() + 1);
1667+
1668+
if (!proto.has_codec()) {
1669+
proto.set_codec(NPersQueueCommon::RAW);
16691670
}
1671+
1672+
record->set_codec(proto.GetCodec() + 1);
16701673
}
16711674
if (!results.empty()) {
16721675
auto last = results.rbegin();

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
21372137
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
21382138
continue; //TODO - no such chunks must be on prod
21392139
}
2140+
2141+
if (!proto.has_codec()) {
2142+
proto.set_codec(NPersQueueCommon::RAW);
2143+
}
2144+
21402145
TString sourceId = "";
21412146
if (!r.GetSourceId().empty()) {
21422147
if (!NPQ::NSourceIdEncoding::IsValidEncoded(r.GetSourceId())) {

0 commit comments

Comments
 (0)