Skip to content

Commit 3e1ac5a

Browse files
authored
Merge pull request #18001 from nshestakov/TR-25-1
Added the transfer data from topics to tables
2 parents 171d4a9 + c680e73 commit 3e1ac5a

File tree

146 files changed

+5801
-2109
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

146 files changed

+5801
-2109
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,11 @@ class TLocalPartitionReader
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
139139
}
140140
SentOffset = gotOffset + 1;
141141

142-
Send(Worker, new TEvWorker::TEvData(ToString(Partition), std::move(records)));
142+
Send(Worker, new TEvWorker::TEvData(Partition, ToString(Partition), std::move(records)));
143143
}
144144

145145
void Leave(TEvWorker::TEvGone::EStatus status) {

ydb/core/base/appdata_fwd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ namespace NSchemeShard {
153153
class IOperationFactory;
154154
}
155155

156+
namespace NReplication::NService {
157+
class ITransferWriterFactory;
158+
}
159+
156160
class TFormatFactory;
157161

158162
namespace NYamlConfig {
@@ -188,6 +192,7 @@ struct TAppData {
188192

189193
const NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory* PersQueueGetReadSessionsInfoWorkerFactory = nullptr;
190194
const NPQ::IPersQueueMirrorReaderFactory* PersQueueMirrorReaderFactory = nullptr;
195+
std::shared_ptr<NReplication::NService::ITransferWriterFactory> TransferWriterFactory = nullptr;
191196
NYdb::TDriver* YdbDriver = nullptr;
192197
const NPDisk::IIoContextFactory* IoContextFactory = nullptr;
193198

ydb/core/driver_lib/run/factories.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h>
33
#include <ydb/core/kqp/common/kqp.h>
44
#include <ydb/core/tx/datashard/export_iface.h>
5+
#include <ydb/core/tx/replication/service/transfer_writer_factory.h>
56
#include <ydb/core/tx/schemeshard/schemeshard_operation_factory.h>
67
#include <ydb/core/persqueue/actor_persqueue_client_iface.h>
78
#include <ydb/core/protos/auth.pb.h>
@@ -53,6 +54,7 @@ struct TModuleFactories {
5354
TGrpcServiceFactory GrpcServiceFactory;
5455

5556
std::shared_ptr<NPQ::IPersQueueMirrorReaderFactory> PersQueueMirrorReaderFactory;
57+
std::shared_ptr<NReplication::NService::ITransferWriterFactory> TransferWriterFactory;
5658
/// Factory for pdisk's aio engines
5759
std::shared_ptr<NPDisk::IIoContextFactory> IoContextFactory;
5860

ydb/core/driver_lib/run/run.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
11261126
AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr;
11271127
AppData->SchemeOperationFactory = ModuleFactories ? ModuleFactories->SchemeOperationFactory.get() : nullptr;
11281128
AppData->ConfigSwissKnife = ModuleFactories ? ModuleFactories->ConfigSwissKnife.get() : nullptr;
1129+
if (ModuleFactories) {
1130+
AppData->TransferWriterFactory = ModuleFactories->TransferWriterFactory;
1131+
}
11291132

11301133
AppData->SqsAuthFactory = ModuleFactories
11311134
? ModuleFactories->SqsAuthFactory.get()

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
232232
case NKikimrReplication::TReplicationState::kDone:
233233
to.mutable_done();
234234
break;
235+
case NKikimrReplication::TReplicationState::kPaused:
236+
to.mutable_paused();
237+
break;
235238
default:
236239
break;
237240
}
@@ -246,4 +249,11 @@ void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProv
246249
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
247250
}
248251

252+
using TEvDescribeReplicationRequest = TGrpcRequestOperationCall<Ydb::Replication::DescribeReplicationRequest, Ydb::Replication::DescribeReplicationResponse>;
253+
254+
template<>
255+
IActor* TEvDescribeReplicationRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
256+
return new TDescribeReplicationRPC(msg);
257+
}
258+
249259
}

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2154,6 +2154,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
21542154
TTopicTabletTxs topicTxs;
21552155
TDatashardTxs datashardTxs;
21562156
TEvWriteTxs evWriteTxs;
2157+
21572158
if (!TxManager) {
21582159
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
21592160
}

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2469,6 +2469,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
24692469
auto& state = *op.MutableState();
24702470
state.MutableDone()->SetFailoverMode(
24712471
static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
2472+
} else if (const auto& paused = settings.Settings.StatePaused) {
2473+
auto& state = *op.MutableState();
2474+
state.MutablePaused();
2475+
} else if (const auto& standBy = settings.Settings.StateStandBy) {
2476+
auto& state = *op.MutableState();
2477+
state.MutableStandBy();
24722478
}
24732479

