Skip to content

Commit e98b6b8

Browse files
authored
YQ-3970 RD added parser mkql counters (ydb-platform#12615)
1 parent fc43113 commit e98b6b8

24 files changed

+89
-39
lines changed

ydb/core/fq/libs/init/init.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ void Init(
205205
tenant,
206206
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
207207
CreatePqNativeGateway(pqServices),
208-
appData->Mon);
208+
appData->Mon,
209+
appData->Counters);
209210
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
210211
}
211212

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct TActorFactory : public IActorFactory {
2020
NYdb::TDriver driver,
2121
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
2222
const ::NMonitoring::TDynamicCounterPtr& counters,
23+
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
2324
const NYql::IPqGateway::TPtr& pqGateway,
2425
ui64 maxBufferSize) const override {
2526

@@ -35,6 +36,7 @@ struct TActorFactory : public IActorFactory {
3536
std::move(driver),
3637
credentialsProviderFactory,
3738
counters,
39+
countersRoot,
3840
pqGateway,
3941
maxBufferSize
4042
);

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct IActorFactory : public TThrRefBase {
2323
NYdb::TDriver driver,
2424
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
2525
const ::NMonitoring::TDynamicCounterPtr& counters,
26+
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
2627
const NYql::IPqGateway::TPtr& pqGateway,
2728
ui64 maxBufferSize) const = 0;
2829
};

ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,12 @@ TString TSchemaColumn::ToString() const {
1010
return TStringBuilder() << "'" << Name << "' : " << TypeYson;
1111
}
1212

13+
//// TCountersDesc
14+
15+
TCountersDesc TCountersDesc::CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const {
16+
TCountersDesc result(*this);
17+
result.MkqlCountersName = mkqlCountersName;
18+
return result;
19+
}
20+
1321
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include <library/cpp/monlib/dynamic_counters/counters.h>
4+
35
#include <ydb/library/conclusion/generic/result.h>
46
#include <ydb/library/conclusion/status.h>
57
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
@@ -22,4 +24,12 @@ struct TSchemaColumn {
2224
TString ToString() const;
2325
};
2426

27+
struct TCountersDesc {
28+
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
29+
NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>();
30+
TString MkqlCountersName; // Used for TAlignedPagePoolCounters created from CountersRoot
31+
32+
[[nodiscard]] TCountersDesc CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const;
33+
};
34+
2535
} // namespace NFq::NRowDispatcher

ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ SRCS(
55
)
66

77
PEERDIR(
8+
library/cpp/monlib/dynamic_counters
9+
810
ydb/library/conclusion
911
ydb/library/yql/dq/actors/protos
1012

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,24 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
1818
using TBase = NActors::TActor<TTopicFormatHandler>;
1919

2020
struct TCounters {
21-
const NMonitoring::TDynamicCounterPtr CountersRoot;
22-
const NMonitoring::TDynamicCounterPtr CountersSubgroup;
21+
TCountersDesc Desc;
2322

2423
NMonitoring::TDynamicCounters::TCounterPtr ActiveFormatHandlers;
2524
NMonitoring::TDynamicCounters::TCounterPtr ActiveClients;
2625

27-
TCounters(NMonitoring::TDynamicCounterPtr counters, const TSettings& settings)
28-
: CountersRoot(counters)
29-
, CountersSubgroup(counters->GetSubgroup("format", settings.ParsingFormat))
26+
TCounters(const TCountersDesc& counters, const TSettings& settings)
27+
: Desc(counters)
3028
{
29+
Desc.CountersSubgroup = Desc.CountersSubgroup->GetSubgroup("format", settings.ParsingFormat);
30+
3131
Register();
3232
}
3333

3434
private:
3535
void Register() {
36-
ActiveFormatHandlers = CountersRoot->GetCounter("ActiveFormatHandlers", false);
36+
ActiveFormatHandlers = Desc.CountersRoot->GetCounter("ActiveFormatHandlers", false);
3737

38-
ActiveClients = CountersSubgroup->GetCounter("ActiveClients", false);
38+
ActiveClients = Desc.CountersSubgroup->GetCounter("ActiveClients", false);
3939
}
4040
};
4141

@@ -282,9 +282,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
282282
};
283283

