Skip to content

Commit cead838

Browse files
authored
Rename TChangeRecordBuilderContextTrait to TSerializationContext, move it inside TChangeRecord (#9178)
1 parent 13965ec commit cead838

File tree

5 files changed

+42
-63
lines changed

5 files changed

+42
-63
lines changed

ydb/core/backup/impl/table_writer_impl.h

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,6 @@
77
#include <ydb/core/change_exchange/change_record.h>
88
#include <ydb/core/protos/change_exchange.pb.h>
99

10-
namespace NKikimr {
11-
12-
namespace NBackup::NImpl {
13-
14-
class TChangeRecord;
15-
16-
} // namespace NBackup::NImpl
17-
18-
template <>
19-
struct TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> {
20-
NBackup::NImpl::EWriterType Type;
21-
22-
TChangeRecordBuilderContextTrait(NBackup::NImpl::EWriterType type)
23-
: Type(type)
24-
{}
25-
26-
// just copy type
27-
TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord>& other) = default;
28-
};
29-
30-
} // namespace NKikimr
31-
3210
namespace NKikimr::NBackup::NImpl {
3311

3412
class TChangeRecordBuilder;
@@ -40,6 +18,17 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
4018
using TPtr = TIntrusivePtr<TChangeRecord>;
4119
using TBuilder = TChangeRecordBuilder;
4220

21+
struct TSerializationContext {
22+
const NBackup::NImpl::EWriterType Type;
23+
24+
TSerializationContext(NBackup::NImpl::EWriterType type)
25+
: Type(type)
26+
{}
27+
28+
// just copy type
29+
TSerializationContext(const TSerializationContext& other) = default;
30+
};
31+
4332
const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatProto;
4433

4534
ui64 GetGroup() const override {
@@ -58,10 +47,7 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
5847
return SourceId;
5948
}
6049

61-
void Serialize(
62-
NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record,
63-
TChangeRecordBuilderContextTrait<TChangeRecord>& ctx) const
64-
{
50+
void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record, TSerializationContext& ctx) const {
6551
switch (ctx.Type) {
6652
case EWriterType::Backup:
6753
return SerializeBackup(record);

ydb/core/backup/impl/table_writer_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ Y_UNIT_TEST_SUITE(TableWriter) {
4848
.Build();
4949

5050
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
51-
TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup);
51+
NBackup::NImpl::TChangeRecord::TSerializationContext ctx(EWriterType::Backup);
5252
record->Serialize(result, ctx);
5353

5454
TVector<TCell> outCells{
@@ -89,7 +89,7 @@ Y_UNIT_TEST_SUITE(TableWriter) {
8989
.Build();
9090

9191
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
92-
TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Backup);
92+
NBackup::NImpl::TChangeRecord::TSerializationContext ctx(EWriterType::Backup);
9393
record->Serialize(result, ctx);
9494

9595
TVector<TCell> outCells{
@@ -144,7 +144,7 @@ Y_UNIT_TEST_SUITE(TableWriter) {
144144
.Build();
145145

146146
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
147-
TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> ctx(EWriterType::Restore);
147+
NBackup::NImpl::TChangeRecord::TSerializationContext ctx(EWriterType::Restore);
148148
record->Serialize(result, ctx);
149149

150150
UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());

ydb/core/change_exchange/change_record.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,6 @@
44
#include <util/generic/string.h>
55
#include <util/stream/output.h>
66

7-
namespace NKikimr {
8-
9-
template <typename TChangeRecord>
10-
struct TChangeRecordBuilderContextTrait {};
11-
12-
} // namespace NKikimr
13-
147
namespace NKikimr::NChangeExchange {
158

169
class IChangeSenderResolver;

ydb/core/tx/replication/service/json_change_record.h

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,23 @@ class TChangeRecordBuilder;
2727

2828
class TChangeRecord: public NChangeExchange::TChangeRecordBase {
2929
friend class TChangeRecordBuilder;
30-
using TSerializationContext = TChangeRecordBuilderContextTrait<TChangeRecord>;
3130

3231
public:
3332
using TPtr = TIntrusivePtr<TChangeRecord>;
3433
using TBuilder = TChangeRecordBuilder;
3534

35+
struct TSerializationContext {
36+
TMemoryPool MemoryPool;
37+
38+
TSerializationContext()
39+
: MemoryPool(256)
40+
{}
41+
42+
TSerializationContext(const TSerializationContext&)
43+
: MemoryPool(256) // do not preserve any state between writers, just construct new one.
44+
{}
45+
};
46+
3647
const static NKikimrSchemeOp::ECdcStreamFormat StreamType = NKikimrSchemeOp::ECdcStreamFormatJson;
3748

3849
ui64 GetGroup() const override;
@@ -89,20 +100,6 @@ struct TChangeRecordContainer<NReplication::NService::TChangeRecord>
89100
using TBaseChangeRecordContainer<NReplication::NService::TChangeRecord>::TBaseChangeRecordContainer;
90101
};
91102

92-
template <>
93-
struct TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord> {
94-
TMemoryPool MemoryPool;
95-
96-
TChangeRecordBuilderContextTrait()
97-
: MemoryPool(256)
98-
{}
99-
100-
// do not preserve any state between writers, just construct new one.
101-
TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NReplication::NService::TChangeRecord>&)
102-
: MemoryPool(256)
103-
{}
104-
};
105-
106103
}
107104

108105
Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TChangeRecord, out, value) {

ydb/core/tx/replication/service/table_writer_impl.h

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222

2323
namespace NKikimr::NReplication::NService {
2424

25+
template <typename TChangeRecord>
26+
using TSerializationContext = typename TChangeRecord::TSerializationContext;
27+
28+
template <typename TChangeRecord>
29+
using TBuilder = typename TChangeRecord::TBuilder;
30+
2531
template <typename TChangeRecord>
2632
class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TChangeRecord>> {
2733
using TBase = TActorBootstrapped<TTablePartitionWriter<TChangeRecord>>;
@@ -85,7 +91,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
8591

8692
for (auto recordPtr : ev->Get()->GetRecords<TChangeRecord>()) {
8793
const auto& record = *recordPtr;
88-
record.Serialize(*event->Record.AddChanges(), BuilderContext);
94+
record.Serialize(*event->Record.AddChanges(), SerializationContext);
8995

9096
if (!source) {
9197
source = record.GetSourceId();
@@ -180,11 +186,11 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
180186
const TActorId& parent,
181187
ui64 tabletId,
182188
const TTableId& tableId,
183-
TChangeRecordBuilderContextTrait<TChangeRecord> builderContext)
189+
const TSerializationContext<TChangeRecord>& ctx)
184190
: Parent(parent)
185191
, TabletId(tabletId)
186192
, TableId(tableId)
187-
, BuilderContext(builderContext)
193+
, SerializationContext(ctx)
188194
{}
189195

190196
void Bootstrap() {
@@ -207,7 +213,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter<TCh
207213

208214
TActorId LeaderPipeCache;
209215
ui64 SubscribeCookie = 0;
210-
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;
216+
TSerializationContext<TChangeRecord> SerializationContext;
211217

212218
}; // TTablePartitionWriter
213219

@@ -435,11 +441,8 @@ class TLocalTableWriter
435441
}
436442

437443
IActor* CreateSender(ui64 partitionId) const override {
438-
return new TTablePartitionWriter<TChangeRecord>(
439-
this->SelfId(),
440-
partitionId,
441-
TTableId(TablePathId, Schema->Version),
442-
BuilderContext);
444+
const auto tableId = TTableId(TablePathId, Schema->Version);
445+
return new TTablePartitionWriter<TChangeRecord>(this->SelfId(), partitionId, tableId, SerializationContext);
443446
}
444447

445448
void Handle(TEvWorker::TEvData::TPtr& ev) {
@@ -450,7 +453,7 @@ class TLocalTableWriter
450453

451454
for (auto& record : ev->Get()->Records) {
452455
records.emplace_back(record.Offset, TablePathId, record.Data.size());
453-
auto res = PendingRecords.emplace(record.Offset, typename TChangeRecord::TBuilder()
456+
auto res = PendingRecords.emplace(record.Offset, TBuilder<TChangeRecord>()
454457
.WithSourceId(ev->Get()->Source)
455458
.WithOrder(record.Offset)
456459
.WithBody(std::move(record.Data))
@@ -531,7 +534,7 @@ class TLocalTableWriter
531534
: TBase(&TThis::StateWork)
532535
, TBaseSender(this, this, this, this, TActorId())
533536
, TablePathId(tablePathId)
534-
, BuilderContext(std::forward<TArgs>(args)...)
537+
, SerializationContext(std::forward<TArgs>(args)...)
535538
{
536539
}
537540

@@ -557,7 +560,7 @@ class TLocalTableWriter
557560
private:
558561
mutable TMaybe<TString> LogPrefix;
559562
const TPathId TablePathId;
560-
TChangeRecordBuilderContextTrait<TChangeRecord> BuilderContext;
563+
TSerializationContext<TChangeRecord> SerializationContext;
561564

562565
TActorId Worker;
563566
ui64 TableVersion = 0;

0 commit comments

Comments
 (0)