Skip to content

Commit 2e3f1cb

Browse files
authored
Account usage from ColumnShards (#20245)
1 parent 9da7675 commit 2e3f1cb

File tree

14 files changed

+65
-9
lines changed

14 files changed

+65
-9
lines changed

ydb/core/base/appdata_fwd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ namespace NKikimr {
2626
namespace NJaegerTracing {
2727
class TSamplingThrottlingConfigurator;
2828
}
29+
namespace NKqp::NScheduler {
30+
class TComputeScheduler;
31+
}
2932
}
3033

3134
namespace NKikimrCms {
@@ -309,6 +312,8 @@ struct TAppData {
309312
// Tracing configurator (look for tracing config in ydb/core/jaeger_tracing/actors_tracing_control)
310313
TIntrusivePtr<NKikimr::NJaegerTracing::TSamplingThrottlingConfigurator> TracingConfigurator;
311314

315+
std::shared_ptr<NKqp::NScheduler::TComputeScheduler> ComputeScheduler;
316+
312317
TAppData(
313318
ui32 sysPoolId, ui32 userPoolId, ui32 ioPoolId, ui32 batchPoolId,
314319
TMap<TString, ui32> servicePools,

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
300300
MakeKqpCompileComputationPatternServiceID(SelfId().NodeId()), CompileComputationPatternService);
301301
}
302302

303-
auto scheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);
303+
auto scheduler = AppData()->ComputeScheduler = std::make_shared<NScheduler::TComputeScheduler>(Counters);
304304

305305
ResourceManager_ = GetKqpResourceManager();
306306
CaFactory_ = NComputeActor::MakeKqpCaFactory(

ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage) {
5959
}
6060
}
6161

