Skip to content

Commit a170038

Browse files
authored
Schema changes for transfer data from the topic to the table (#12394)
1 parent 2204da3 commit a170038

31 files changed

+684
-19
lines changed

ydb/core/driver_lib/cli_base/cli_cmds_db.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ class TClientCommandSchemaDescribe : public TClientCommand {
273273
case NKikimrSchemeOp::EPathTypeReplication:
274274
type = "<replication>";
275275
break;
276+
case NKikimrSchemeOp::EPathTypeTransfer:
277+
type = "<transfer>";
278+
break;
276279
case NKikimrSchemeOp::EPathTypePersQueueGroup:
277280
type = "<pq group>";
278281
break;

ydb/core/protos/counters_schemeshard.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,13 @@ enum ESimpleCounters {
229229
COUNTER_IN_FLIGHT_OPS_TxDropBackupCollection = 181 [(CounterOpts) = {Name: "InFlightOps/DropBackupCollection"}];
230230

231231
COUNTER_IN_FLIGHT_OPS_TxMoveSequence = 182 [(CounterOpts) = {Name: "InFlightOps/MoveSequence"}];
232+
233+
COUNTER_IN_FLIGHT_OPS_TxCreateTransfer = 183 [(CounterOpts) = {Name: "InFlightOps/CreateTransfer"}];
234+
COUNTER_IN_FLIGHT_OPS_TxAlterTransfer = 184 [(CounterOpts) = {Name: "InFlightOps/AlterTransfer"}];
235+
COUNTER_IN_FLIGHT_OPS_TxDropTransfer = 185 [(CounterOpts) = {Name: "InFlightOps/DropTransfer"}];
236+
COUNTER_IN_FLIGHT_OPS_TxDropTransferCascade = 186 [(CounterOpts) = {Name: "InFlightOps/DropTransferCascade"}];
237+
238+
COUNTER_TRANSFER_COUNT = 187 [(CounterOpts) = {Name: "Transfers"}];
232239
}
233240

234241
enum ECumulativeCounters {
@@ -368,6 +375,11 @@ enum ECumulativeCounters {
368375
COUNTER_FINISHED_OPS_TxDropBackupCollection = 109 [(CounterOpts) = {Name: "FinishedOps/DropBackupCollection"}];
369376

370377
COUNTER_FINISHED_OPS_TxMoveSequence = 110 [(CounterOpts) = {Name: "FinishedOps/TxMoveSequence"}];
378+
379+
COUNTER_FINISHED_OPS_TxCreateTransfer = 111 [(CounterOpts) = {Name: "FinishedOps/CreateTransfer"}];
380+
COUNTER_FINISHED_OPS_TxAlterTransfer = 112 [(CounterOpts) = {Name: "FinishedOps/AlterTransfer"}];
381+
COUNTER_FINISHED_OPS_TxDropTransfer = 113 [(CounterOpts) = {Name: "FinishedOps/DropTransfer"}];
382+
COUNTER_FINISHED_OPS_TxDropTransferCascade = 114 [(CounterOpts) = {Name: "FinishedOps/DropTransferCascade"}];
371383
}
372384

373385
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,6 +1763,7 @@ enum EPathType {
17631763
EPathTypeView = 20;
17641764
EPathTypeResourcePool = 21;
17651765
EPathTypeBackupCollection = 22;
1766+
EPathTypeTransfer = 23;
17661767
}
17671768

17681769
enum EPathSubType {

ydb/core/protos/replication.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,26 @@ message TReplicationConfig {
5858
repeated TTarget Targets = 1;
5959
}
6060

61+
message TTransferSpecific {
62+
message TTarget {
63+
// in/out
64+
optional string SrcPath = 1;
65+
66+
oneof Dst {
67+
string DstPath = 2;
68+
string DstPathLambda = 3;
69+
}
70+
71+
// out
72+
optional uint64 Id = 4;
73+
optional uint32 LagMilliSeconds = 5;
74+
// transfer
75+
optional string TransformLambda = 6;
76+
}
77+
78+
repeated TTarget Targets = 1;
79+
}
80+
6181
optional TConnectionParams SrcConnectionParams = 1;
6282

6383
// targets to be replicated
@@ -67,6 +87,8 @@ message TReplicationConfig {
6787
TTargetEverything Everything = 4;
6888
// replicate specified objects
6989
TTargetSpecific Specific = 5;
90+
// transfer specified objects
91+
TTransferSpecific TransferSpecific = 9;
7092
}
7193

7294
reserved 6; // InitialSync

ydb/core/protos/schemeshard/operations.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,5 +168,12 @@ enum EOperationType {
168168
// Move sequence
169169
ESchemeOpMoveSequence = 108;
170170

171+
// Transfer
172+
ESchemeOpCreateTransfer = 112;
173+
ESchemeOpAlterTransfer = 113;
174+
ESchemeOpDropTransfer = 114;
175+
ESchemeOpDropTransferCascade = 115;
176+
177+
171178
// Some entries are grouped by semantics, so are out of order
172179
}

ydb/core/tx/scheme_board/cache.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,6 +1592,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
15921592
Kind = TNavigate::KindReplication;
15931593
FillInfo(Kind, ReplicationInfo, std::move(*pathDesc.MutableReplicationDescription()));
15941594
break;
1595+
case NKikimrSchemeOp::EPathTypeTransfer:
1596+
Kind = TNavigate::KindTransfer;
1597+
FillInfo(Kind, ReplicationInfo, std::move(*pathDesc.MutableReplicationDescription()));
1598+
break;
15951599
case NKikimrSchemeOp::EPathTypeBlobDepot:
15961600
Kind = TNavigate::KindBlobDepot;
15971601
FillInfo(Kind, BlobDepotInfo, std::move(*pathDesc.MutableBlobDepotDescription()));
@@ -1676,6 +1680,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
16761680
case NKikimrSchemeOp::EPathTypeReplication:
16771681
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindReplication);
16781682
break;
1683+
case NKikimrSchemeOp::EPathTypeTransfer:
1684+
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindTransfer);
1685+
break;
16791686
case NKikimrSchemeOp::EPathTypeBlobDepot:
16801687
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindBlobDepot);
16811688
break;

ydb/core/tx/scheme_cache/scheme_cache.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ struct TSchemeCacheNavigate {
160160
KindView = 21,
161161
KindResourcePool = 22,
162162
KindBackupCollection = 23,
163+
KindTransfer = 24,
163164
};
164165

165166
struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> {

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,16 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
11911191
case TTxState::ETxType::TxDropReplicationCascade:
11921192
return CreateDropReplication(NextPartId(), txState, true);
11931193

1194+
// Transfer
1195+
case TTxState::ETxType::TxCreateTransfer:
1196+
return CreateNewTransfer(NextPartId(), txState);
1197+
case TTxState::ETxType::TxAlterTransfer:
1198+
return CreateAlterTransfer(NextPartId(), txState);
1199+
case TTxState::ETxType::TxDropTransfer:
1200+
return CreateDropTransfer(NextPartId(), txState, false);
1201+
case TTxState::ETxType::TxDropTransferCascade:
1202+
return CreateDropTransfer(NextPartId(), txState, true);
1203+
11941204
// BlobDepot
11951205
case TTxState::ETxType::TxCreateBlobDepot:
11961206
return CreateNewBlobDepot(NextPartId(), txState);
@@ -1450,6 +1460,16 @@ TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx
14501460
case NKikimrSchemeOp::EOperationType::ESchemeOpDropReplicationCascade:
14511461
return {CreateDropReplication(NextPartId(), tx, true)};
14521462

1463+
// Transfer
1464+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTransfer:
1465+
return {CreateNewTransfer(NextPartId(), tx)};
1466+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTransfer:
1467+
return {CreateAlterTransfer(NextPartId(), tx)};
1468+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropTransfer:
1469+
return {CreateDropTransfer(NextPartId(), tx, false)};
1470+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropTransferCascade:
1471+
return {CreateDropTransfer(NextPartId(), tx, true)};
1472+
14531473
// BlobDepot
14541474
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlobDepot:
14551475
return {CreateNewBlobDepot(NextPartId(), tx)};

ydb/core/tx/schemeshard/schemeshard__operation_alter_replication.cpp

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,25 @@ namespace NKikimr::NSchemeShard {
1414

1515
namespace {
1616

17+
struct IStrategy {
18+
virtual void Check(const TPath::TChecker& checks) const = 0;
19+
};
20+
21+
struct TReplicationStrategy : public IStrategy {
22+
void Check(const TPath::TChecker& checks) const override {
23+
checks.IsReplication();
24+
};
25+
};
26+
27+
struct TTransferStrategy : public IStrategy {
28+
void Check(const TPath::TChecker& checks) const override {
29+
checks.IsTransfer();
30+
};
31+
};
32+
33+
static constexpr TReplicationStrategy ReplicationStrategy;
34+
static constexpr TTransferStrategy TransferStrategy;
35+
1736
class TConfigureParts: public TSubOperationState {
1837
TString DebugHint() const override {
1938
return TStringBuilder()
@@ -278,6 +297,18 @@ class TAlterReplication: public TSubOperation {
278297
public:
279298
using TSubOperation::TSubOperation;
280299

300+
explicit TAlterReplication(TOperationId id, const TTxTransaction& tx, const IStrategy* strategy)
301+
: TSubOperation(id, tx)
302+
, Strategy(strategy)
303+
{
304+
}
305+
306+
explicit TAlterReplication(TOperationId id, TTxState::ETxState state, const IStrategy* strategy)
307+
: TSubOperation(id, state)
308+
, Strategy(strategy)
309+
{
310+
}
311+
281312
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
282313
const auto& workingDir = Transaction.GetWorkingDir();
283314
const auto& op = Transaction.GetAlterReplication();
@@ -309,8 +340,8 @@ class TAlterReplication: public TSubOperation {
309340
.IsAtLocalSchemeShard()
310341
.IsResolved()
311342
.NotDeleted()
312-
.IsReplication()
313343
.NotUnderOperation();
344+
Strategy->Check(checks);
314345

315346
if (!checks) {
316347
result->SetError(checks.GetStatus(), checks.GetError());
@@ -431,16 +462,27 @@ class TAlterReplication: public TSubOperation {
431462
context.OnComplete.DoneOperation(OperationId);
432463
}
433464

465+
private:
466+
const IStrategy* Strategy;
467+
434468
}; // TAlterReplication
435469

436470
} // anonymous
437471

438472
ISubOperation::TPtr CreateAlterReplication(TOperationId id, const TTxTransaction& tx) {
439-
return MakeSubOperation<TAlterReplication>(id, tx);
473+
return MakeSubOperation<TAlterReplication>(id, tx, &ReplicationStrategy);
440474
}
441475

442476
ISubOperation::TPtr CreateAlterReplication(TOperationId id, TTxState::ETxState state) {
443-
return MakeSubOperation<TAlterReplication>(id, state);
477+
return MakeSubOperation<TAlterReplication>(id, state, &ReplicationStrategy);
478+
}
479+
480+
ISubOperation::TPtr CreateAlterTransfer(TOperationId id, const TTxTransaction& tx) {
481+
return MakeSubOperation<TAlterReplication>(id, tx, &TransferStrategy);
482+
}
483+
484+
ISubOperation::TPtr CreateAlterTransfer(TOperationId id, TTxState::ETxState state) {
485+
return MakeSubOperation<TAlterReplication>(id, state, &TransferStrategy);
444486
}
445487

446488
}

ydb/core/tx/schemeshard/schemeshard__operation_create_replication.cpp

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,52 @@ namespace NKikimr::NSchemeShard {
1414

1515
namespace {
1616

17+
struct IStrategy {
18+
virtual TPathElement::EPathType GetPathType() const = 0;
19+
virtual bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const = 0;
20+
};
21+
22+
struct TReplicationStrategy : public IStrategy {
23+
TPathElement::EPathType GetPathType() const override {
24+
return TPathElement::EPathType::EPathTypeReplication;
25+
};
26+
27+
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const override {
28+
if (desc.GetConfig().HasTransferSpecific()) {
29+
result.SetError(NKikimrScheme::StatusInvalidParameter, "Wrong replication configuration");
30+
return true;
31+
}
32+
if (desc.HasState()) {
33+
result.SetError(NKikimrScheme::StatusInvalidParameter, "Cannot create replication with explicit state");
34+
return true;
35+
}
36+
37+
return false;
38+
}
39+
};
40+
41+
struct TTransferStrategy : public IStrategy {
42+
TPathElement::EPathType GetPathType() const override {
43+
return TPathElement::EPathType::EPathTypeTransfer;
44+
};
45+
46+
bool Validate(TProposeResponse& result, const NKikimrSchemeOp::TReplicationDescription& desc) const override {
47+
if (!desc.GetConfig().HasTransferSpecific()) {
48+
result.SetError(NKikimrScheme::StatusInvalidParameter, "Wrong transfer configuration");
49+
return true;
50+
}
51+
if (desc.HasState()) {
52+
result.SetError(NKikimrScheme::StatusInvalidParameter, "Cannot create transfer with explicit state");
53+
return true;
54+
}
55+
56+
return false;
57+
}
58+
};
59+
60+
static constexpr TReplicationStrategy ReplicationStrategy;
61+
static constexpr TTransferStrategy TransferStrategy;
62+
1763
class TConfigureParts: public TSubOperationState {
1864
TString DebugHint() const override {
1965
return TStringBuilder()
@@ -235,6 +281,18 @@ class TCreateReplication: public TSubOperation {
235281
public:
236282
using TSubOperation::TSubOperation;
237283

284+
explicit TCreateReplication(const TOperationId& id, TTxState::ETxState state, const IStrategy* strategy)
285+
: TSubOperation(id, state)
286+
, Strategy(strategy)
287+
{
288+
}
289+
290+
explicit TCreateReplication(const TOperationId& id, const TTxTransaction& tx, const IStrategy* strategy)
291+
: TSubOperation(id, tx)
292+
, Strategy(strategy)
293+
{
294+
}
295+
238296
THolder<TProposeResponse> Propose(const TString& owner, TOperationContext& context) override {
239297
const auto& workingDir = Transaction.GetWorkingDir();
240298
auto desc = Transaction.GetReplication();
@@ -268,6 +326,10 @@ class TCreateReplication: public TSubOperation {
268326
}
269327
}
270328

329+
if (Strategy->Validate(*result, desc)) {
330+
return result;
331+
}
332+
271333
auto path = parentPath.Child(name);
272334
{
273335
const auto checks = path.Check();
@@ -306,6 +368,10 @@ class TCreateReplication: public TSubOperation {
306368
}
307369
}
308370

371+
if (Strategy->Validate(*result.Get(), desc)) {
372+
return result;
373+
}
374+
309375
TString errStr;
310376
if (!context.SS->CheckApplyIf(Transaction, errStr)) {
311377
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
@@ -319,16 +385,11 @@ class TCreateReplication: public TSubOperation {
319385
return result;
320386
}
321387

322-
if (desc.HasState()) {
323-
result->SetError(NKikimrScheme::StatusInvalidParameter, "Cannot create replication with explicit state");
324-
return result;
325-
}
326-
327388
path.MaterializeLeaf(owner);
328389
path->CreateTxId = OperationId.GetTxId();
329390
path->LastTxId = OperationId.GetTxId();
330391
path->PathState = TPathElement::EPathState::EPathStateCreate;
331-
path->PathType = TPathElement::EPathType::EPathTypeReplication;
392+
path->PathType = Strategy->GetPathType();
332393
result->SetPathId(path->PathId.LocalPathId);
333394

334395
context.SS->IncrementPathDbRefCount(path->PathId);
@@ -419,16 +480,27 @@ class TCreateReplication: public TSubOperation {
419480
context.OnComplete.DoneOperation(OperationId);
420481
}
421482

483+
private:
484+
const IStrategy* Strategy;
485+
422486
}; // TCreateReplication
423487

424488
} // anonymous
425489

426490
ISubOperation::TPtr CreateNewReplication(TOperationId id, const TTxTransaction& tx) {
427-
return MakeSubOperation<TCreateReplication>(id, tx);
491+
return MakeSubOperation<TCreateReplication>(id, tx, &ReplicationStrategy);
428492
}
429493

430494
ISubOperation::TPtr CreateNewReplication(TOperationId id, TTxState::ETxState state) {
431-
return MakeSubOperation<TCreateReplication>(id, state);
495+
return MakeSubOperation<TCreateReplication>(id, state, &ReplicationStrategy);
496+
}
497+
498+
ISubOperation::TPtr CreateNewTransfer(TOperationId id, const TTxTransaction& tx) {
499+
return MakeSubOperation<TCreateReplication>(id, tx, &TransferStrategy);
500+
}
501+
502+
ISubOperation::TPtr CreateNewTransfer(TOperationId id, TTxState::ETxState state) {
503+
return MakeSubOperation<TCreateReplication>(id, state, &TransferStrategy);
432504
}
433505

434506
}

0 commit comments

Comments
 (0)