From 607a6d69b25e56bfa33328ccbe69dc0b35c46978 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 10 Oct 2024 08:32:42 +0000 Subject: [PATCH 1/7] Replace topic to raw in pq_v1 and topics --- .../actors/read_session_actor.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 105f707c1c37..b3db0271fe33 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1724,12 +1724,22 @@ template i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { constexpr bool UseMigrationProtocol = std::is_same_v; + auto* partition_data = UseMigrationProtocol + ? resp.mutable_data_batch()->mutable_partition_data(0) + : resp.mutable_read_response()->mutable_partition_data(0); + + Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); + + for (auto& batch : *partition_data->mutable_batches()) { + if (batch.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) { + batch.set_codec(Ydb::PersQueue::V1::CODEC_RAW); + } + } + if constexpr (UseMigrationProtocol) { - Y_ABORT_UNLESS(resp.data_batch().partition_data_size() == 1); - Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0)); + Response.mutable_data_batch()->add_partition_data()->Swap(partition_data); } else { - Y_ABORT_UNLESS(resp.read_response().partition_data_size() == 1); - Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0)); + Response.mutable_read_response()->add_partition_data()->Swap(partition_data); } Response.set_status(Ydb::StatusIds::SUCCESS); @@ -1739,6 +1749,7 @@ i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { return ByteSize - prev; } + template i64 TFormedReadResponse::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) { From ae85998fe82e0a2874e4fe347583d2a19a238597 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 10 Oct 2024 09:29:54 +0000 Subject: [PATCH 2/7] fix --- .../actors/read_session_actor.cpp | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index b3db0271fe33..0b8315cb38a3 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1720,25 +1720,36 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvRead::TPtr& ProcessReads(ctx); } + template i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { constexpr bool UseMigrationProtocol = std::is_same_v; - auto* partition_data = UseMigrationProtocol - ? resp.mutable_data_batch()->mutable_partition_data(0) - : resp.mutable_read_response()->mutable_partition_data(0); - - Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); - - for (auto& batch : *partition_data->mutable_batches()) { - if (batch.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) { - batch.set_codec(Ydb::PersQueue::V1::CODEC_RAW); + if constexpr (UseMigrationProtocol) { + auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0); + Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); + + // Проходим по всем батчам и устанавливаем codec в 0, если он равен -1 + for (auto& batch : *partition_data->mutable_batches()) { + for (auto& message_data : *batch.mutable_message_data()) { + if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) { + message_data.set_codec(Ydb::PersQueue::V1::CODEC_RAW); + } + } } - } - if constexpr (UseMigrationProtocol) { Response.mutable_data_batch()->add_partition_data()->Swap(partition_data); } else { + auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0); + Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); + + // Проходим по всем батчам и устанавливаем codec в 0, если он равен -1 + for (auto& batch : *partition_data->mutable_batches()) { + if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) { + batch.set_codec(Ydb::Topic::CODEC_RAW); + } + } + Response.mutable_read_response()->add_partition_data()->Swap(partition_data); } @@ -1750,6 +1761,7 @@ i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { } + template i64 TFormedReadResponse::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) { From b2cb6b082ba5a757ddcf5160502aabe8bf41f2b4 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 14 Oct 2024 06:47:47 +0000 Subject: [PATCH 3/7] fix --- ydb/services/persqueue_v1/actors/read_session_actor.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 0b8315cb38a3..8818543cc820 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1729,7 +1729,6 @@ i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0); Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); - // Проходим по всем батчам и устанавливаем codec в 0, если он равен -1 for (auto& batch : *partition_data->mutable_batches()) { for (auto& message_data : *batch.mutable_message_data()) { if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) { @@ -1743,7 +1742,6 @@ i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0); Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); - // Проходим по всем батчам и устанавливаем codec в 0, если он равен -1 for (auto& batch : *partition_data->mutable_batches()) { if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) { batch.set_codec(Ydb::Topic::CODEC_RAW); From c7b34602aa69234600c15e0ed0cb613d3d9e9703 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 14 Oct 2024 07:35:16 +0000 Subject: [PATCH 4/7] fix --- .../persqueue_v1/actors/partition_actor.cpp | 5 ++++ .../actors/read_session_actor.cpp | 29 +++---------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/ydb/services/persqueue_v1/actors/partition_actor.cpp b/ydb/services/persqueue_v1/actors/partition_actor.cpp index 326571da03ec..ad926c590c2c 100644 --- a/ydb/services/persqueue_v1/actors/partition_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_actor.cpp @@ -419,6 +419,11 @@ bool FillBatchedData( hasOffset = true; auto proto(GetDeserializedData(r.GetData())); + + if (!proto.has_codec()) { + proto.set_codec(NPersQueueCommon::RAW); + } + if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) { continue; //TODO - no such chunks must be on prod } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.cpp b/ydb/services/persqueue_v1/actors/read_session_actor.cpp index 8818543cc820..71b6c9a2e29e 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.cpp @@ -1720,35 +1720,16 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvRead::TPtr& ProcessReads(ctx); } - template i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { constexpr bool UseMigrationProtocol = std::is_same_v; if constexpr (UseMigrationProtocol) { - auto* partition_data = resp.mutable_data_batch()->mutable_partition_data(0); - Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); - - for (auto& batch : *partition_data->mutable_batches()) { - for (auto& message_data : *batch.mutable_message_data()) { - if (message_data.codec() == Ydb::PersQueue::V1::CODEC_UNSPECIFIED) { - message_data.set_codec(Ydb::PersQueue::V1::CODEC_RAW); - } - } - } - - Response.mutable_data_batch()->add_partition_data()->Swap(partition_data); + Y_ABORT_UNLESS(resp.data_batch().partition_data_size() == 1); + Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0)); } else { - auto* partition_data = resp.mutable_read_response()->mutable_partition_data(0); - Y_ABORT_UNLESS(partition_data != nullptr && partition_data->batches_size() > 0); - - for (auto& batch : *partition_data->mutable_batches()) { - if (batch.codec() == Ydb::Topic::CODEC_UNSPECIFIED) { - batch.set_codec(Ydb::Topic::CODEC_RAW); - } - } - - Response.mutable_read_response()->add_partition_data()->Swap(partition_data); + Y_ABORT_UNLESS(resp.read_response().partition_data_size() == 1); + Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0)); } Response.set_status(Ydb::StatusIds::SUCCESS); @@ -1759,7 +1740,6 @@ i64 TFormedReadResponse::ApplyResponse(TServerMessage&& resp) { } - template i64 TFormedReadResponse::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) { @@ -1776,7 +1756,6 @@ i64 TFormedReadResponse::ApplyDirectReadResponse(TEvPQProxy::TEv return diff; } - template void TReadSessionActor::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) { if (!ActualPartitionActors.contains(ev->Sender)) { From 8de431e52eefbbaf581d93c07c7773edbbf6f0a5 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 14 Oct 2024 08:21:02 +0000 Subject: [PATCH 5/7] fix --- ydb/core/persqueue/dread_cache_service/caching_service.cpp | 4 ++++ ydb/services/datastreams/datastreams_proxy.cpp | 7 +++++-- .../deprecated/persqueue_v0/grpc_pq_read_actor.cpp | 5 +++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/ydb/core/persqueue/dread_cache_service/caching_service.cpp b/ydb/core/persqueue/dread_cache_service/caching_service.cpp index 6f8951df4aa0..7181e7161385 100644 --- a/ydb/core/persqueue/dread_cache_service/caching_service.cpp +++ b/ydb/core/persqueue/dread_cache_service/caching_service.cpp @@ -475,6 +475,10 @@ class TPQDirectReadCacheService : public TActorBootstrappedset_approximate_arrival_timestamp(r.GetCreateTimestampMS()); record->set_partition_key(r.GetPartitionKey()); record->set_sequence_number(std::to_string(r.GetOffset()).c_str()); - if (proto.GetCodec() > 0) { - record->set_codec(proto.GetCodec() + 1); + + if (!proto.has_codec()) { + proto.set_codec(NPersQueueCommon::RAW); } + + record->set_codec(proto.GetCodec() + 1); } if (!results.empty()) { auto last = results.rbegin(); diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index eea03a63e2c8..7513bc64db14 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -2137,6 +2137,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) { continue; //TODO - no such chunks must be on prod } + + if (!proto.has_codec()) { + proto.set_codec(NPersQueueCommon::RAW); + } + TString sourceId = ""; if (!r.GetSourceId().empty()) { if (!NPQ::NSourceIdEncoding::IsValidEncoded(r.GetSourceId())) { From 6d70f34928e3a432555aae6298dc111507aa36c7 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 14 Oct 2024 09:50:26 +0000 Subject: [PATCH 6/7] fix --- ydb/services/datastreams/datastreams_proxy.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 1665ad48511a..bf2b999fe059 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1665,8 +1665,8 @@ namespace NKikimr::NDataStreams::V1 { record->set_partition_key(r.GetPartitionKey()); record->set_sequence_number(std::to_string(r.GetOffset()).c_str()); - if (!proto.has_codec()) { - proto.set_codec(NPersQueueCommon::RAW); + if (proto.GetCodec() > 0) { + record->set_codec(proto.GetCodec() + 1); } record->set_codec(proto.GetCodec() + 1); From e7e0197ef8ba55ffbca681950e9232c622755180 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Mon, 14 Oct 2024 09:51:42 +0000 Subject: [PATCH 7/7] fix --- ydb/services/datastreams/datastreams_proxy.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index bf2b999fe059..48277a3c5ff0 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1664,12 +1664,9 @@ namespace NKikimr::NDataStreams::V1 { record->set_approximate_arrival_timestamp(r.GetCreateTimestampMS()); record->set_partition_key(r.GetPartitionKey()); record->set_sequence_number(std::to_string(r.GetOffset()).c_str()); - if (proto.GetCodec() > 0) { record->set_codec(proto.GetCodec() + 1); } - - record->set_codec(proto.GetCodec() + 1); } if (!results.empty()) { auto last = results.rbegin();