Skip to content

Commit f6a6332

Browse files
committed
TMP2
1 parent 0e4e79c commit f6a6332

File tree

10 files changed

+96
-52
lines changed

10 files changed

+96
-52
lines changed

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct TDummyTopic {
2626
TString TopicName;
2727
TMaybe<TString> Path;
2828
size_t PartitionsCount;
29+
bool CancelOnFileFinish = false;
2930
};
3031

3132
// Dummy Pq gateway for tests.

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
1616
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
1717

1818
public:
19-
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "")
19+
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "", bool cancelOnFileFinish = false)
2020
: File_(std::move(file))
2121
, Session_(std::move(session))
2222
, ProducerId_(producerId)
2323
, FilePoller_([this] () {
2424
PollFileForChanges();
2525
})
2626
, Counters_()
27+
, CancelOnFileFinish_(cancelOnFileFinish)
2728
{
2829
Pool_.Start(1);
2930
}
@@ -133,7 +134,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
133134
}
134135
if (!msgs.empty()) {
135136
EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size);
136-
} else {
137+
} else if (CancelOnFileFinish_) {
137138
EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size);
138139
}
139140

@@ -147,6 +148,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
147148
TString ProducerId_;
148149
std::thread FilePoller_;
149150
NYdb::NTopic::TReaderCounters::TPtr Counters_;
151+
bool CancelOnFileFinish_ = false;
150152

151153
TThreadPool Pool_;
152154
size_t MsgOffset_ = 0;
@@ -338,6 +340,10 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
338340
}
339341
};
340342

343+
TFileTopicClient::TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics)
344+
: Topics_(topics)
345+
{}
346+
341347
std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) {
342348
Y_ENSURE(!settings.Topics_.empty());
343349
const auto& topic = settings.Topics_.front();
@@ -360,7 +366,8 @@ std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(
360366
ui64 sessionId = 0;
361367
return std::make_shared<TFileTopicReadSession>(
362368
TFile(*filePath, EOpenMode::TEnum::RdOnly),
363-
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId)
369+
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId),
370+
"", topicsIt->second.CancelOnFileFinish
364371
);
365372
}
366373

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace NYql {
66
struct TFileTopicClient : public ITopicClient {
7-
TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics): Topics_(topics) {}
7+
explicit TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics);
88

99
NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override;
1010

@@ -32,5 +32,6 @@ struct TFileTopicClient : public ITopicClient {
3232

3333
private:
3434
THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> Topics_;
35+
bool CancelOnFileFinish_ = false;
3536
};
3637
}

ydb/tests/tools/fqrun/fqrun.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <util/datetime/base.h>
55

6+
#include <ydb/core/blob_depot/mon_main.h>
67
#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
78
#include <ydb/tests/tools/fqrun/src/fq_runner.h>
89
#include <ydb/tests/tools/kqprun/runlib/application.h>
@@ -195,6 +196,20 @@ class TMain : public TMainBase {
195196
}
196197
});
197198

199+
options.AddLongOption("cnacel-on-file-finish", "Cancel emulate YDS topics when topic file finished")
200+
.RequiredArgument("topic")
201+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
202+
TStringBuf topicName;
203+
TStringBuf filePath;
204+
TStringBuf(option->CurVal()).Split('@', topicName, filePath);
205+
if (topicName.empty() || filePath.empty()) {
206+
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@file";
207+
}
208+
if (!PqFilesMapping.emplace(topicName, filePath).second) {
209+
ythrow yexception() << "Got duplicated topic name: " << topicName;
210+
}
211+
});
212+
198213
// Outputs
199214

200215
options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
@@ -251,7 +266,7 @@ class TMain : public TMainBase {
251266
}
252267

253268
#ifdef PROFILE_MEMORY_ALLOCATIONS
254-
if (RunnerOptions.FqSettings.VerboseLevel >= 1) {
269+
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
255270
Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl;
256271
}
257272
NAllocProfiler::StartAllocationSampling(true);
@@ -264,7 +279,7 @@ class TMain : public TMainBase {
264279
RunScript(ExecutionOptions, RunnerOptions);
265280

266281
#ifdef PROFILE_MEMORY_ALLOCATIONS
267-
if (RunnerOptions.FqSettings.VerboseLevel >= 1) {
282+
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
268283
Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl;
269284
}
270285
FinishProfileMemoryAllocations();
@@ -284,8 +299,6 @@ class TMain : public TMainBase {
284299
}
285300

286301
private:
287-
inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout);
288-
289302
TExecutionOptions ExecutionOptions;
290303
TRunnerOptions RunnerOptions;
291304
std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;

