Skip to content

Commit 7fc930f

Browse files
CyberROFLnshestakov
authored andcommitted
(refactoring) Basic TTopicMessage (#15381)
1 parent af0f7f8 commit 7fc930f

File tree

18 files changed

+172
-137
lines changed

18 files changed

+172
-137
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#include "local_partition_reader.h"
22
#include "logging.h"
33

4-
#include <ydb/library/actors/core/actor.h>
5-
#include <ydb/library/services/services.pb.h>
6-
74
#include <ydb/core/persqueue/events/global.h>
85
#include <ydb/core/protos/grpc_pq_old.pb.h>
96
#include <ydb/core/tx/replication/service/worker.h>
7+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/services/services.pb.h>
1010

1111
using namespace NActors;
1212
using namespace NKikimr::NReplication::NService;
@@ -131,11 +131,11 @@ class TLocalPartitionReader
131131
}
132132

133133
auto gotOffset = Offset;
134-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));
134+
TVector<NReplication::TTopicMessage> records(::Reserve(readResult.ResultSize()));
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
139139
}
140140
SentOffset = gotOffset + 1;
141141

ydb/core/backup/impl/local_partition_reader_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ Y_UNIT_TEST_SUITE(LocalPartitionReader) {
4646
auto data = runtime.GrabEdgeEventRethrow<TEvWorker::TEvData>(handle);
4747
UNIT_ASSERT_VALUES_EQUAL(data->Source, PARTITION_STR);
4848
UNIT_ASSERT_VALUES_EQUAL(data->Records.size(), 2);
49-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Offset, INITIAL_OFFSET + dataPatternCookie * 2);
50-
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].Data, Sprintf("1-%d", dataPatternCookie));
51-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Offset, INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52-
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].Data, Sprintf("2-%d", dataPatternCookie));
49+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2);
50+
UNIT_ASSERT_VALUES_EQUAL(data->Records[0].GetData(), Sprintf("1-%d", dataPatternCookie));
51+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetOffset(), INITIAL_OFFSET + dataPatternCookie * 2 + 1);
52+
UNIT_ASSERT_VALUES_EQUAL(data->Records[1].GetData(), Sprintf("2-%d", dataPatternCookie));
5353
}
5454

5555
TEvPersQueue::TEvResponse* GenerateData(ui32 dataPatternCookie) {

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/change_exchange/util.h>
99
#include <ydb/core/tablet_flat/flat_row_eggs.h>
1010
#include <ydb/core/tx/datashard/datashard.h>
11+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
1112
#include <ydb/core/tx/scheme_cache/helpers.h>
1213
#include <ydb/core/tx/tx_proxy/proxy.h>
1314
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -434,8 +435,8 @@ class TLocalTableWriter
434435
TSet<TRowVersion> versionsWithoutTxId;
435436

436437
for (auto& r : ev->Get()->Records) {
437-
auto offset = r.Offset;
438-
auto& data = r.Data;
438+
auto offset = r.GetOffset();
439+
auto& data = r.GetData();
439440

440441
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
441442

ydb/core/tx/replication/service/common_ut.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#pragma once
22

3-
#include "worker.h"
3+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
44

55
namespace NKikimr::NReplication::NService {
66

7-
struct TRecord: public TEvWorker::TEvData::TRecord {
7+
struct TRecord: public TTopicMessage {
88
explicit TRecord(ui64 offset, const TString& data)
9-
: TEvWorker::TEvData::TRecord(offset, data, TInstant::Zero(), "MessageGroupId", "ProducerId", 42)
9+
: TTopicMessage(offset, data)
1010
{}
1111
};
1212

ydb/core/tx/replication/service/s3_writer_ut.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "common_ut.h"
12
#include "s3_writer.h"
23
#include "worker.h"
34

@@ -60,8 +61,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
6061
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().at("/TEST/writer.AtufpxzetsqaVnEuozdXpD.json"),
6162
R"({"finished":false,"table_name":"/MyRoot/Table","writer_name":"AtufpxzetsqaVnEuozdXpD"})");
6263

63-
using TRecord = TEvWorker::TEvData::TRecord;
64-
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData({
64+
env.Send<TEvWorker::TEvPoll>(writer, new TEvWorker::TEvData(0, "TestSource", {
6565
TRecord(1, R"({"key":[1], "update":{"value":"10"}})"),
6666
TRecord(2, R"({"key":[2], "update":{"value":"20"}})"),
6767
TRecord(3, R"({"key":[3], "update":{"value":"30"}})"),
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(S3Writer) {
7575
R"({"key":[2], "update":{"value":"20"}})" "\n"
7676
R"({"key":[3], "update":{"value":"30"}})" "\n");
7777

78-
auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData({}));
78+
auto res = env.Send<TEvWorker::TEvGone>(writer, new TEvWorker::TEvData(0, "TestSource", {}));
7979

8080
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, TEvWorker::TEvGone::DONE);
8181
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 2);

ydb/core/tx/replication/service/table_writer_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
#include "common_ut.h"
12
#include "service.h"
23
#include "table_writer.h"
3-
#include "common_ut.h"
4+
#include "worker.h"
45

56
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
67
#include <ydb/core/tx/replication/ut_helpers/test_env.h>

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "topic_reader.h"
33
#include "worker.h"
44

5+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
56
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
67
#include <ydb/library/actors/core/actor.h>
78
#include <ydb/library/actors/core/hfunc.h>
@@ -57,11 +58,11 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
5758
LOG_D("Handle " << ev->Get()->ToString());
5859

5960
auto& result = ev->Get()->Result;
60-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(result.Messages.size()));
61+
TVector<TTopicMessage> records(::Reserve(result.Messages.size()));
6162

6263
for (auto& msg : result.Messages) {
6364
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
64-
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
65+
records.push_back(std::move(msg));
6566
}
6667

6768
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));

ydb/core/tx/replication/service/topic_reader_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
8686
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);
8787

8888
const auto& record = records.at(0);
89-
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 0);
90-
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-1");
89+
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 0);
90+
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-1");
9191
}
9292

9393
// trigger commit, write new data & kill reader
@@ -103,8 +103,8 @@ Y_UNIT_TEST_SUITE(RemoteTopicReader) {
103103
UNIT_ASSERT_VALUES_EQUAL(records.size(), 1);
104104

105105
const auto& record = records.at(0);
106-
UNIT_ASSERT_VALUES_EQUAL(record.Offset, 1);
107-
UNIT_ASSERT_VALUES_EQUAL(record.Data, "message-2");
106+
UNIT_ASSERT_VALUES_EQUAL(record.GetOffset(), 1);
107+
UNIT_ASSERT_VALUES_EQUAL(record.GetData(), "message-2");
108108
}
109109
}
110110
}

ydb/core/tx/replication/service/ut_s3_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SIZE(MEDIUM)
66

77
PEERDIR(
88
ydb/core/tx/replication/ut_helpers
9+
ydb/core/tx/replication/ydb_proxy
910
library/cpp/string_utils/base64
1011
library/cpp/testing/unittest
1112
)

ydb/core/tx/replication/service/ut_table_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SIZE(MEDIUM)
77
PEERDIR(
88
ydb/core/tx/datashard/ut_common
99
ydb/core/tx/replication/ut_helpers
10+
ydb/core/tx/replication/ydb_proxy
1011
library/cpp/string_utils/base64
1112
library/cpp/testing/unittest
1213
)

0 commit comments

Comments
 (0)