Skip to content

Commit 29d6dd3

Browse files
Move TTargetTransfer to target_transfer.h (#15269)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent 4f16aff commit 29d6dd3

13 files changed

+221
-197
lines changed

ydb/core/tx/replication/controller/event_util.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#include "event_util.h"
2-
#include "target_table.h"
2+
#include "target_transfer.h"
33

44
namespace NKikimr::NReplication::NController {
55

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "secret_resolver.h"
55
#include "target_discoverer.h"
66
#include "target_table.h"
7+
#include "target_transfer.h"
78
#include "tenant_resolver.h"
89
#include "util.h"
910

ydb/core/tx/replication/controller/target_discoverer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "private_events.h"
33
#include "target_discoverer.h"
44
#include "target_table.h"
5+
#include "target_transfer.h"
56
#include "util.h"
67

78
#include <ydb/core/base/path.h>

ydb/core/tx/replication/controller/target_discoverer_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "private_events.h"
22
#include "target_discoverer.h"
33
#include "target_table.h"
4+
#include "target_transfer.h"
45

56
#include <ydb/core/tx/replication/ut_helpers/test_env.h>
67
#include <ydb/core/tx/replication/ut_helpers/test_table.h>
Lines changed: 0 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,16 @@
1-
#include "event_util.h"
2-
#include "logging.h"
3-
#include "stream_consumer_remover.h"
41
#include "target_table.h"
5-
#include "util.h"
62

73
#include <ydb/core/base/path.h>
8-
#include <ydb/core/scheme/scheme_pathid.h>
9-
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
10-
#include <ydb/library/actors/core/actor_bootstrapped.h>
11-
#include <ydb/library/actors/core/hfunc.h>
124

135
namespace NKikimr::NReplication::NController {
146

15-
class TTableWorkerRegistar: public TActorBootstrapped<TTableWorkerRegistar> {
16-
void Handle(TEvYdbProxy::TEvDescribeTopicResponse::TPtr& ev) {
17-
LOG_T("Handle " << ev->Get()->ToString());
18-
19-
const auto& result = ev->Get()->Result;
20-
if (!result.IsSuccess()) {
21-
if (IsRetryableError(result)) {
22-
LOG_W("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Retry.");
23-
return Retry();
24-
}
25-
26-
LOG_E("Error of resolving topic '" << SrcStreamPath << "': " << ev->Get()->ToString() << ". Stop.");
27-
return; // TODO: hard error
28-
}
29-
30-
for (const auto& partition : result.GetTopicDescription().GetPartitions()) {
31-
if (!partition.GetParentPartitionIds().empty()) {
32-
continue;
33-
}
34-
35-
auto ev = MakeRunWorkerEv(
36-
ReplicationId, TargetId, Config, partition.GetPartitionId(),
37-
ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId);
38-
Send(Parent, std::move(ev));
39-
}
40-
41-
PassAway();
42-
}
43-
44-
void Retry() {
45-
LOG_D("Retry");
46-
Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup());
47-
}
48-
49-
public:
50-
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
51-
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR;
52-
}
53-
54-
explicit TTableWorkerRegistar(
55-
const TActorId& parent,
56-
const TActorId& proxy,
57-
const NKikimrReplication::TConnectionParams& connectionParams,
58-
const NKikimrReplication::TConsistencySettings& consistencySettings,
59-
ui64 rid,
60-
ui64 tid,
61-
const TString& srcStreamPath,
62-
const TString& srcStreamConsumerName,
63-
const TPathId& dstPathId,
64-
const TReplication::ITarget::IConfig::TPtr& config)
65-
: Parent(parent)
66-
, YdbProxy(proxy)
67-
, ConnectionParams(connectionParams)
68-
, ConsistencySettings(consistencySettings)
69-
, ReplicationId(rid)
70-
, TargetId(tid)
71-
, SrcStreamPath(srcStreamPath)
72-
, SrcStreamConsumerName(srcStreamConsumerName)
73-
, DstPathId(dstPathId)
74-
, LogPrefix("TableWorkerRegistar", ReplicationId, TargetId)
75-
, Config(config)
76-
{
77-
}
78-
79-
void Bootstrap() {
80-
Become(&TThis::StateWork);
81-
Send(YdbProxy, new TEvYdbProxy::TEvDescribeTopicRequest(SrcStreamPath, {}));
82-
}
83-
84-
STATEFN(StateWork) {
85-
switch (ev->GetTypeRewrite()) {
86-
hFunc(TEvYdbProxy::TEvDescribeTopicResponse, Handle);
87-
sFunc(TEvents::TEvWakeup, Bootstrap);
88-
sFunc(TEvents::TEvPoison, PassAway);
89-
}
90-
}
91-
92-
private:
93-
const TActorId Parent;
94-
const TActorId YdbProxy;
95-
const NKikimrReplication::TConnectionParams ConnectionParams;
96-
const NKikimrReplication::TConsistencySettings ConsistencySettings;
97-
const ui64 ReplicationId;
98-
const ui64 TargetId;
99-
const TString SrcStreamPath;
100-
const TString SrcStreamConsumerName;
101-
const TPathId DstPathId;
102-
const TActorLogPrefix LogPrefix;
103-
const TReplication::ITarget::IConfig::TPtr Config;
104-
105-
}; // TTableWorkerRegistar
1067

1078
TTargetTableBase::TTargetTableBase(TReplication* replication, ETargetKind finalKind,
1089
ui64 id, const IConfig::TPtr& config)
10910
: TTargetWithStream(replication, finalKind, id, config)
11011
{
11112
}
11213

113-
IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const {
114-
auto replication = GetReplication();
115-
const auto& config = replication->GetConfig();
116-
return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(),
117-
config.GetSrcConnectionParams(), config.GetConsistencySettings(),
118-
replication->GetId(), GetId(), BuildStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig());
119-
}
120-
12114
TTargetTable::TTargetTable(TReplication* replication, ui64 id, const IConfig::TPtr& config)
12215
: TTargetTableBase(replication, ETargetKind::Table, id, config)
12316
{
@@ -140,62 +33,4 @@ TString TTargetIndexTable::BuildStreamPath() const {
14033
return CanonizePath(ChildPath(SplitPath(GetSrcPath()), {"indexImplTable", GetStreamName()}));
14134
}
14235

143-
TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
144-
: TTargetTableBase(replication, ETargetKind::Transfer, id, config)
145-
{
146-
}
147-
148-
void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
149-
auto& t = cfg.GetTransferSpecific().GetTargets(0);
150-
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
151-
GetConfig()->GetSrcPath(),
152-
GetConfig()->GetDstPath(),
153-
t.GetTransformLambda());
154-
}
155-
156-
void TTargetTransfer::Progress(const TActorContext& ctx) {
157-
auto replication = GetReplication();
158-
159-
switch (GetStreamState()) {
160-
case EStreamState::Removing:
161-
if (GetWorkers()) {
162-
RemoveWorkers(ctx);
163-
} else if (!StreamConsumerRemover) {
164-
StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
165-
}
166-
return;
167-
case EStreamState::Creating:
168-
case EStreamState::Ready:
169-
case EStreamState::Removed:
170-
case EStreamState::Error:
171-
break;
172-
}
173-
174-
TTargetWithStream::Progress(ctx);
175-
}
176-
177-
void TTargetTransfer::Shutdown(const TActorContext& ctx) {
178-
for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
179-
if (auto actorId = std::exchange(*x, {})) {
180-
ctx.Send(actorId, new TEvents::TEvPoison());
181-
}
182-
}
183-
184-
TTargetWithStream::Shutdown(ctx);
185-
}
186-
187-
TString TTargetTransfer::BuildStreamPath() const {
188-
return CanonizePath(GetSrcPath());
189-
}
190-
191-
TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
192-
: TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
193-
, TransformLambda(transformLambda)
194-
{
195-
}
196-
197-
const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
198-
return TransformLambda;
199-
}
200-
20136
}