62+
void TSchedulableTask::IncreaseExtraUsage() {
63+
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
64+
++parent->UsageExtra;
65+
}
66+
}
67+
68+
void TSchedulableTask::DecreaseExtraUsage(const TDuration& burstUsageExtra) {
69+
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
70+
--parent->UsageExtra;
71+
parent->BurstUsageExtra += burstUsageExtra.MicroSeconds();
72+
}
73+
}
74+
6275
void TSchedulableTask::IncreaseBurstThrottle(const TDuration& burstThrottle) {
6376
for (TTreeElementBase* parent = Query.get(); parent; parent = parent->Parent) {
6477
parent->BurstThrottle += burstThrottle.MicroSeconds();

ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ struct TSchedulableTask {
1818
void IncreaseUsage();
1919
void DecreaseUsage(const TDuration& burstUsage);
2020

21+
// Account extra usage which doesn't affect scheduling
22+
void IncreaseExtraUsage();
23+
void DecreaseExtraUsage(const TDuration& burstUsage);
24+
2125
void IncreaseBurstThrottle(const TDuration& burstThrottle);
2226
void IncreaseThrottle();
2327
void DecreaseThrottle();

ydb/core/kqp/runtime/scheduler/new/tree/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
5151
NMonitoring::TDynamicCounters::TCounterPtr FairShare;
5252
NMonitoring::TDynamicCounters::TCounterPtr InFlight;
5353
NMonitoring::TDynamicCounters::TCounterPtr Waiting;
54+
NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra;
55+
NMonitoring::TDynamicCounters::TCounterPtr UsageExtra;
5456
NMonitoring::THistogramPtr Delay;
5557
};
5658

ydb/core/kqp/runtime/scheduler/new/tree/dynamic.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ TPool::TPool(const TString& id, const TIntrusivePtr<TKqpCounters>& counters, con
7676
Counters.Throttle = group->GetCounter("Throttle", true);
7777
Counters.FairShare = group->GetCounter("FairShare", true); // snapshot
7878

79+
Counters.InFlightExtra = group->GetCounter("InFlightExtra", false);
80+
Counters.UsageExtra = group->GetCounter("UsageExtra", true);
81+
7982
Counters.Delay = group->GetHistogram("Delay",
8083
NMonitoring::ExplicitHistogram({10, 10e2, 10e3, 10e4, 10e5, 10e6, 10e7}), true); // TODO: make from MinDelay to MaxDelay.
8184
}
@@ -90,6 +93,9 @@ NSnapshot::TPool* TPool::TakeSnapshot() const {
9093
Counters.Usage->Set(BurstUsage);
9194
Counters.Throttle->Set(BurstThrottle);
9295

96+
Counters.InFlightExtra->Set(UsageExtra * 1'000'000);
97+
Counters.UsageExtra->Set(BurstUsageExtra);
98+
9399
for (const auto& child : Children) {
94100
newPool->AddQuery(std::shared_ptr<NSnapshot::TQuery>(std::dynamic_pointer_cast<TQuery>(child)->TakeSnapshot()));
95101
}

ydb/core/kqp/runtime/scheduler/new/tree/dynamic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ namespace NKikimr::NKqp::NScheduler::NHdrf::NDynamic {
3434

3535
struct TTreeElementBase : public TStaticAttributes {
3636
std::atomic<ui64> Usage = 0;
37+
std::atomic<ui64> UsageExtra = 0;
3738
std::atomic<ui64> Demand = 0;
3839
std::atomic<ui64> Throttle = 0;
3940

4041
std::atomic<ui64> BurstUsage = 0;
42+
std::atomic<ui64> BurstUsageExtra = 0;
4143
std::atomic<ui64> BurstThrottle = 0;
4244

4345
TTreeElementBase* Parent = nullptr;

ydb/core/tx/columnshard/engines/reader/abstract/read_context.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ TReadContext::TReadContext(const std::shared_ptr<IStoragesManager>& storagesMana
1313
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
1414
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
1515
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
16-
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
16+
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits, NKqp::NScheduler::TSchedulableTaskPtr schedulableTask)
1717
: StoragesManager(storagesManager)
1818
, DataAccessorsManager(dataAccessorsManager)
19+
, SchedulableTask(std::move(schedulableTask))
1920
, Counters(counters)
2021
, ReadMetadata(readMetadata)
2122
, ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters)

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "read_metadata.h"
33

4+
#include <ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.h>
45
#include <ydb/core/protos/tx_datashard.pb.h>
56
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
67
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
@@ -50,6 +51,7 @@ class TReadContext {
5051
private:
5152
YDB_READONLY_DEF(std::shared_ptr<IStoragesManager>, StoragesManager);
5253
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>, DataAccessorsManager);
54+
YDB_READONLY_DEF(NKqp::NScheduler::TSchedulableTaskPtr, SchedulableTask);
5355
const NColumnShard::TConcreteScanCounters Counters;
5456
TReadMetadataBase::TConstPtr ReadMetadata;
5557
NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext;
@@ -151,7 +153,7 @@ class TReadContext {
151153
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
152154
const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId,
153155
const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy,
154-
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits);
156+
const ui64 scanId, const NConveyorComposite::TCPULimitsConfig& cpuLimits, NKqp::NScheduler::TSchedulableTaskPtr schedulableTask);
155157
};
156158

157159
class IDataReader {

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
2121
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
2222
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId,
2323
TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
24-
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits)
24+
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
25+
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask)
2526
: StoragesManager(storagesManager)
2627
, DataAccessorsManager(dataAccessorsManager)
2728
, ColumnShardActorId(columnShardActorId)
@@ -34,6 +35,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
3435
, DataFormat(dataFormat)
3536
, TabletId(tabletId)
3637
, CPULimits(cpuLimits)
38+
, SchedulableTask(std::move(schedulableTask))
3739
, ReadMetadataRange(readMetadataRange)
3840
, Timeout(timeout ? timeout : COMPUTE_HARD_TIMEOUT)
3941
, ScanCountersPool(scanCountersPool, TValidator::CheckNotNull(ReadMetadataRange)->GetProgram().GetGraphOptional())
@@ -54,7 +56,8 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
5456
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
5557

5658
std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, DataAccessorsManager, ScanCountersPool,
57-
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits);
59+
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId, CPULimits,
60+
std::move(SchedulableTask));
5861
ScanIterator = ReadMetadataRange->StartScan(context);
5962
auto startResult = ScanIterator->Start();
6063
StartInstant = TMonotonic::Now();

