Skip to content

Commit b7e3633

Browse files
authored
Async replication: configurable default retention period (#8270)
1 parent 4581091 commit b7e3633

File tree

8 files changed

+35
-7
lines changed

8 files changed

+35
-7
lines changed

ydb/core/base/appdata.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <ydb/core/protos/key.pb.h>
2121
#include <ydb/core/protos/memory_controller_config.pb.h>
2222
#include <ydb/core/protos/pqconfig.pb.h>
23+
#include <ydb/core/protos/replication.pb.h>
2324
#include <ydb/core/protos/stream.pb.h>
2425
#include <ydb/core/protos/netclassifier.pb.h>
2526
#include <ydb/core/protos/datashard_config.pb.h>
@@ -63,6 +64,7 @@ struct TAppData::TImpl {
6364
NKikimrSharedCache::TSharedCacheConfig SharedCacheConfig;
6465
NKikimrConfig::TMetadataCacheConfig MetadataCacheConfig;
6566
NKikimrConfig::TMemoryControllerConfig MemoryControllerConfig;
67+
NKikimrReplication::TReplicationDefaults ReplicationConfig;
6668
};
6769

6870
TAppData::TAppData(
@@ -116,6 +118,7 @@ TAppData::TAppData(
116118
, SharedCacheConfig(Impl->SharedCacheConfig)
117119
, MetadataCacheConfig(Impl->MetadataCacheConfig)
118120
, MemoryControllerConfig(Impl->MemoryControllerConfig)
121+
, ReplicationConfig(Impl->ReplicationConfig)
119122
, KikimrShouldContinue(kikimrShouldContinue)
120123
{}
121124

ydb/core/base/appdata_fwd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ namespace NKikimrConfig {
6666
class TMemoryControllerConfig;
6767
}
6868

69+
namespace NKikimrReplication {
70+
class TReplicationDefaults;
71+
}
72+
6973
namespace NKikimrNetClassifier {
7074
class TNetClassifierDistributableConfig;
7175
class TNetClassifierConfig;
@@ -215,6 +219,7 @@ struct TAppData {
215219
NKikimrSharedCache::TSharedCacheConfig& SharedCacheConfig;
216220
NKikimrConfig::TMetadataCacheConfig& MetadataCacheConfig;
217221
NKikimrConfig::TMemoryControllerConfig& MemoryControllerConfig;
222+
NKikimrReplication::TReplicationDefaults& ReplicationConfig;
218223
bool EnforceUserTokenRequirement = false;
219224
bool EnforceUserTokenCheckRequirement = false; // check token if it was specified
220225
bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
11311131
AppData->MemoryControllerConfig.CopyFrom(runConfig.AppConfig.GetMemoryControllerConfig());
11321132
}
11331133

1134+
if (runConfig.AppConfig.HasReplicationConfig()) {
1135+
AppData->ReplicationConfig = runConfig.AppConfig.GetReplicationConfig();
1136+
}
1137+
11341138
// setup resource profiles
11351139
AppData->ResourceProfiles = new TResourceProfiles;
11361140
if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize())

ydb/core/protos/config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import "ydb/core/protos/memory_controller_config.proto";
2525
import "ydb/core/protos/netclassifier.proto";
2626
import "ydb/core/protos/node_broker.proto";
2727
import "ydb/core/protos/pqconfig.proto";
28+
import "ydb/core/protos/replication.proto";
2829
import "ydb/core/protos/resource_broker.proto";
2930
import "ydb/core/protos/shared_cache.proto";
3031
import "ydb/core/protos/stream.proto";
@@ -1946,6 +1947,7 @@ message TAppConfig {
19461947
optional TMetadataCacheConfig MetadataCacheConfig = 80;
19471948
optional TMemoryControllerConfig MemoryControllerConfig = 81;
19481949
optional TGroupedMemoryLimiterConfig GroupedMemoryLimiterConfig = 82;
1950+
optional NKikimrReplication.TReplicationDefaults ReplicationConfig = 83;
19491951

19501952
repeated TNamedConfig NamedConfigs = 100;
19511953
optional string ClusterYamlConfig = 101;

ydb/core/protos/console_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ message TConfigItem {
140140
BackgroundCleaningConfigItem = 77;
141141
MetadataCacheConfigItem = 80;
142142
MemoryControllerConfigItem = 81;
143+
ReplicationConfigItem = 83;
143144

144145
NamedConfigsItem = 100;
145146
ClusterYamlConfigItem = 101;

ydb/core/protos/replication.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import "ydb/public/api/protos/ydb_issue_message.proto";
55
package NKikimrReplication;
66
option java_package = "ru.yandex.kikimr.proto";
77

8+
message TReplicationDefaults {
9+
optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d
10+
}
11+
812
message TStaticCredentials {
913
optional string User = 1;
1014
optional string Password = 2 [(Ydb.sensitive) = true];

ydb/core/tx/replication/controller/stream_creator.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
#include "target_with_stream.h"
55
#include "util.h"
66

7+
#include <ydb/core/base/appdata.h>
78
#include <ydb/core/base/path.h>
9+
#include <ydb/core/protos/replication.pb.h>
810
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
911
#include <ydb/library/actors/core/actor_bootstrapped.h>
1012
#include <ydb/library/actors/core/hfunc.h>
@@ -16,9 +18,12 @@
1618
namespace NKikimr::NReplication::NController {
1719

1820
class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
19-
static NYdb::NTable::TChangefeedDescription MakeChangefeed(const TString& name, const NJson::TJsonMap& attrs) {
21+
static NYdb::NTable::TChangefeedDescription MakeChangefeed(
22+
const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs)
23+
{
2024
using namespace NYdb::NTable;
2125
return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json)
26+
.WithRetentionPeriod(retentionPeriod)
2227
.WithInitialScan()
2328
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
2429
}
@@ -133,14 +138,15 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
133138
TReplication::ETargetKind kind,
134139
const TString& srcPath,
135140
const TString& dstPath,
136-
const TString& streamName)
141+
const TString& streamName,
142+
const TDuration& streamRetentionPeriod)
137143
: Parent(parent)
138144
, YdbProxy(proxy)
139145
, ReplicationId(rid)
140146
, TargetId(tid)
141147
, Kind(kind)
142148
, SrcPath(srcPath)
143-
, Changefeed(MakeChangefeed(streamName, NJson::TJsonMap{
149+
, Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{
144150
{"path", dstPath},
145151
{"id", ToString(rid)},
146152
}))
@@ -175,13 +181,15 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct
175181
Y_ABORT_UNLESS(target);
176182
return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
177183
replication->GetId(), target->GetId(), target->GetKind(),
178-
target->GetSrcPath(), target->GetDstPath(), target->GetStreamName());
184+
target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(),
185+
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()));
179186
}
180187

181188
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
182-
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, const TString& streamName)
189+
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
190+
const TString& streamName, const TDuration& streamRetentionPeriod)
183191
{
184-
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName);
192+
return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod);
185193
}
186194

187195
}

ydb/core/tx/replication/controller/stream_creator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace NKikimr::NReplication::NController {
66

77
IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx);
88
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
9-
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, const TString& streamName);
9+
TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath,
10+
const TString& streamName, const TDuration& streamRetentionPeriod);
1011

1112
}

0 commit comments

Comments
 (0)