24742480
if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database ||
@@ -2601,12 +2607,21 @@ class TKqpGatewayProxy : public IKikimrGateway {
26012607
staticCreds->Serialize(*params.MutableStaticCredentials());
26022608
}
26032609

2604-
auto& targets = *config.MutableTransferSpecific();
2605-
for (const auto& [src, dst, lambda] : settings.Targets) {
2606-
auto& target = *targets.AddTargets();
2610+
{
2611+
const auto& [src, dst, lambda] = settings.Target;
2612+
auto& target = *config.MutableTransferSpecific()->MutableTarget();
26072613
target.SetSrcPath(AdjustPath(src, params.GetDatabase()));
26082614
target.SetDstPath(AdjustPath(dst, GetDatabase()));
26092615
target.SetTransformLambda(lambda);
2616+
if (settings.Settings.Batching && settings.Settings.Batching->BatchSizeBytes) {
2617+
config.MutableTransferSpecific()->MutableBatching()->SetBatchSizeBytes(settings.Settings.Batching->BatchSizeBytes.value());
2618+
}
2619+
if (settings.Settings.Batching && settings.Settings.Batching->FlushInterval) {
2620+
config.MutableTransferSpecific()->MutableBatching()->SetFlushIntervalMilliSeconds(settings.Settings.Batching->FlushInterval.MilliSeconds());
2621+
}
2622+
if (settings.Settings.ConsumerName) {
2623+
target.SetConsumerName(*settings.Settings.ConsumerName);
2624+
}
26102625
}
26112626

26122627
if (IsPrepare()) {
@@ -2650,13 +2665,27 @@ class TKqpGatewayProxy : public IKikimrGateway {
26502665
auto& op = *tx.MutableAlterReplication();
26512666
op.SetName(pathPair.second);
26522667
if (!settings.TranformLambda.empty()) {
2653-
op.SetTransferTransformLambda(settings.TranformLambda);
2668+
op.MutableAlterTransfer()->SetTransformLambda(settings.TranformLambda);
2669+
}
2670+
if (auto& batching = settings.Settings.Batching) {
2671+
if (batching->FlushInterval) {
2672+
op.MutableAlterTransfer()->SetFlushIntervalMilliSeconds(batching->FlushInterval.MilliSeconds());
2673+
}
2674+
if (batching->BatchSizeBytes) {
2675+
op.MutableAlterTransfer()->SetBatchSizeBytes(batching->BatchSizeBytes.value());
2676+
}
26542677
}
26552678

26562679
if (const auto& done = settings.Settings.StateDone) {
26572680
auto& state = *op.MutableState();
26582681
state.MutableDone()->SetFailoverMode(
26592682
static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode));
2683+
} else if (const auto& paused = settings.Settings.StatePaused) {
2684+
auto& state = *op.MutableState();
2685+
state.MutablePaused();
2686+
} else if (const auto& standBy = settings.Settings.StateStandBy) {
2687+
auto& state = *op.MutableState();
2688+
state.MutableStandBy();
26602689
}
26612690

26622691
if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database ||

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,10 @@ namespace {
708708
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
709709
if (to_lower(value) == "done") {
710710
dstSettings.EnsureStateDone();
711+
} else if (to_lower(value) == "paused") {
712+
dstSettings.StatePaused = true;
713+
} else if (to_lower(value) == "standby") {
714+
dstSettings.StateStandBy = true;
711715
} else {
712716
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
713717
TStringBuilder() << "Unknown " << objectName << " state: " << value));
@@ -802,7 +806,52 @@ namespace {
802806
bool ParseTransferSettings(
803807
TTransferSettings& dstSettings, const TCoNameValueTupleList& srcSettings, TExprContext& ctx, TPositionHandle pos
804808
) {
805-
return ParseAsyncReplicationSettingsBase(dstSettings, srcSettings, ctx, pos, "transfer");
809+
if (!ParseAsyncReplicationSettingsBase(dstSettings, srcSettings, ctx, pos, "transfer")) {
810+
return false;
811+
}
812+
813+
for (auto setting : srcSettings) {
814+
auto name = setting.Name().Value();
815+
if (name == "batch_size_bytes") {
816+
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
817+
auto batchSizeBytes = FromString<i64>(value);
818+
if (batchSizeBytes <= 0) {
819+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
820+
TStringBuilder() << name << " must be greater than 0 but " << value));
821+
return false;
822+
}
823+
dstSettings.EnsureBatching().BatchSizeBytes = batchSizeBytes;
824+
} else if (name == "flush_interval") {
825+
if (!setting.Value().Maybe<TCoInterval>()) {
826+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
827+
TStringBuilder() << name << " must be Interval"));
828+
return false;
829+
}
830+
831+
const auto value = FromString<i64>(
832+
setting.Value().Cast<TCoInterval>().Literal().Value()
833+
);
834+
835+
if (value <= 0) {
836+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
837+
TStringBuilder() << name << " must be positive"));
838+
return false;
839+
}
840+
841+
dstSettings.EnsureBatching().FlushInterval = TDuration::FromValue(value);
842+
} else if (name == "consumer") {
843+
auto value = ToString(setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value());
844+
if (value.empty()) {
845+
ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
846+
TStringBuilder() << name << " must be not empty"));
847+
return false;
848+
}
849+
850+
dstSettings.ConsumerName = value;
851+
}
852+
}
853+
854+
return true;
806855
}
807856