ydb/tests/tools/fqrun/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ PEERDIR(
1515
library/cpp/getopt
1616
library/cpp/lfalloc/alloc_profiler
1717
util
18+
ydb/core/blob_depot
1819
ydb/library/yql/providers/pq/gateway/dummy
1920
ydb/tests/tools/fqrun/src
2021
ydb/tests/tools/kqprun/runlib

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,6 @@ class TMain : public TMainBase {
403403
using EVerbose = TYdbSetupSettings::EVerbose;
404404

405405
inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
406-
inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout);
407406

408407
TExecutionOptions ExecutionOptions;
409408
TRunnerOptions RunnerOptions;
@@ -808,7 +807,7 @@ class TMain : public TMainBase {
808807
}
809808

810809
#ifdef PROFILE_MEMORY_ALLOCATIONS
811-
if (RunnerOptions.YdbSettings.VerboseLevel >= 1) {
810+
if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) {
812811
Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl;
813812
}
814813
NAllocProfiler::StartAllocationSampling(true);
@@ -821,7 +820,7 @@ class TMain : public TMainBase {
821820
RunScript(ExecutionOptions, RunnerOptions);
822821

823822
#ifdef PROFILE_MEMORY_ALLOCATIONS
824-
if (RunnerOptions.YdbSettings.VerboseLevel >= 1) {
823+
if (RunnerOptions.YdbSettings.VerboseLevel >= EVerbose::Info) {
825824
Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl;
826825
}
827826
FinishProfileMemoryAllocations();

ydb/tests/tools/kqprun/runlib/application.cpp

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <util/stream/file.h>
88

99
#include <ydb/core/base/backtrace.h>
10+
#include <ydb/core/blob_depot/mon_main.h>
1011

1112
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
1213
#include <yql/essentials/public/udf/udf_static_registry.h>
@@ -53,40 +54,7 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin
5354
}
5455
});
5556

56-
TChoices<NActors::NLog::EPriority> logPriority({
57-
{"emerg", NActors::NLog::EPriority::PRI_EMERG},
58-
{"alert", NActors::NLog::EPriority::PRI_ALERT},
59-
{"crit", NActors::NLog::EPriority::PRI_CRIT},
60-
{"error", NActors::NLog::EPriority::PRI_ERROR},
61-
{"warn", NActors::NLog::EPriority::PRI_WARN},
62-
{"notice", NActors::NLog::EPriority::PRI_NOTICE},
63-
{"info", NActors::NLog::EPriority::PRI_INFO},
64-
{"debug", NActors::NLog::EPriority::PRI_DEBUG},
65-
{"trace", NActors::NLog::EPriority::PRI_TRACE},
66-
});
67-
options.AddLongOption("log-default", "Default log priority")
68-
.RequiredArgument("priority")
69-
.StoreMappedResultT<TString>(&DefaultLogPriority, logPriority);
70-
71-
options.AddLongOption("log", "Component log priority in format <component>=<priority> (e. g. KQP_YQL=trace)")
72-
.RequiredArgument("component priority")
73-
.Handler1([this, logPriority](const NLastGetopt::TOptsParser* option) {
74-
TStringBuf component;
75-
TStringBuf priority;
76-
TStringBuf(option->CurVal()).Split('=', component, priority);
77-
if (component.empty() || priority.empty()) {
78-
ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace";
79-
}
80-
81-
if (!logPriority.Contains(TString(priority))) {
82-
ythrow yexception() << "Incorrect log priority: " << priority;
83-
}
84-
85-
const auto service = GetLogService(TString(component));
86-
if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) {
87-
ythrow yexception() << "Got duplicated log service name: " << component;
88-
}
89-
});
57+
RegisterLogOptions(options);
9058

9159
options.AddLongOption("profile-output", "File with profile memory allocations output (use '-' to write in stdout)")
9260
.RequiredArgument("file")
@@ -129,6 +97,28 @@ void TMainBase::RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettin
12997
});
13098
}
13199

100+
void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) {
101+
options.AddLongOption("log-default", "Default log priority")
102+
.RequiredArgument("priority")
103+
.StoreMappedResultT<TString>(&DefaultLogPriority, GetLogPrioritiesMap("log-default"));
104+
105+
options.AddLongOption("log", "Component log priority in format <component>=<priority> (e. g. KQP_YQL=trace)")
106+
.RequiredArgument("component priority")
107+
.Handler1([this, logPriority = GetLogPrioritiesMap("log")](const NLastGetopt::TOptsParser* option) {
108+
TStringBuf component;
109+
TStringBuf priority;
110+
TStringBuf(option->CurVal()).Split('=', component, priority);
111+
if (component.empty() || priority.empty()) {
112+
ythrow yexception() << "Incorrect log setting, expected form component=priority, e. g. KQP_YQL=trace";
113+
}
114+
115+
const auto service = GetLogService(TString(component));
116+
if (!LogPriorities.emplace(service, logPriority(TString(priority))).second) {
117+
ythrow yexception() << "Got duplicated log service name: " << component;
118+
}
119+
});
120+
}
121+
132122
void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const {
133123
if (DefaultLogPriority) {
134124
config.SetDefaultLevel(*DefaultLogPriority);

ydb/tests/tools/kqprun/runlib/application.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "settings.h"
44

5+
#include <library/cpp/colorizer/colors.h>
56
#include <library/cpp/getopt/modchooser.h>
67

78
#include <util/stream/file.h>
@@ -23,13 +24,16 @@ class TMainBase : public TMainClassArgs {
2324
protected:
2425
void RegisterKikimrOptions(NLastGetopt::TOpts& options, TServerSettings& settings);
2526

27+
virtual void RegisterLogOptions(NLastGetopt::TOpts& options);
28+
2629
void FillLogConfig(NKikimrConfig::TLogConfig& config) const;
2730

2831
static IOutputStream* GetDefaultOutput(const TString& file);
2932

3033
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry() const;
3134

3235
protected:
36+
inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout);
3337
inline static IOutputStream* ProfileAllocationsOutput = nullptr;
3438

3539
private:

ydb/tests/tools/kqprun/runlib/utils.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,27 @@ void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestA
245245
}
246246
}
247247