ydb/core/tx/columnshard/engines/reader/actor/actor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>,
3535
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
3636
const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId,
3737
TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange, NKikimrDataEvents::EDataFormat dataFormat,
38-
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits);
38+
const NColumnShard::TScanCounters& scanCountersPool, const NConveyorComposite::TCPULimitsConfig& cpuLimits,
39+
NKqp::NScheduler::TSchedulableTaskPtr schedulableTask);
3940

4041
void Bootstrap(const TActorContext& ctx);
4142

@@ -131,6 +132,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>,
131132
const NKikimrDataEvents::EDataFormat DataFormat;
132133
const ui64 TabletId;
133134
const NConveyorComposite::TCPULimitsConfig CPULimits;
135+
NKqp::NScheduler::TSchedulableTaskPtr SchedulableTask;
134136

135137
TReadMetadataBase::TConstPtr ReadMetadataRange;
136138
std::unique_ptr<TScanIteratorBase> ScanIterator;

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "source.h"
66
#include "sub_columns_fetching.h"
77

8+
#include <ydb/core/kqp/runtime/scheduler/new/kqp_schedulable_actor.h>
89
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
910

1011
#include <util/string/builder.h>
@@ -65,9 +66,20 @@ TConclusion<bool> TFetchingScriptCursor::Execute(const std::shared_ptr<IDataSour
6566
IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
6667
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("scan_step", step->DebugString())("scan_step_idx", CurrentStepIdx)("source_id", source->GetSourceId());
6768

69+
auto& schedulableTask = source->GetContext()->GetCommonContext()->GetSchedulableTask();
70+
if (schedulableTask) {
71+
schedulableTask->IncreaseExtraUsage();
72+
}
73+
6874
const TMonotonic startInstant = TMonotonic::Now();
6975
const TConclusion<bool> resultStep = step->ExecuteInplace(source, *this);
70-
FlushDuration(TMonotonic::Now() - startInstant);
76+
const auto executionTime = TMonotonic::Now() - startInstant;
77+
78+
if (schedulableTask) {
79+
schedulableTask->DecreaseExtraUsage(executionTime);
80+
}
81+
82+
FlushDuration(executionTime);
7183
if (!resultStep) {
7284
return resultStep;
7385
}

ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
8787
auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
8888
Self->DataAccessorsManager.GetObjectPtrVerified(),
8989
TComputeShardingPolicy(), ScanId, LockId.value_or(0), ScanGen, requestCookie, Self->TabletID(), TDuration::Max(), readMetadataRange,
90-
NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {}));
90+
NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {}, {}));
9191

9292
Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId);
9393
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActorId)("trace_detailed", detailedInfo);

ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "tx_scan.h"
22

33
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
4+
#include <ydb/core/kqp/runtime/scheduler/new/kqp_compute_scheduler_service.h>
45
#include <ydb/core/sys_view/common/schema.h>
56
#include <ydb/core/tx/columnshard/engines/reader/actor/actor.h>
67
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.h>
@@ -161,9 +162,12 @@ void TTxScan::Complete(const TActorContext& ctx) {
161162
TComputeShardingPolicy shardingPolicy;
162163
AFL_VERIFY(shardingPolicy.DeserializeFromProto(request.GetComputeShardingPolicy()));
163164

165+
const auto& scheduler = AppData(ctx)->ComputeScheduler;
166+
auto schedulableTask = scheduler ? scheduler->CreateSchedulableTaskFactory()(txId) : nullptr;
167+
164168
auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
165169
Self->DataAccessorsManager.GetObjectPtrVerified(), shardingPolicy, scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout,
166-
readMetadataRange, dataFormat, Self->Counters.GetScanCounters(), cpuLimits));
170+
readMetadataRange, dataFormat, Self->Counters.GetScanCounters(), cpuLimits, std::move(schedulableTask)));
167171
Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId);
168172

169173
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActorId)("trace_detailed", detailedInfo);

0 commit comments

Comments
 (0)