Skip to content

Commit 3321dfa

Browse files
authored
YQ-4188 Add task controller config and add new sensors (#16437)
1 parent 8814d7a commit 3321dfa

File tree

8 files changed

+49
-7
lines changed

8 files changed

+49
-7
lines changed

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,18 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15711571
ClearResultFormatSettings();
15721572
}
15731573

1574+
TDuration pingPeriod = TDuration::Seconds(3);
1575+
TDuration aggrPeriod = TDuration::Seconds(1);
1576+
{
1577+
const auto& taskControllerConfig = Params.Config.GetTaskController();
1578+
if (taskControllerConfig.GetPingPeriod()) {
1579+
Y_ABORT_UNLESS(TDuration::TryParse(taskControllerConfig.GetPingPeriod(), pingPeriod));
1580+
}
1581+
if (taskControllerConfig.GetAggrPeriod()) {
1582+
Y_ABORT_UNLESS(TDuration::TryParse(taskControllerConfig.GetAggrPeriod(), aggrPeriod));
1583+
}
1584+
}
1585+
15741586
if (enableCheckpointCoordinator) {
15751587
ControlId = Register(MakeCheckpointCoordinator(
15761588
::NFq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision),
@@ -1587,8 +1599,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15871599
resultId,
15881600
dqConfiguration,
15891601
QueryCounters,
1590-
TDuration::Seconds(3),
1591-
TDuration::Seconds(1)
1602+
pingPeriod,
1603+
aggrPeriod
15921604
).Release());
15931605
} else {
15941606
ControlId = Register(NYql::MakeTaskController(
@@ -1597,7 +1609,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
15971609
resultId,
15981610
dqConfiguration,
15991611
QueryCounters,
1600-
TDuration::Seconds(3)
1612+
pingPeriod,
1613+
aggrPeriod
16011614
).Release());
16021615
}
16031616

ydb/core/fq/libs/config/protos/fq_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import "ydb/core/fq/libs/config/protos/rate_limiter.proto";
2323
import "ydb/core/fq/libs/config/protos/read_actors_factory.proto";
2424
import "ydb/core/fq/libs/config/protos/resource_manager.proto";
2525
import "ydb/core/fq/libs/config/protos/row_dispatcher.proto";
26+
import "ydb/core/fq/libs/config/protos/task_controller.proto";
2627
import "ydb/core/fq/libs/config/protos/test_connection.proto";
2728
import "ydb/core/fq/libs/config/protos/token_accessor.proto";
2829
import "ydb/library/folder_service/proto/config.proto";
@@ -55,4 +56,5 @@ message TConfig {
5556
bool EnableTaskCounters = 23;
5657
TComputeConfig Compute = 24;
5758
TRowDispatcherConfig RowDispatcher = 25;
59+
TTaskControllerConfig TaskController = 26;
5860
}
ydb/core/fq/libs/config/protos/task_controller.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
syntax = "proto3";
2+
option cc_enable_arenas = true;
3+
4+
package NFq.NConfig;
5+
option java_package = "ru.yandex.kikimr.proto";
6+
7+
////////////////////////////////////////////////////////////
8+
9+
// Timing settings for the task controller. Exposed as TConfig.TaskController
// and read by run_actor.cpp through Params.Config.GetTaskController().
// Both fields are duration strings (the test runner sets "5s" and "1s") that
// run_actor.cpp parses with TDuration::TryParse; a non-empty value that fails
// to parse trips Y_ABORT_UNLESS there.
message TTaskControllerConfig {
10+
// Ping period passed to the task controller / checkpoint coordinator.
// Empty (unset) keeps the 3-second default hard-coded in run_actor.cpp.
string PingPeriod = 1;
11+
// Aggregation period passed alongside the ping period.
// Empty (unset) keeps the 1-second default hard-coded in run_actor.cpp.
string AggrPeriod = 2;
12+
}