ydb/core/tx/replication/controller/target_table.h

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ class TTargetTableBase: public TTargetWithStream {
1212
TString GetStreamPath() const override;
1313

1414
protected:
15-
IActor* CreateWorkerRegistar(const TActorContext& ctx) const override;
1615
virtual TString BuildStreamPath() const = 0;
1716
};
1817

@@ -50,32 +49,4 @@ class TTargetIndexTable: public TTargetTableBase {
5049
TString BuildStreamPath() const override;
5150
};
5251

53-
class TTargetTransfer: public TTargetTableBase {
54-
public:
55-
struct TTransferConfig : public TConfigBase {
56-
using TPtr = std::shared_ptr<TTransferConfig>;
57-
58-
TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);
59-
60-
const TString& GetTransformLambda() const;
61-
62-
private:
63-
TString TransformLambda;
64-
};
65-
66-
explicit TTargetTransfer(TReplication* replication,
67-
ui64 id, const IConfig::TPtr& config);
68-
69-
void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
70-
71-
void Progress(const TActorContext& ctx) override;
72-
void Shutdown(const TActorContext& ctx) override;
73-
74-
protected:
75-
TString BuildStreamPath() const override;
76-
77-
private:
78-
TActorId StreamConsumerRemover;
79-
};
80-
8152
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#include "stream_consumer_remover.h"
2+
#include "target_transfer.h"
3+
4+
#include <ydb/core/base/path.h>
5+
#include <ydb/core/tx/replication/service/service.h>
6+
#include <ydb/library/actors/core/events.h>
7+
8+
namespace NKikimr::NReplication::NController {
9+
10+
TTargetTransfer::TTargetTransfer(TReplication* replication, ui64 id, const IConfig::TPtr& config)
11+
: TTargetWithStream(replication, ETargetKind::Transfer, id, config)
12+
{
13+
}
14+
15+
void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& cfg) {
16+
auto& t = cfg.GetTransferSpecific().GetTargets(0);
17+
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
18+
GetConfig()->GetSrcPath(),
19+
GetConfig()->GetDstPath(),
20+
t.GetTransformLambda());
21+
}
22+
23+
void TTargetTransfer::Progress(const TActorContext& ctx) {
24+
auto replication = GetReplication();
25+
26+
switch (GetStreamState()) {
27+
case EStreamState::Removing:
28+
if (GetWorkers()) {
29+
RemoveWorkers(ctx);
30+
} else if (!StreamConsumerRemover) {
31+
StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx));
32+
}
33+
return;
34+
case EStreamState::Creating:
35+
case EStreamState::Ready:
36+
case EStreamState::Removed:
37+
case EStreamState::Error:
38+
break;
39+
}
40+
41+
TTargetWithStream::Progress(ctx);
42+
}
43+
44+
void TTargetTransfer::Shutdown(const TActorContext& ctx) {
45+
for (auto* x : TVector<TActorId*>{&StreamConsumerRemover}) {
46+
if (auto actorId = std::exchange(*x, {})) {
47+
ctx.Send(actorId, new TEvents::TEvPoison());
48+
}
49+
}
50+
51+
TTargetWithStream::Shutdown(ctx);
52+
}
53+
54+
TString TTargetTransfer::GetStreamPath() const {
55+
return CanonizePath(GetSrcPath());
56+
}
57+
58+
TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
59+
: TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
60+
, TransformLambda(transformLambda)
61+
{
62+
}
63+
64+
const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
65+
return TransformLambda;
66+
}
67+
68+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#pragma once
2+
3+
#include "target_with_stream.h"
4+
5+
namespace NKikimr::NReplication::NController {
6+
7+
class TTargetTransfer: public TTargetWithStream {
8+
public:
9+
struct TTransferConfig : public TConfigBase {
10+
using TPtr = std::shared_ptr<TTransferConfig>;
11+
12+
TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);
13+
14+
const TString& GetTransformLambda() const;
15+
16+
private:
17+
TString TransformLambda;
18+
};
19+
20+
explicit TTargetTransfer(TReplication* replication,
21+
ui64 id, const IConfig::TPtr& config);
22+
23+
void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override;
24+
25+
void Progress(const TActorContext& ctx) override;
26+
void Shutdown(const TActorContext& ctx) override;
27+
28+
TString GetStreamPath() const override;
29+
30+
private:
31+
TActorId StreamConsumerRemover;
32+
};
33+
34+
}

0 commit comments

Comments
 (0)