Skip to content

Commit 810d5d5

Browse files
Yq 3560 Add row dispatcher to dqrun (#9697)
Co-authored-by: Fiodar Miron <fedor-miron@ydb.tech> Co-authored-by: Fiodar Miron <61616792+fedor-miron@users.noreply.github.com>
1 parent a0930a2 commit 810d5d5

File tree

8 files changed

+109
-11
lines changed

8 files changed

+109
-11
lines changed

ydb/core/fq/libs/control_plane_storage/ya.make

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ PEERDIR(
2020
library/cpp/lwtrace
2121
library/cpp/protobuf/interop
2222
ydb/core/base
23+
ydb/core/external_sources
2324
ydb/core/fq/libs/actors/logging
2425
ydb/core/fq/libs/common
2526
ydb/core/fq/libs/config
@@ -33,13 +34,13 @@ PEERDIR(
3334
ydb/core/fq/libs/shared_resources
3435
ydb/core/fq/libs/ydb
3536
ydb/core/mon
37+
ydb/library/db_pool
3638
ydb/library/security
39+
ydb/library/yql/providers/s3/path_generator
40+
ydb/library/yql/public/issue
3741
ydb/public/api/protos
3842
ydb/public/sdk/cpp/client/ydb_scheme
3943
ydb/public/sdk/cpp/client/ydb_table
40-
ydb/library/db_pool
41-
ydb/library/yql/providers/s3/path_generator
42-
ydb/library/yql/public/issue
4344
)
4445

4546
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/init/init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ void Init(
207207
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());
208208

209209
RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
210-
210+
211211
NYql::TPqGatewayServices pqServices(
212212
yqSharedResources->UserSpaceYdbDriver,
213213
pqCmConnections,

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class TLocalServiceHolder {
3131
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
3232
IMetricsRegistryPtr metricsRegistry,
3333
const std::function<IActor*(void)>& metricsPusherFactory,
34-
bool withSpilling)
34+
bool withSpilling,
35+
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
3536
: MetricsRegistry(metricsRegistry
3637
? metricsRegistry
3738
: CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))
@@ -89,6 +90,9 @@ class TLocalServiceHolder {
8990
NDq::MakeDqLocalFileSpillingServiceID(nodeId),
9091
TActorSetupCmd(spillingActor, TMailboxType::Simple, 0));
9192
}
93+
for (auto& [actorId, setupCmd] : additionalLocalServices) {
94+
ServiceNode->AddLocalService(actorId, std::move(setupCmd));
95+
}
9296

9397
auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors());
9498

@@ -248,7 +252,8 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
248252
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
249253
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
250254
IMetricsRegistryPtr metricsRegistry,
251-
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling)
255+
const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling,
256+
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
252257
{
253258
return MakeHolder<TLocalServiceHolder>(functionRegistry,
254259
compFactory,
@@ -260,15 +265,17 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
260265
threads,
261266
metricsRegistry,
262267
metricsPusherFactory,
263-
withSpilling);
268+
withSpilling,
269+
std::move(additionalLocalServices));
264270
}
265271

266272
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
267273
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
268274
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
269275
bool withSpilling, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
270276
IMetricsRegistryPtr metricsRegistry,
271-
const std::function<IActor*(void)>& metricsPusherFactory)
277+
const std::function<IActor*(void)>& metricsPusherFactory,
278+
TVector<std::pair<TActorId, TActorSetupCmd>>&& additionalLocalServices)
272279
{
273280
int startPort = 31337;
274281
TRangeWalker<int> portWalker(startPort, startPort+100);
@@ -287,7 +294,8 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
287294
threads,
288295
metricsRegistry,
289296
metricsPusherFactory,
290-
withSpilling),
297+
withSpilling,
298+
std::move(additionalLocalServices)),
291299
CreateDqGateway("[::1]", grpcPort.Addr.GetPort()));
292300
}
293301

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/library/actors/core/actorsystem.h>
34
#include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h>
45
#include <ydb/library/yql/providers/dq/interface/yql_dq_task_preprocessor.h>
56
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
@@ -17,6 +18,7 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
1718
bool withSpilling,
1819
NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16,
1920
IMetricsRegistryPtr metricsRegistry = {},
20-
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {});
21+
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {},
22+
TVector<std::pair<NActors::TActorId, NActors::TActorSetupCmd>>&& additionalLocalServices = {});
2123

2224
} // namespace NYql

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,12 @@
8181
#include <ydb/library/yql/public/result_format/yql_result_format_data.h>
8282

8383
#include <ydb/core/fq/libs/actors/database_resolver.h>
84+
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
8485
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
8586
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
87+
#include <ydb/core/fq/libs/init/init.h>
88+
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
89+
8690
#include <ydb/core/util/pb.h>
8791

8892
#include <yt/cpp/mapreduce/interface/init.h>
@@ -180,6 +184,17 @@ void ReadGatewaysConfig(const TString& configFile, TGatewaysConfig* config, THas
180184
}
181185
}
182186

