Skip to content

Commit c680e73

Browse files
committed
added the check of the transfer write permission (#17938)
1 parent e5594f7 commit c680e73

File tree

12 files changed

+91
-17
lines changed

12 files changed

+91
-17
lines changed

ydb/core/protos/replication.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ message TReplicationConfig {
9696
}
9797
optional TTarget Target = 1;
9898
optional TBatchingSettings Batching = 2;
99+
optional string RunAsUser = 3;
99100
}
100101

101102
optional TConnectionParams SrcConnectionParams = 1;
@@ -258,6 +259,7 @@ message TTransferWriterSettings {
258259
optional NKikimrProto.TPathID PathId = 1;
259260
optional string TransformLambda = 2;
260261
optional TBatchingSettings Batching = 3;
262+
optional string RunAsUser = 4;
261263
}
262264

263265
message TRunWorkerCommand {

ydb/core/tx/replication/controller/event_util.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
6060
dstPathId.ToProto(writerSettings.MutablePathId());
6161
writerSettings.SetTransformLambda(p->GetTransformLambda());
6262
writerSettings.MutableBatching()->CopyFrom(batchingSettings);
63+
writerSettings.SetRunAsUser(p->GetRunAsUser());
6364
break;
6465
}
6566
}

ydb/core/tx/replication/controller/schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ struct TControllerSchema: NIceDb::Schema {
4848
};
4949
struct Issue: Column<9, NScheme::NTypeIds::Utf8> {};
5050
struct TransformLambda: Column<10, NScheme::NTypeIds::Utf8> {};
51+
struct RunAsUser: Column<11, NScheme::NTypeIds::Utf8> {};
5152

5253
using TKey = TableKey<ReplicationId, Id>;
53-
using TColumns = TableColumns<ReplicationId, Id, Kind, SrcPath, DstPath, DstState, DstPathOwnerId, DstPathLocalId, Issue, TransformLambda>;
54+
using TColumns = TableColumns<ReplicationId, Id, Kind, SrcPath, DstPath, DstState, DstPathOwnerId, DstPathLocalId, Issue, TransformLambda, RunAsUser>;
5455
};
5556

5657
struct SrcStreams: Table<4> {

ydb/core/tx/replication/controller/target_discoverer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ class TTargetDiscoverer: public TActorBootstrapped<TTargetDiscoverer> {
183183
const auto& targetConf = Config.GetTransferSpecific().GetTarget();
184184

185185
const auto& target = ToAdd.emplace_back(TReplication::ETargetKind::Transfer,
186-
std::make_shared<TTargetTransfer::TTransferConfig>(path.first, path.second, targetConf.GetTransformLambda()));
186+
std::make_shared<TTargetTransfer::TTransferConfig>(path.first, path.second, targetConf.GetTransformLambda(),
187+
Config.GetTransferSpecific().GetRunAsUser()));
187188
LOG_I("Add target"
188189
<< ": srcPath# " << target.Config->GetSrcPath()
189190
<< ", dstPath# " << target.Config->GetDstPath()

ydb/core/tx/replication/controller/target_transfer.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig&
1717
Config = std::make_shared<TTargetTransfer::TTransferConfig>(
1818
GetConfig()->GetSrcPath(),
1919
GetConfig()->GetDstPath(),
20-
t.GetTransformLambda());
20+
t.GetTransformLambda(),
21+
cfg.GetTransferSpecific().GetRunAsUser());
2122
}
2223

2324
void TTargetTransfer::Progress(const TActorContext& ctx) {
@@ -55,14 +56,19 @@ TString TTargetTransfer::GetStreamPath() const {
5556
return CanonizePath(GetSrcPath());
5657
}
5758

58-
TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda)
59+
TTargetTransfer::TTransferConfig::TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda, const TString& runAsUser)
5960
: TConfigBase(ETargetKind::Transfer, srcPath, dstPath)
6061
, TransformLambda(transformLambda)
62+
, RunAsUser(runAsUser)
6163
{
6264
}
6365

6466
const TString& TTargetTransfer::TTransferConfig::GetTransformLambda() const {
6567
return TransformLambda;
6668
}
6769