248+
TChoices<NActors::NLog::EPriority> GetLogPrioritiesMap(const TString& optionName) {
249+
return TChoices<NActors::NLog::EPriority>({
250+
{"emerg", NActors::NLog::EPriority::PRI_EMERG},
251+
{"alert", NActors::NLog::EPriority::PRI_ALERT},
252+
{"crit", NActors::NLog::EPriority::PRI_CRIT},
253+
{"error", NActors::NLog::EPriority::PRI_ERROR},
254+
{"warn", NActors::NLog::EPriority::PRI_WARN},
255+
{"notice", NActors::NLog::EPriority::PRI_NOTICE},
256+
{"info", NActors::NLog::EPriority::PRI_INFO},
257+
{"debug", NActors::NLog::EPriority::PRI_DEBUG},
258+
{"trace", NActors::NLog::EPriority::PRI_TRACE},
259+
}, optionName, false);
260+
}
261+
248262
void SetupSignalActions() {
249263
std::set_terminate(&TerminateHandler);
250264
signal(SIGSEGV, &SegmentationFaultHandler);
251265
signal(SIGFPE, &FloatingPointExceptionHandler);
252266

253267
#ifdef PROFILE_MEMORY_ALLOCATIONS
254-
signal(SIGINT, &NKqpRun::InterruptHandler);
268+
signal(SIGINT, &InterruptHandler);
255269
#endif
256270
}
257271

ydb/tests/tools/kqprun/runlib/utils.h

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,28 @@ struct TRequestResult {
3434
template <typename TResult>
3535
class TChoices {
3636
public:
37-
explicit TChoices(std::map<TString, TResult> choicesMap, const TString& optionName = "")
37+
explicit TChoices(std::map<TString, TResult> choicesMap, const TString& optionName = "", bool checkRegister = true)
3838
: ChoicesMap(std::move(choicesMap))
3939
, OptionName(optionName)
40+
, CheckRegister(checkRegister)
4041
{}
4142

42-
TResult operator()(const TString& choice) const {
43-
const auto it = ChoicesMap.find(choice);
44-
// if (it == ChoicesMap.end()) {
43+
TResult operator()(TString choice) const {
44+
if (!CheckRegister) {
45+
std::for_each(choice.begin(), choice.vend(), [](char& c) { c = std::tolower(c); });
46+
}
4547

46-
// throw yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n" << Join(", ")
47-
// }
48+
const auto it = ChoicesMap.find(choice);
49+
if (it == ChoicesMap.end()) {
50+
auto error = yexception() << "Value '" << choice << "' is not allowed " << (OptionName ? TStringBuilder() << "for option " << OptionName : TStringBuilder()) << ", available variants:\n";
51+
for (auto it = ChoicesMap.begin(); it != ChoicesMap.end();) {
52+
error << choice;
53+
if (++it != ChoicesMap.end()) {
54+
error << ", ";
55+
}
56+
}
57+
ythrow error;
58+
}
4859
return it->second;
4960
}
5061

@@ -64,6 +75,7 @@ class TChoices {
6475
private:
6576
const std::map<TString, TResult> ChoicesMap;
6677
const TString OptionName;
78+
const bool CheckRegister;
6779
};
6880

6981
class TStatsPrinter {
@@ -95,6 +107,8 @@ void ModifyLogPriorities(std::unordered_map<NKikimrServices::EServiceKikimr, NAc
95107

96108
void InitLogSettings(const NKikimrConfig::TLogConfig& logConfig, NActors::TTestActorRuntimeBase& runtime);
97109

110+
TChoices<NActors::NLog::EPriority> GetLogPrioritiesMap(const TString& optionName);
111+
98112
void SetupSignalActions();
99113

100114
void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet);

0 commit comments

Comments
 (0)