ydb/core/fq/libs/config/protos/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ SRCS(
2424
resource_manager.proto
2525
row_dispatcher.proto
2626
storage.proto
27+
task_controller.proto
2728
test_connection.proto
2829
token_accessor.proto
2930
)

ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
6565
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
6666
"SELECT `" JOB_ID_COLUMN_NAME "`, `" JOB_COLUMN_NAME "` FROM `" JOBS_TABLE_NAME "`\n"
6767
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" JOB_ID_COLUMN_NAME "` = $last_job_id;\n"
68-
"SELECT `" OWNER_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`\n"
68+
"SELECT `" OWNER_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`\n"
6969
"FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
7070
);
7171

@@ -119,6 +119,8 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
119119
if (owner != request.owner_id()) {
120120
ythrow NYql::TCodeLineException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
121121
}
122+
auto assignedUntil = parser.ColumnParser(ASSIGNED_UNTIL_COLUMN_NAME).GetOptionalTimestamp().value_or(TInstant::Now());
123+
Counters.LeaseLeftMs->Collect((assignedUntil - TInstant::Now()).MilliSeconds());
122124
retryLimiter.Assign(
123125
parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().value_or(0),
124126
parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().value_or(TInstant::Zero()),
@@ -252,7 +254,7 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
252254
readQueryBuilder.AddText(
253255
"SELECT `" INTERNAL_COLUMN_NAME "`\n"
254256
"FROM `" QUERIES_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n"
255-
"SELECT `" OWNER_COLUMN_NAME "`\n"
257+
"SELECT `" OWNER_COLUMN_NAME "`, `" ASSIGNED_UNTIL_COLUMN_NAME "`\n"
256258
"FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
257259
);
258260

@@ -285,6 +287,8 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
285287
if (owner != request.owner_id()) {
286288
ythrow NYql::TCodeLineException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request.query_id().value() << "\" MISMATCHED: \"" << request.owner_id() << "\" (received) != \"" << owner << "\" (selected)";
287289
}
290+
auto assignedUntil = parser.ColumnParser(ASSIGNED_UNTIL_COLUMN_NAME).GetOptionalTimestamp().value_or(TInstant::Now());
291+
Counters.LeaseLeftMs->Collect((assignedUntil - TInstant::Now()).MilliSeconds());
288292
}
289293

290294
TInstant ttl = TInstant::Now() + Config->TaskLeaseTtl;
@@ -657,7 +661,9 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
657661
std::shared_ptr<Fq::Private::PingTaskResult> response = std::make_shared<Fq::Private::PingTaskResult>();
658662
std::shared_ptr<TFinalStatus> finalStatus = std::make_shared<TFinalStatus>();
659663

