Skip to content

Commit 26b0dc5

Browse files
committed
YQ-3561 first version of FQ run tool (ydb-platform#14259)
1 parent a37f865 commit 26b0dc5

27 files changed

+1441
-437
lines changed

ydb/core/testlib/test_client.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ namespace Tests {
440440
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
441441
GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1));
442442
GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1));
443-
if (Settings->EnableYq) {
443+
if (Settings->EnableYq || Settings->EnableYqGrpc) {
444444
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
445445
GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
446446
}
@@ -1099,6 +1099,12 @@ namespace Tests {
10991099
}
11001100
}
11011101

1102+
{
1103+
auto statActor = NStat::CreateStatService();
1104+
const TActorId statActorId = Runtime->Register(statActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).UserPoolId);
1105+
Runtime->RegisterService(NStat::MakeStatServiceID(Runtime->GetNodeId(nodeIdx)), statActorId, nodeIdx);
1106+
}
1107+
11021108
{
11031109
IActor* kesusService = NKesus::CreateKesusProxyService();
11041110
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);

ydb/core/testlib/test_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ namespace Tests {
140140
bool UseRealThreads = true;
141141
bool EnableKqpSpilling = false;
142142
bool EnableYq = false;
143+
bool EnableYqGrpc = false;
143144
TDuration KeepSnapshotTimeout = TDuration::Zero();
144145
ui64 ChangesQueueItemsLimit = 0;
145146
ui64 ChangesQueueBytesLimit = 0;
@@ -205,6 +206,7 @@ namespace Tests {
205206
TServerSettings& SetEnableDbCounters(bool value) { FeatureFlags.SetEnableDbCounters(value); return *this; }
206207
TServerSettings& SetEnablePersistentQueryStats(bool value) { FeatureFlags.SetEnablePersistentQueryStats(value); return *this; }
207208
TServerSettings& SetEnableYq(bool value) { EnableYq = value; return *this; }
209+
TServerSettings& SetEnableYqGrpc(bool value) { EnableYqGrpc = value; return *this; }
208210
TServerSettings& SetKeepSnapshotTimeout(TDuration value) { KeepSnapshotTimeout = value; return *this; }
209211
TServerSettings& SetChangesQueueItemsLimit(ui64 value) { ChangesQueueItemsLimit = value; return *this; }
210212
TServerSettings& SetChangesQueueBytesLimit(ui64 value) { ChangesQueueBytesLimit = value; return *this; }

ydb/tests/tools/fqrun/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
sync_dir
2+
3+
*.log
4+
*.sql
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
Enabled: true
2+
EnableDynamicNameservice: true
3+
EnableTaskCounters: true
4+
5+
CheckpointCoordinator {
6+
Enabled: true
7+
8+
Storage {
9+
TablePrefix: "yq/checkpoints"
10+
ClientTimeoutSec: 70
11+
OperationTimeoutSec: 60
12+
CancelAfterSec: 60
13+
}
14+
}
15+
16+
Common {
17+
MdbGateway: "https://mdb.api.cloud.yandex.net:443"
18+
ObjectStorageEndpoint: "https://storage-internal.cloud.yandex.net"
19+
IdsPrefix: "kr"
20+
QueryArtifactsCompressionMethod: "zstd_6"
21+
MonitoringEndpoint: "monitoring.api.cloud.yandex.net"
22+
KeepInternalErrors: true
23+
UseNativeProtocolForClickHouse: true
24+
DisableSslForGenericDataSources: true
25+
ShowQueryTimeline: true
26+
}
27+
28+
ControlPlaneProxy {
29+
Enabled: true
30+
}
31+
32+
ControlPlaneStorage {
33+
Enabled: true
34+
StatsMode: STATS_MODE_PROFILE
35+
36+
AvailableConnection: "OBJECT_STORAGE"
37+
AvailableConnection: "DATA_STREAMS"
38+
AvailableConnection: "MONITORING"
39+
AvailableConnection: "POSTGRESQL_CLUSTER"
40+
AvailableConnection: "CLICKHOUSE_CLUSTER"
41+
AvailableConnection: "YDB_DATABASE"
42+
AvailableConnection: "GREENPLUM_CLUSTER"
43+
AvailableConnection: "MYSQL_CLUSTER"
44+
45+
AvailableStreamingConnection: "OBJECT_STORAGE"
46+
AvailableStreamingConnection: "DATA_STREAMS"
47+
AvailableStreamingConnection: "MONITORING"
48+
AvailableStreamingConnection: "YDB_DATABASE"
49+
50+
AvailableBinding: "OBJECT_STORAGE"
51+
AvailableBinding: "DATA_STREAMS"
52+
53+
Storage {
54+
TablePrefix: "yq/control_plane"
55+
ClientTimeoutSec: 70
56+
OperationTimeoutSec: 60
57+
CancelAfterSec: 60
58+
}
59+
}
60+
61+
DbPool {
62+
Enabled: true
63+
64+
Storage {
65+
TablePrefix: "yq/db_pool"
66+
ClientTimeoutSec: 70
67+
OperationTimeoutSec: 60
68+
CancelAfterSec: 60
69+
}
70+
}
71+
72+
NodesManager {
73+
Enabled: true
74+
}
75+
76+
PendingFetcher {
77+
Enabled: true
78+
}
79+
80+
PrivateApi {
81+
Enabled: true
82+
}
83+
84+
PrivateProxy {
85+
Enabled: true
86+
}
87+
88+
QuotasManager {
89+
Enabled: true
90+
}
91+
92+
RateLimiter {
93+
Enabled: true
94+
ControlPlaneEnabled: true
95+
DataPlaneEnabled: true
96+
97+
Database {
98+
TablePrefix: "yq/rate_limiter"
99+
ClientTimeoutSec: 70
100+
OperationTimeoutSec: 60
101+
CancelAfterSec: 60
102+
}
103+
104+
Limiters {
105+
CoordinationNodePath: "limiter_alpha"
106+
}
107+
}
108+
109+
ResourceManager {
110+
Enabled: true
111+
}
112+
113+
RowDispatcher {
114+
Enabled: true
115+
SendStatusPeriodSec: 10
116+
TimeoutBeforeStartSessionSec: 10
117+
118+
CompileService {
119+
ParallelCompilationLimit: 20
120+
}
121+
122+
Coordinator {
123+
CoordinationNodePath: "yq/row_dispatcher"
124+
125+
Database {
126+
TablePrefix: "yq/row_dispatcher"
127+
ClientTimeoutSec: 70
128+
OperationTimeoutSec: 60
129+
CancelAfterSec: 60
130+
}
131+
}
132+
133+
JsonParser {
134+
BatchSizeBytes: 1048576
135+
BatchCreationTimeoutMs: 1000
136+
}
137+
}
138+
139+
TestConnection {
140+
Enabled: true
141+
}

ydb/tests/tools/fqrun/fqprun.cpp

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#include <library/cpp/colorizer/colors.h>
2+
#include <library/cpp/getopt/last_getopt.h>
3+
4+
#include <util/datetime/base.h>
5+
6+
#include <ydb/tests/tools/fqrun/src/fq_runner.h>
7+
#include <ydb/tests/tools/kqprun/runlib/application.h>
8+
#include <ydb/tests/tools/kqprun/runlib/utils.h>
9+
10+
using namespace NKikimrRun;
11+
12+
namespace NFqRun {
13+
14+
namespace {
15+
16+
struct TExecutionOptions {
17+
TString Query;
18+
19+
bool HasResults() const {
20+
return !Query.empty();
21+
}
22+
23+
TRequestOptions GetQueryOptions() const {
24+
return {
25+
.Query = Query
26+
};
27+
}
28+
29+
void Validate(const TRunnerOptions& runnerOptions) const {
30+
if (!Query && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) {
31+
ythrow yexception() << "Nothing to execute and is not running as daemon";
32+
}
33+
}
34+
};
35+
36+
void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) {
37+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
38+
39+
if (executionOptions.Query) {
40+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl;
41+
if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) {
42+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
43+
}
44+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl;
45+
if (!runner.FetchQueryResults()) {
46+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed";
47+
}
48+
}
49+
50+
if (executionOptions.HasResults()) {
51+
try {
52+
runner.PrintQueryResults();
53+
} catch (...) {
54+
ythrow yexception() << "Failed to print script results, reason:\n" << CurrentExceptionMessage();
55+
}
56+
}
57+
}
58+
59+
void RunAsDaemon() {
60+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
61+
62+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization finished" << colors.Default() << Endl;
63+
while (true) {
64+
Sleep(TDuration::Seconds(1));
65+
}
66+
}
67+
68+
void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& runnerOptions) {
69+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
70+
71+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Initialization of fq runner..." << colors.Default() << Endl;
72+
TFqRunner runner(runnerOptions);
73+
74+
try {
75+
RunArgumentQueries(executionOptions, runner);
76+
} catch (const yexception& exception) {
77+
if (runnerOptions.FqSettings.MonitoringEnabled) {
78+
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
79+
} else {
80+
throw exception;
81+
}
82+
}
83+
84+
if (runnerOptions.FqSettings.MonitoringEnabled || runnerOptions.FqSettings.GrpcEnabled) {
85+
RunAsDaemon();
86+
}
87+
88+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Finalization of fq runner..." << colors.Default() << Endl;
89+
}
90+
91+
class TMain : public TMainBase {
92+
protected:
93+
void RegisterOptions(NLastGetopt::TOpts& options) override {
94+
options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy");
95+
options.AddHelpOption('h');
96+
options.SetFreeArgsNum(0);
97+
98+
// Inputs
99+
100+
options.AddLongOption('p', "query", "Query to execute")
101+
.RequiredArgument("file")
102+
.StoreMappedResult(&ExecutionOptions.Query, &LoadFile);
103+
104+
options.AddLongOption("fq-cfg", "File with FQ config (NFq::NConfig::TConfig for FQ proxy)")
105+
.RequiredArgument("file")
106+
.DefaultValue("./configuration/fq_config.conf")
107+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
108+
if (!google::protobuf::TextFormat::ParseFromString(LoadFile(TString(option->CurValOrDef())), &RunnerOptions.FqSettings.FqConfig)) {
109+
ythrow yexception() << "Bad format of FQ configuration";
110+
}
111+
});
112+
113+
// Outputs
114+
115+
options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
116+
.RequiredArgument("file")
117+
.DefaultValue("-")
118+
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);
119+
120+
TChoices<EResultOutputFormat> resultFormat({
121+
{"rows", EResultOutputFormat::RowsJson},
122+
{"full-json", EResultOutputFormat::FullJson},
123+
{"full-proto", EResultOutputFormat::FullProto}
124+
});
125+
options.AddLongOption('R', "result-format", "Query result format")
126+
.RequiredArgument("result-format")
127+
.DefaultValue("rows")
128+
.Choices(resultFormat.GetChoices())
129+
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutputFormat, resultFormat);
130+
131+
RegisterKikimrOptions(options, RunnerOptions.FqSettings);
132+
}
133+
134+
int DoRun(NLastGetopt::TOptsParseResult&&) override {
135+
ExecutionOptions.Validate(RunnerOptions);
136+
137+
auto& logConfig = RunnerOptions.FqSettings.LogConfig;
138+
logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT);
139+
FillLogConfig(logConfig);
140+
141+
RunScript(ExecutionOptions, RunnerOptions);
142+
143+
return 0;
144+
}
145+
146+
private:
147+
TExecutionOptions ExecutionOptions;
148+
TRunnerOptions RunnerOptions;
149+
};
150+
151+
} // anonymous namespace
152+
153+
} // namespace NFqRun
154+
155+
int main(int argc, const char* argv[]) {
156+
SetupSignalActions();
157+
158+
try {
159+
NFqRun::TMain().Run(argc, argv);
160+
} catch (...) {
161+
NColorizer::TColors colors = NColorizer::AutoColors(Cerr);
162+
163+
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
164+
return 1;
165+
}
166+
}

ydb/tests/tools/fqrun/src/common.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
5+
#include <ydb/core/fq/libs/config/protos/fq_config.pb.h>
6+
#include <ydb/core/protos/config.pb.h>
7+
#include <ydb/tests/tools/kqprun/runlib/settings.h>
8+
9+
namespace NFqRun {
10+
11+
constexpr i64 MAX_RESULT_SET_ROWS = 1000;
12+
13+
struct TFqSetupSettings : public NKikimrRun::TServerSettings {
14+
NFq::NConfig::TConfig FqConfig;
15+
NKikimrConfig::TLogConfig LogConfig;
16+
};
17+
18+
struct TRunnerOptions {
19+
IOutputStream* ResultOutput = nullptr;
20+
NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson;
21+
22+
TFqSetupSettings FqSettings;
23+
};
24+
25+
struct TRequestOptions {
26+
TString Query;
27+
};
28+
29+
} // namespace NFqRun

0 commit comments

Comments
 (0)