284284
public:
285-
TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, NMonitoring::TDynamicCounterPtr counters)
285+
TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, const TCountersDesc& counters)
286286
: TBase(&TTopicFormatHandler::StateFunc)
287-
, TTypeParser(__LOCATION__)
287+
, TTypeParser(__LOCATION__, counters.CopyWithNewMkqlCountersName("row_dispatcher"))
288288
, Config(config)
289289
, Settings(settings)
290290
, LogPrefix(TStringBuilder() << "TTopicFormatHandler [" << Settings.ParsingFormat << "]: ")
@@ -487,18 +487,19 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
487487
}
488488

489489
TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const {
490+
const auto& counters = Counters.Desc.CopyWithNewMkqlCountersName("row_dispatcher_parser");
490491
if (Settings.ParsingFormat == "raw") {
491-
return CreateRawParser(ParserHandler);
492+
return CreateRawParser(ParserHandler, counters);
492493
}
493494
if (Settings.ParsingFormat == "json_each_row") {
494-
return CreateJsonParser(ParserHandler, Config.JsonParserConfig);
495+
return CreateJsonParser(ParserHandler, Config.JsonParserConfig, counters);
495496
}
496497
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
497498
}
498499

499500
void CreateFilters() {
500501
if (!Filters) {
501-
Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.CountersSubgroup);
502+
Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.Desc.CountersSubgroup);
502503
}
503504
}
504505

@@ -567,7 +568,7 @@ void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) {
567568
}
568569
}
569570

570-
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) {
571+
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters) {
571572
const auto handler = new TTopicFormatHandler(config, settings, counters);
572573
owner.RegisterWithSameMailbox(handler);
573574
return ITopicFormatHandler::TPtr(handler);
@@ -585,7 +586,7 @@ TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConf
585586
namespace NTests {
586587

587588
ITopicFormatHandler::TPtr CreateTestFormatHandler(const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings) {
588-
const auto handler = new TTopicFormatHandler(config, settings, MakeIntrusive<NMonitoring::TDynamicCounters>());
589+
const auto handler = new TTopicFormatHandler(config, settings, {});
589590
NActors::TActivationContext::ActorSystem()->Register(handler);
590591
return ITopicFormatHandler::TPtr(handler);
591592
}

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ struct TFormatHandlerConfig {
6666
TTopicFiltersConfig FiltersConfig;
6767
};
6868

69-
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters);
69+
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters);
7070
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, NActors::TActorId compileServiceId);
7171

7272
namespace NTests {

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,8 @@ class TJsonParser : public TTopicParserBase {
316316
using TPtr = TIntrusivePtr<TJsonParser>;
317317

318318
public:
319-
TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config)
320-
: TBase(std::move(consumer), __LOCATION__)
319+
TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters)
320+
: TBase(std::move(consumer), __LOCATION__, counters)
321321
, Config(config)
322322
, NumberColumns(Consumer->GetColumns().size())
323323
, MaxNumberRows((config.BufferCellCount - 1) / NumberColumns + 1)
@@ -483,8 +483,8 @@ class TJsonParser : public TTopicParserBase {
483483

484484
} // anonymous namespace
485485

486-
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) {
487-
TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config);
486+
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) {
487+
TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config, counters);
488488
if (auto status = parser->InitColumnsParsers(); status.IsFail()) {
489489
return status;
490490
}

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ struct TJsonParserConfig {
1414
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit
1515
};
1616

17-
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config);
17+
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
1818
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig);
1919

2020
} // namespace NFq::NRowDispatcher

0 commit comments

Comments
 (0)