660-
auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
664+
bool isHard = DoesPingTaskUpdateQueriesTable(request);
665+
Counters.Counters->GetCounter(isHard ? "HardPing" : "SoftPing", true)->Inc();
666+
auto pingTaskParams = isHard ?
661667
ConstructHardPingTask(request, response, finalStatus, requestCounters.Common) :
662668
ConstructSoftPingTask(request, response, requestCounters.Common);
663669
auto debugInfo = Config->Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{};

ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,11 +522,13 @@ class TControlPlaneStorageBase : public TControlPlaneStorageUtils {
522522

523523
public:
524524
::NMonitoring::TDynamicCounterPtr Counters;
525+
::NMonitoring::THistogramPtr LeaseLeftMs;
525526

526527
explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& counters, const ::NFq::TControlPlaneStorageConfig& config)
527528
: ScopeCounters{TTtlCacheSettings{}.SetTtl(config.MetricsTtl)}
528529
, FinalStatusCounters{TTtlCacheSettings{}.SetTtl(config.MetricsTtl)}
529530
, Counters(counters)
531+
, LeaseLeftMs(Counters->GetHistogram("LeaseLeftMs", ::NMonitoring::ExplicitHistogram({100, 1000, 5000, 10000, 20000})))
530532
{
531533
for (auto& request: CommonRequests) {
532534
request->Register(Counters);

ydb/library/db_pool/db_pool.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ class TDbPoolActor : public NActors::TActor<TDbPoolActor> {
2222
const NMonitoring::THistogramPtr RequestsTime;
2323
const ::NMonitoring::TDynamicCounterPtr StatusSubgroup;
2424
TMap<TString, ::NMonitoring::TDynamicCounters::TCounterPtr> Status;
25+
const ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRate;
2526

2627
// Builds the DbPool metric handles. Every metric is looked up under the
// ("subcomponent", "DbPool") subgroup of the supplied `counters` root, and the
// per-status counters live one level deeper under ("component", "status").
TCounters(const ::NMonitoring::TDynamicCounterPtr& counters)
2728
: Counters(counters)
2829
, QueueSize(counters->GetSubgroup("subcomponent", "DbPool")->GetHistogram("InFlight", NMonitoring::ExponentialHistogram(10, 2, 10)))
2930
, TotalInFlight(counters->GetSubgroup("subcomponent", "DbPool")->GetCounter("TotalInflight"))
3031
, RequestsTime(counters->GetSubgroup("subcomponent", "DbPool")->GetHistogram("RequestTimeMs", NMonitoring::ExponentialHistogram(6, 3, 100)))
3132
, StatusSubgroup(counters->GetSubgroup("subcomponent", "DbPool")->GetSubgroup("component", "status"))
33+
// Incremented once per incoming TEvDbRequest / TEvDbFunctionRequest in HandleRequest.
// NOTE(review): the `true` argument presumably marks the counter as a derivative (rate) — confirm against TDynamicCounters::GetCounter.
, IncomingRate(counters->GetSubgroup("subcomponent", "DbPool")->GetCounter("IncomingRate", true))
3234
{}
3335

3436
::NMonitoring::TDynamicCounters::TCounterPtr GetStatus(const NYdb::TStatus& status) {
@@ -129,6 +131,7 @@ class TDbPoolActor : public NActors::TActor<TDbPoolActor> {
129131

130132
void HandleRequest(TEvents::TEvDbRequest::TPtr& ev) {
131133
LOG_D("TDbPoolActor: TEvDbRequest " << SelfId() << " Queue size = " << Requests.size());
134+
Counters.IncomingRate->Inc();
132135
auto request = ev->Get();
133136
Requests.emplace_back(TRequest{ev->Sender, ev->Cookie, request->Sql, std::move(request->Params), request->Idempotent});
134137
ProcessQueue();
@@ -152,6 +155,7 @@ class TDbPoolActor : public NActors::TActor<TDbPoolActor> {
152155

153156
void HandleRequest(TEvents::TEvDbFunctionRequest::TPtr& ev) {
154157
LOG_T("TDbPoolActor: TEvDbFunctionRequest " << SelfId() << " Queue size = " << Requests.size());
158+
Counters.IncomingRate->Inc();
155159
auto request = ev->Get();
156160
Requests.emplace_back(TFunctionRequest{ev->Sender, ev->Cookie, std::move(request->Handler)});
157161
ProcessQueue();
@@ -203,7 +207,6 @@ class TDbPoolActor : public NActors::TActor<TDbPoolActor> {
203207
bool RequestInProgress = false;
204208
TInstant RequestInProgressTimestamp = TInstant::Now();
205209
std::shared_ptr<int> State = std::make_shared<int>();
206-
207210
};
208211

209212
TDbPool::TDbPool(

ydb/tests/tools/fq_runner/kikimr_runner.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,9 @@ def fill_config(self, control_plane):
518518
if len(self.config_generator.dc_mapping) > 0:
519519
fq_config['nodes_manager']['use_data_center'] = True
520520
fq_config['enable_task_counters'] = True
521+
fq_config['task_controller'] = {}
522+
fq_config['task_controller']['ping_period'] = "5s" # task_lease_ttl / 4
523+
fq_config['task_controller']['aggr_period'] = "1s"
521524
else:
522525
fq_config['nodes_manager']['enabled'] = False
523526
fq_config['pending_fetcher']['enabled'] = False

0 commit comments

Comments
 (0)