Skip to content

Commit f38e36b

Browse files
ildar-khisambeevGazizonoki
authored andcommitted
Moved "provide codecs via singleton map" commit from ydb repo
1 parent b8d3bc8 commit f38e36b

File tree

8 files changed

+70
-15
lines changed

8 files changed

+70
-15
lines changed

src/client/persqueue_core/impl/persqueue.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@
1111

1212
namespace NYdb::NPersQueue {
1313

14+
class TCommonCodecsProvider {
15+
public:
16+
TCommonCodecsProvider() {
17+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::GZIP, MakeHolder<NYdb::NTopic::TGzipCodec>());
18+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::ZSTD, MakeHolder<NYdb::NTopic::TZstdCodec>());
19+
}
20+
};
21+
TCommonCodecsProvider COMMON_CODECS_PROVIDER;
22+
1423
const std::vector<ECodec>& GetDefaultCodecs() {
1524
static const std::vector<ECodec> codecs = {};
1625
return codecs;

src/client/persqueue_core/impl/read_session.ipp

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,23 +2556,19 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
25562556
&& data.codec() != Ydb::PersQueue::V1::CODEC_RAW
25572557
&& data.codec() != Ydb::PersQueue::V1::CODEC_UNSPECIFIED
25582558
) {
2559-
if (auto session = Parent->CbContext->LockShared()) {
2560-
const NYdb::NTopic::ICodec* codecImpl = session->GetCodecImplOrThrow(static_cast<ECodec>(data.codec()));
2561-
std::string decompressed = codecImpl->Decompress(data.data());
2562-
data.set_data(decompressed);
2563-
data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
2564-
}
2559+
const NYdb::NTopic::ICodec* codecImpl = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow(static_cast<ui32>(data.codec()));
2560+
std::string decompressed = codecImpl->Decompress(data.data());
2561+
data.set_data(decompressed);
2562+
data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
25652563
}
25662564
} else {
25672565
if (Parent->DoDecompress
25682566
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_RAW
25692567
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_UNSPECIFIED
25702568
) {
2571-
if (auto session = Parent->CbContext->LockShared()) {
2572-
const NYdb::NTopic::ICodec* codecImpl = session->GetCodecImplOrThrow(static_cast<NTopic::ECodec>(batch.codec()));
2573-
std::string decompressed = codecImpl->Decompress(data.data());
2574-
data.set_data(decompressed);
2575-
}
2569+
const NYdb::NTopic::ICodec* codecImpl = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow(static_cast<ui32>(batch.codec()));
2570+
std::string decompressed = codecImpl->Decompress(data.data());
2571+
data.set_data(decompressed);
25762572
}
25772573
}
25782574

src/client/persqueue_core/impl/write_session_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,8 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
906906

907907
TBuffer CompressBuffer(std::shared_ptr<TPersQueueClient::TImpl> client, std::vector<std::string_view>& data, ECodec codec, i32 level) {
908908
TBuffer result;
909-
THolder<IOutputStream> coder = client->GetCodecImplOrThrow(codec)->CreateCoder(result, level);
909+
Y_UNUSED(client);
910+
THolder<IOutputStream> coder = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow((ui32)codec)->CreateCoder(result, level);
910911
for (auto& buffer : data) {
911912
coder->Write(buffer.data(), buffer.size());
912913
}

src/client/topic/codecs/codecs.h

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
#include <ydb-cpp-sdk/util/stream/str.h>
88
#include <src/util/stream/zlib.h>
99
#include <ydb-cpp-sdk/util/stream/output.h>
10+
#include <ydb-cpp-sdk/util/system/spinlock.h>
1011

11-
#define CODECS_ALREADY_DEFINED
12+
#include <unordered_map>
1213

1314
namespace NYdb::NTopic {
1415

@@ -77,4 +78,42 @@ class TUnsupportedCodec final : public ICodec {
7778
}
7879
};
7980

81+
class TCodecMap {
82+
public:
83+
static TCodecMap& GetTheCodecMap() {
84+
static TCodecMap instance;
85+
return instance;
86+
}
87+
88+
void Set(ui32 codecId, THolder<ICodec>&& codecImpl) {
89+
with_lock(Lock) {
90+
Codecs[codecId] = std::move(codecImpl);
91+
}
92+
}
93+
94+
const ICodec* GetOrThrow(ui32 codecId) const {
95+
with_lock(Lock) {
96+
if (!Codecs.contains(codecId)) {
97+
throw yexception() << "codec with id " << ui32(codecId) << " not provided";
98+
}
99+
return Codecs.at(codecId).Get();
100+
}
101+
}
102+
103+
104+
TCodecMap(const TCodecMap&) = delete;
105+
TCodecMap(TCodecMap&&) = delete;
106+
TCodecMap& operator=(const TCodecMap&) = delete;
107+
TCodecMap& operator=(TCodecMap&&) = delete;
108+
109+
private:
110+
TCodecMap() = default;
111+
112+
private:
113+
std::unordered_map<ui32, THolder<ICodec>> Codecs;
114+
TAdaptiveLock Lock;
115+
};
116+
117+
#define CODEC_MAP_ALREADY_PROVIDED
118+
80119
} // namespace NYdb::NTopic

src/client/topic/impl/topic.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@
1212

1313
namespace NYdb::NTopic {
1414

15+
class TCommonCodecsProvider {
16+
public:
17+
TCommonCodecsProvider() {
18+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::GZIP, MakeHolder<NYdb::NTopic::TGzipCodec>());
19+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::ZSTD, MakeHolder<NYdb::NTopic::TZstdCodec>());
20+
}
21+
};
22+
TCommonCodecsProvider COMMON_CODECS_PROVIDER;
23+
1524
TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result)
1625
: TStatus(std::move(status))
1726
, TopicDescription_(std::move(result))

src/client/topic/impl/write_session_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1096,7 +1096,8 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
10961096

10971097
TBuffer CompressBuffer(std::shared_ptr<TTopicClient::TImpl> client, std::vector<std::string_view>& data, ECodec codec, i32 level) {
10981098
TBuffer result;
1099-
THolder<IOutputStream> coder = client->GetCodecImplOrThrow(codec)->CreateCoder(result, level);
1099+
Y_UNUSED(client);
1100+
THolder<IOutputStream> coder = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow((ui32)codec)->CreateCoder(result, level);
11001101
for (auto& buffer : data) {
11011102
coder->Write(buffer.data(), buffer.size());
11021103
}

src/util/random/entropy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb-cpp-sdk/util/stream/mem.h>
1010
#include <src/util/stream/zlib.h>
1111
#include <src/util/stream/buffer.h>
12+
#include <src/util/stream/buffered.h>
1213

1314
#include <src/util/system/fs.h>
1415
#include <src/util/system/info.h>

src/util/stream/zlib.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#include <ydb-cpp-sdk/util/stream/fwd.h>
44
#include <ydb-cpp-sdk/util/stream/input.h>
55
#include <ydb-cpp-sdk/util/stream/output.h>
6-
#include "buffered.h"
76

87
#include <ydb-cpp-sdk/util/system/defaults.h>
98
#include <ydb-cpp-sdk/util/generic/ptr.h>

0 commit comments

Comments
 (0)