187+
void ReadFqConfig(const TString& fqCfgFile, NFq::NConfig::TConfig* fqConfig) {
188+
if (fqCfgFile.empty()) {
189+
return;
190+
}
191+
auto configData = TFileInput(fqCfgFile).ReadAll();
192+
using ::google::protobuf::TextFormat;
193+
if (!TextFormat::ParseFromString(configData, fqConfig)) {
194+
ythrow yexception() << "Bad format of fq configuration";
195+
}
196+
}
197+
183198
void PatchGatewaysConfig(TGatewaysConfig* config, const TString& mrJobBin, const TString& mrJobUdfsDir,
184199
size_t numThreads, bool keepTemp)
185200
{
@@ -483,9 +498,35 @@ int RunProgram(TProgramPtr program, const TRunOptions& options, const THashMap<T
483498
return 0;
484499
}
485500

501+
void InitFq(const NFq::NConfig::TConfig& fqConfig, TVector<std::pair<TActorId, TActorSetupCmd>>& additionalLocalServices) {
502+
if (fqConfig.HasRowDispatcher() && fqConfig.GetRowDispatcher().GetEnabled()) {
503+
NFq::IYqSharedResources::TPtr iSharedResources = NFq::CreateYqSharedResources(
504+
fqConfig,
505+
NKikimr::CreateYdbCredentialsProviderFactory,
506+
MakeIntrusive<NMonitoring::TDynamicCounters>());
507+
NFq::TYqSharedResources::TPtr yqSharedResources = NFq::TYqSharedResources::Cast(iSharedResources);
508+
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory;
509+
510+
NFq::NConfig::TCommonConfig commonConfig;
511+
auto rowDispatcher = NFq::NewRowDispatcherService(
512+
fqConfig.GetRowDispatcher(),
513+
commonConfig,
514+
NKikimr::CreateYdbCredentialsProviderFactory,
515+
yqSharedResources,
516+
credentialsFactory,
517+
"/tenant",
518+
MakeIntrusive<NMonitoring::TDynamicCounters>());
519+
520+
additionalLocalServices.emplace_back(
521+
NFq::RowDispatcherServiceActorId(),
522+
TActorSetupCmd(rowDispatcher.release(), TMailboxType::Simple, 0));
523+
}
524+
}
525+
486526
int RunMain(int argc, const char* argv[])
487527
{
488528
TString gatewaysCfgFile;
529+
TString fqCfgFile;
489530
TString progFile;
490531
TVector<TString> tablesMappingList;
491532
THashMap<TString, TString> tablesMapping;
@@ -581,6 +622,10 @@ int RunMain(int argc, const char* argv[])
581622
.Optional()
582623
.RequiredArgument("FILE")
583624
.StoreResult(&gatewaysCfgFile);
625+
opts.AddLongOption("fq-cfg", "federated query configuration file")
626+
.Optional()
627+
.RequiredArgument("FILE")
628+
.StoreResult(&fqCfgFile);
584629
opts.AddLongOption("fs-cfg", "Path to file storage config")
585630
.Optional()
586631
.StoreResult(&fileStorageCfg);
@@ -874,6 +919,9 @@ int RunMain(int argc, const char* argv[])
874919
setting->SetValue("1");
875920
}
876921

922+
NFq::NConfig::TConfig fqConfig;
923+
ReadFqConfig(fqCfgFile, &fqConfig);
924+
877925
if (res.Has("enable-spilling")) {
878926
auto* setting = gatewaysConfig.MutableDq()->AddDefaultSettings();
879927
setting->SetName("SpillingEngine");
@@ -1040,6 +1088,8 @@ int RunMain(int argc, const char* argv[])
10401088
clusters.emplace(to_lower(cluster.GetName()), TString{NYql::SolomonProviderName});
10411089
}
10421090
}
1091+
TVector<std::pair<TActorId, TActorSetupCmd>> additionalLocalServices;
1092+
InitFq(fqConfig, additionalLocalServices);
10431093

10441094
std::function<NActors::IActor*(void)> metricsPusherFactory = {};
10451095

@@ -1064,7 +1114,7 @@ int RunMain(int argc, const char* argv[])
10641114
bool enableSpilling = res.Has("enable-spilling");
10651115
dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling,
10661116
CreateAsyncIoFactory(driver, httpGateway, ytFileServices, genericClient, credentialsFactory, *funcRegistry, requestTimeout, maxRetries, pqGateway), threads,
1067-
metricsRegistry, metricsPusherFactory);
1117+
metricsRegistry, metricsPusherFactory, std::move(additionalLocalServices));
10681118
}
10691119

10701120
dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateDqExecTransformer, dqGateway, dqCompFactory, {}, storage));
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
RowDispatcher {
2+
Enabled: true
3+
TimeoutBeforeStartSessionSec: 2
4+
MaxSessionUsedMemory: 0
5+
SendStatusPeriodSec: 10
6+
WithoutConsumer: true
7+
Coordinator {
8+
CoordinationNodePath: "not_used"
9+
Database {
10+
Endpoint: "not_used:2135"
11+
Database: "/not/used/database"
12+
UseLocalMetadataService: true
13+
UseSsl: true
14+
ClientTimeoutSec: 70
15+
OperationTimeoutSec: 60
16+
CancelAfterSec: 60
17+
}
18+
}
19+
}
20+

ydb/library/yql/tools/dqrun/examples/gateways.conf

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ Dq {
6868
Name: "EnableDqReplicate"
6969
Value: "true"
7070
}
71+
DefaultSettings {
72+
Name: "_TableTimeout"
73+
Value: "600000"
74+
}
7175
}
7276

7377
Generic {
@@ -133,3 +137,14 @@ SqlCore {
133137
TranslationFlags: ["FlexibleTypes", "DisableAnsiOptionalAs", "EmitAggApply"]
134138
}
135139

140+
Pq {
141+
ClusterMapping {
142+
Name: "pq"
143+
Endpoint: "localhost:2135"
144+
Database: "local"
145+
ClusterType: CT_DATA_STREAMS
146+
UseSsl: True
147+
SharedReading:False
148+
}
149+
}
150+

ydb/library/yql/tools/dqrun/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ ENDIF()
9595
ydb/library/yql/utils/actor_system
9696
ydb/core/fq/libs/actors
9797
ydb/core/fq/libs/db_id_async_resolver_impl
98+
ydb/core/fq/libs/init
99+
ydb/core/external_sources
98100

99101
ydb/library/yql/udfs/common/clickhouse/client
100102
)

0 commit comments

Comments
 (0)