70+
const TString& TTargetTransfer::TTransferConfig::GetRunAsUser() const {
71+
return RunAsUser;
72+
}
73+
6874
}

ydb/core/tx/replication/controller/target_transfer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ class TTargetTransfer: public TTargetWithStream {
99
struct TTransferConfig : public TConfigBase {
1010
using TPtr = std::shared_ptr<TTransferConfig>;
1111

12-
TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda);
12+
TTransferConfig(const TString& srcPath, const TString& dstPath, const TString& transformLambda, const TString& runAsUser);
1313

1414
const TString& GetTransformLambda() const;
15+
const TString& GetRunAsUser() const;
1516

1617
private:
1718
TString TransformLambda;
19+
TString RunAsUser;
1820
};
1921

2022
explicit TTargetTransfer(TReplication* replication,

ydb/core/tx/replication/controller/tx_discovery_targets_result.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,18 @@ class TController::TTxDiscoveryTargetsResult: public TTxBase {
4646
const auto tid = Replication->AddTarget(target.Kind, target.Config);
4747

4848
TString transformLambda;
49+
TString runAsUser;
4950
if (auto p = std::dynamic_pointer_cast<const TTargetTransfer::TTransferConfig>(target.Config)) {
5051
transformLambda = p->GetTransformLambda();
52+
runAsUser = p->GetRunAsUser();
5153
}
5254

5355
db.Table<Schema::Targets>().Key(rid, tid).Update(
5456
NIceDb::TUpdate<Schema::Targets::Kind>(target.Kind),
5557
NIceDb::TUpdate<Schema::Targets::SrcPath>(target.Config->GetSrcPath()),
5658
NIceDb::TUpdate<Schema::Targets::DstPath>(target.Config->GetDstPath()),
57-
NIceDb::TUpdate<Schema::Targets::TransformLambda>(transformLambda)
59+
NIceDb::TUpdate<Schema::Targets::TransformLambda>(transformLambda),
60+
NIceDb::TUpdate<Schema::Targets::RunAsUser>(runAsUser)
5861
);
5962

6063
CLOG_N(ctx, "Add target"

ydb/core/tx/replication/controller/tx_init.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ class TController::TTxInit: public TTxBase {
8888
rowset.GetValue<Schema::Targets::DstPathLocalId>()
8989
);
9090
const auto transformLambda = rowset.GetValue<Schema::Targets::TransformLambda>();
91+
const auto runAsUser = rowset.GetValue<Schema::Targets::RunAsUser>();
9192

9293
auto replication = Self->Find(rid);
9394
Y_VERIFY_S(replication, "Unknown replication: " << rid);
@@ -103,7 +104,7 @@ class TController::TTxInit: public TTxBase {
103104
break;
104105

105106
case TReplication::ETargetKind::Transfer:
106-
config = std::make_shared<TTargetTransfer::TTransferConfig>(srcPath, dstPath, transformLambda);
107+
config = std::make_shared<TTargetTransfer::TTransferConfig>(srcPath, dstPath, transformLambda, runAsUser);
107108
break;
108109
}
109110

ydb/core/tx/replication/service/service.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,10 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
432432
transformLambda = writerSettings.GetTransformLambda(),
433433
compilationService = *CompilationService,
434434
batchingSettings = writerSettings.GetBatching(),
435-
transferWriterFactory = transferWriterFactory
435+
transferWriterFactory = transferWriterFactory,
436+
runAsUser = writerSettings.GetRunAsUser()
436437
]() {
437-
return transferWriterFactory->Create({transformLambda, tablePathId, compilationService, batchingSettings});
438+
return transferWriterFactory->Create({transformLambda, tablePathId, compilationService, batchingSettings, runAsUser});
438439
};
439440
}
440441

ydb/core/tx/replication/service/transfer_writer_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class ITransferWriterFactory {
2121
const TPathId& TablePathId;
2222
const TActorId& CompileServiceId;
2323
const NKikimrReplication::TBatchingSettings& BatchingSettings;
24+
const TString& RunAsUser;
2425
};
2526

2627
virtual IActor* Create(const Parameters& parameters) const = 0;

0 commit comments

Comments
 (0)