Skip to content

Commit 56d008b

Browse files
authored
Move transfer to internal repository (#16671)
1 parent 1788c9e commit 56d008b

File tree

20 files changed

+113
-2118
lines changed

20 files changed

+113
-2118
lines changed

ydb/core/base/appdata_fwd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ namespace NSchemeShard {
153153
class IOperationFactory;
154154
}
155155

156+
namespace NReplication::NService {
157+
class ITransferWriterFactory;
158+
}
159+
156160
class TFormatFactory;
157161

158162
namespace NYamlConfig {
@@ -188,6 +192,7 @@ struct TAppData {
188192

189193
const NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory* PersQueueGetReadSessionsInfoWorkerFactory = nullptr;
190194
const NPQ::IPersQueueMirrorReaderFactory* PersQueueMirrorReaderFactory = nullptr;
195+
std::shared_ptr<NReplication::NService::ITransferWriterFactory> TransferWriterFactory = nullptr;
191196
NYdb::TDriver* YdbDriver = nullptr;
192197
const NPDisk::IIoContextFactory* IoContextFactory = nullptr;
193198

ydb/core/driver_lib/run/factories.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h>
33
#include <ydb/core/kqp/common/kqp.h>
44
#include <ydb/core/tx/datashard/export_iface.h>
5+
#include <ydb/core/tx/replication/service/transfer_writer_factory.h>
56
#include <ydb/core/tx/schemeshard/schemeshard_operation_factory.h>
67
#include <ydb/core/persqueue/actor_persqueue_client_iface.h>
78
#include <ydb/core/protos/auth.pb.h>
@@ -53,6 +54,7 @@ struct TModuleFactories {
5354
TGrpcServiceFactory GrpcServiceFactory;
5455

5556
std::shared_ptr<NPQ::IPersQueueMirrorReaderFactory> PersQueueMirrorReaderFactory;
57+
std::shared_ptr<NReplication::NService::ITransferWriterFactory> TransferWriterFactory;
5658
/// Factory for pdisk's aio engines
5759
std::shared_ptr<NPDisk::IIoContextFactory> IoContextFactory;
5860

ydb/core/driver_lib/run/run.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,9 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
11271127
AppData->IoContextFactory = ModuleFactories ? ModuleFactories->IoContextFactory.get() : nullptr;
11281128
AppData->SchemeOperationFactory = ModuleFactories ? ModuleFactories->SchemeOperationFactory.get() : nullptr;
11291129
AppData->ConfigSwissKnife = ModuleFactories ? ModuleFactories->ConfigSwissKnife.get() : nullptr;
1130+
if (ModuleFactories) {
1131+
AppData->TransferWriterFactory = ModuleFactories->TransferWriterFactory;
1132+
}
11301133

11311134
AppData->SqsAuthFactory = ModuleFactories
11321135
? ModuleFactories->SqsAuthFactory.get()

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ namespace NKqp {
2626

2727
using namespace NYdb;
2828
using namespace NYdb::NTable;
29+
using namespace NYdb::NReplication;
2930

3031
Y_UNIT_TEST_SUITE(KqpScheme) {
3132
Y_UNIT_TEST(UseUnauthorizedTable) {

ydb/core/testlib/actors/test_runtime.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/mon_alloc/profiler.h>
99
#include <ydb/core/grpc_services/grpc_helper.h>
1010
#include <ydb/core/tablet/tablet_impl.h>
11+
#include <ydb/core/testlib/mock_transfer_writer_factory.h>
1112

1213
#include <ydb/library/actors/core/executor_pool_basic.h>
1314
#include <ydb/library/actors/core/executor_pool_io.h>
@@ -182,6 +183,7 @@ namespace NActors {
182183
nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot;
183184
nodeAppData->IoContextFactory = app0->IoContextFactory;
184185
nodeAppData->SchemeOperationFactory = app0->SchemeOperationFactory;
186+
nodeAppData->TransferWriterFactory = std::make_shared<NKikimr::Tests::MockTransferWriterFactory>();
185187
if (nodeIndex < egg.Icb.size()) {
186188
nodeAppData->Icb = std::move(egg.Icb[nodeIndex]);
187189
nodeAppData->InFlightLimiterRegistry.Reset(new NKikimr::NGRpcService::TInFlightLimiterRegistry(nodeAppData->Icb));
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/replication/service/transfer_writer_factory.h>
4+
#include <ydb/library/actors/core/actor_bootstrapped.h>
5+
6+
namespace NKikimr::Tests {
7+
8+
struct MockTransferWriterFactory : public NKikimr::NReplication::NService::ITransferWriterFactory {
9+
struct MockActor : public NActors::TActorBootstrapped<MockActor> {
10+
void Bootstrap() {}
11+
};
12+
13+
14+
NActors::IActor* Create(const Parameters&) const override {
15+
return new MockActor();
16+
}
17+
};
18+
19+
} // namespace NKikimr::Tests

ydb/core/testlib/test_client.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ namespace Tests {
538538
appData.GraphConfig.MergeFrom(Settings->AppConfig->GetGraphConfig());
539539
appData.SqsConfig.MergeFrom(Settings->AppConfig->GetSqsConfig());
540540
appData.SharedCacheConfig.MergeFrom(Settings->AppConfig->GetSharedCacheConfig());
541+
appData.TransferWriterFactory = Settings->TransferWriterFactory;
541542

542543
appData.DynamicNameserviceConfig = new TDynamicNameserviceConfig;
543544
auto dnConfig = appData.DynamicNameserviceConfig;

ydb/core/testlib/test_client.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <ydb/core/protos/flat_scheme_op.pb.h>
2525
#include <ydb/core/testlib/basics/runtime.h>
2626
#include <ydb/core/testlib/basics/appdata.h>
27+
#include <ydb/core/testlib/mock_transfer_writer_factory.h>
2728
#include <ydb/core/protos/kesus.pb.h>
2829
#include <ydb/core/protos/table_service_config.pb.h>
2930
#include <ydb/core/protos/console_tenant.pb.h>
@@ -157,6 +158,8 @@ namespace Tests {
157158
std::shared_ptr<NKikimr::NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PersQueueGetReadSessionsInfoWorkerFactory;
158159
std::shared_ptr<NKikimr::NHttpProxy::IAuthFactory> DataStreamsAuthFactory;
159160
std::shared_ptr<NKikimr::NPQ::TPersQueueMirrorReaderFactory> PersQueueMirrorReaderFactory = std::make_shared<NKikimr::NPQ::TPersQueueMirrorReaderFactory>();
161+
std::shared_ptr<NKikimr::NReplication::NService::ITransferWriterFactory> TransferWriterFactory = std::make_shared<MockTransferWriterFactory>();
162+
160163
bool EnableMetering = false;
161164
TString MeteringFilePath;
162165
TString AwsRegion;

ydb/core/tx/replication/service/service.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include "service.h"
33
#include "table_writer.h"
44
#include "topic_reader.h"
5-
#include "transfer_writer.h"
5+
#include "transfer_writer_factory.h"
66
#include "worker.h"
77

88
#include <ydb/core/base/appdata.h>
@@ -417,7 +417,8 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
417417
}
418418

419419
std::function<IActor*(void)> TransferWriterFn(
420-
const NKikimrReplication::TTransferWriterSettings& writerSettings)
420+
const NKikimrReplication::TTransferWriterSettings& writerSettings,
421+
const ITransferWriterFactory* transferWriterFactory)
421422
{
422423
if (!CompilationService) {
423424
CompilationService = Register(
@@ -429,9 +430,10 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
429430
tablePathId = TPathId::FromProto(writerSettings.GetPathId()),
430431
transformLambda = writerSettings.GetTransformLambda(),
431432
compilationService = *CompilationService,
432-
batchingSettings = writerSettings.GetBatching()
433+
batchingSettings = writerSettings.GetBatching(),
434+
transferWriterFactory = transferWriterFactory
433435
]() {
434-
return CreateTransferWriter(transformLambda, tablePathId, compilationService, batchingSettings);
436+
return transferWriterFactory->Create({transformLambda, tablePathId, compilationService, batchingSettings});
435437
};
436438
}
437439

@@ -478,7 +480,12 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
478480
writerFn = WriterFn(writerSettings, consistencySettings);
479481
} else if (cmd.HasTransferWriter()) {
480482
const auto& writerSettings = cmd.GetTransferWriter();
481-
writerFn = TransferWriterFn(writerSettings);
483+
const auto* transferWriterFactory = AppData()->TransferWriterFactory.get();
484+
if (!transferWriterFactory) {
485+
LOG_C("Run transfer but TransferWriterFactory does not exists.");
486+
return;
487+
}
488+
writerFn = TransferWriterFn(writerSettings, transferWriterFactory);
482489
} else {
483490
Y_ABORT("Unsupported");
484491
}

0 commit comments

Comments
 (0)