808857
bool ParseBackupCollectionSettings(
@@ -2359,11 +2408,11 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
23592408
TCreateTransferSettings settings;
23602409
settings.Name = TString(createTransfer.Transfer());
23612410

2362-
settings.Targets.emplace_back(
2411+
settings.Target = std::tuple<TString, TString, TString>{
23632412
createTransfer.Source(),
23642413
createTransfer.Target(),
23652414
createTransfer.TransformLambda()
2366-
);
2415+
};
23672416

23682417
if (!ParseTransferSettings(settings.Settings, createTransfer.TransferSettings(), ctx, createTransfer.Pos())) {
23692418
return SyncError();

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,8 @@ struct TReplicationSettingsBase {
835835
TMaybe<TOAuthToken> OAuthToken;
836836
TMaybe<TStaticCredentials> StaticCredentials;
837837
TMaybe<TStateDone> StateDone;
838+
bool StatePaused = false;
839+
bool StateStandBy = false;
838840

839841
using EFailoverMode = TStateDone::EFailoverMode;
840842
TStateDone& EnsureStateDone(EFailoverMode mode = EFailoverMode::Consistent) {
@@ -911,11 +913,27 @@ struct TDropReplicationSettings {
911913
};
912914

913915
struct TTransferSettings : public TReplicationSettingsBase {
916+
917+
struct TBatching {
918+
TDuration FlushInterval;
919+
std::optional<ui64> BatchSizeBytes;
920+
};
921+
922+
TMaybe<TString> ConsumerName;
923+
TMaybe<TBatching> Batching;
924+
925+
TBatching& EnsureBatching() {
926+
if (!Batching) {
927+
Batching = TBatching();
928+
}
929+
930+
return *Batching;
931+
}
914932
};
915933

916934
struct TCreateTransferSettings {
917935
TString Name;
918-
TVector<std::tuple<TString, TString, TString>> Targets;
936+
std::tuple<TString, TString, TString> Target;
919937
TTransferSettings Settings;
920938
};
921939

ydb/core/kqp/provider/yql_kikimr_type_ann.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,9 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
18331833
"password",
18341834
"password_secret_name",
18351835
"commit_interval",
1836+
"flush_interval",
1837+
"batch_size_bytes",
1838+
"consumer",
18361839
};
18371840

18381841
if (!CheckReplicationSettings(node.TransferSettings(), supportedSettings, ctx)) {
@@ -1860,6 +1863,8 @@ virtual TStatus HandleCreateTable(TKiCreateTable create, TExprContext& ctx) over
18601863
"password_secret_name",
18611864
"state",
18621865
"failover_mode",
1866+
"flush_interval",
1867+
"batch_size_bytes",
18631868
};
18641869

18651870
if (!CheckReplicationSettings(node.TransferSettings(), supportedSettings, ctx)) {

0 commit comments

Comments
 (0)