Skip to content

Commit 020ae0d

Browse files
committed
YQ-3493 support resource pool classifiers kqp_proxy cache (#7688)
1 parent 0b5c1bd commit 020ae0d

25 files changed

+687
-69
lines changed

ydb/core/kqp/common/events/workload_service.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@
1212

1313
namespace NKikimr::NKqp::NWorkload {
1414

15+
struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
16+
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
17+
: Database(database)
18+
, PoolId(poolId)
19+
{}
20+
21+
const TString Database;
22+
const TString PoolId;
23+
};
24+
1525
struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
1626
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
1727
: Database(database)
@@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
8090
const std::optional<NACLib::TSecurityObject> SecurityObject;
8191
};
8292

93+
struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
94+
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
95+
: Database(database)
96+
, Serverless(serverless)
97+
{}
98+
99+
const TString Database;
100+
const bool Serverless;
101+
};
102+
83103
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents {
175175
EvCleanupRequest,
176176
EvCleanupResponse,
177177
EvUpdatePoolInfo,
178+
EvUpdateDatabaseInfo,
179+
EvSubscribeOnPoolChanges,
178180
};
179181
};
180182

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
275275

276276
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
277277
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
278-
false, false, std::move(TempTablesState), nullptr, SplitCtx);
278+
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);
279279

280280
IKqpHost::TPrepareSettings prepareSettings;
281281
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;

ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ NJson::TJsonValue TResourcePoolClassifierConfig::GetDebugJson() const {
123123
}
124124

125125
bool TResourcePoolClassifierConfig::operator==(const TResourcePoolClassifierConfig& other) const {
126-
return std::tie(Database, Name, Rank, ConfigJson) != std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
126+
return std::tie(Database, Name, Rank, ConfigJson) == std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
127127
}
128128

129129
NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierConfig::GetBehaviour() {

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,8 @@ class TKqpHost : public IKqpHost {
10331033
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
10341034
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
10351035
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
1036-
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
1036+
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig(),
1037+
const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr)
10371038
: Gateway(gateway)
10381039
, Cluster(cluster)
10391040
, GUCSettings(gUCSettings)
@@ -1044,7 +1045,7 @@ class TKqpHost : public IKqpHost {
10441045
, KeepConfigChanges(keepConfigChanges)
10451046
, IsInternalCall(isInternalCall)
10461047
, FederatedQuerySetup(federatedQuerySetup)
1047-
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
1048+
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken, nullptr, userRequestContext))
10481049
, Config(config)
10491050
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
10501051
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
@@ -1957,10 +1958,10 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
19571958
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
19581959
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
19591960
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
1960-
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
1961+
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
19611962
{
19621963
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
1963-
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
1964+
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig, userRequestContext);
19641965
}
19651966

19661967
} // namespace NKqp

ydb/core/kqp/host/kqp_host.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
123123
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
124124
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
125125
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
126-
NYql::TExprContext* ctx = nullptr);
126+
NYql::TExprContext* ctx = nullptr, const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr);
127127

128128
} // namespace NKqp
129129
} // namespace NKikimr

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class TKqpRunner : public IKqpRunner {
146146
, Config(sessionCtx->ConfigPtr())
147147
, TransformCtx(transformCtx)
148148
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
149-
sessionCtx->TablesPtr()))
149+
sessionCtx->TablesPtr(), sessionCtx->GetUserRequestContext()))
150150
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
151151
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel)))
152152
{

ydb/core/kqp/opt/kqp_opt.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ namespace NKikimr::NKqp::NOpt {
1010

1111
struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
1212
TKqpOptimizeContext(const TString& cluster, const NYql::TKikimrConfiguration::TPtr& config,
13-
const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables)
13+
const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables,
14+
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& userRequestContext)
1415
: Cluster(cluster)
1516
, Config(config)
1617
, QueryCtx(queryCtx)
1718
, Tables(tables)
19+
, UserRequestContext(userRequestContext)
1820
{
1921
YQL_ENSURE(QueryCtx);
2022
YQL_ENSURE(Tables);
@@ -24,6 +26,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
2426
const NYql::TKikimrConfiguration::TPtr Config;
2527
const TIntrusivePtr<NYql::TKikimrQueryContext> QueryCtx;
2628
const TIntrusivePtr<NYql::TKikimrTablesData> Tables;
29+
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
2730
int JoinsCount{};
2831
int EquiJoinsCount{};
2932

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2363,11 +2363,21 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const TKqpPhysicalQ
23632363
txPlans.emplace_back(phyTx.GetPlan());
23642364
}
23652365

2366+
TString queryStats = "";
2367+
if (optCtx && optCtx->UserRequestContext && optCtx->UserRequestContext->PoolId) {
2368+
NJsonWriter::TBuf writer;
2369+
writer.BeginObject();
2370+
writer.WriteKey("ResourcePoolId").WriteString(optCtx->UserRequestContext->PoolId);
2371+
writer.EndObject();
2372+
2373+
queryStats = writer.Str();
2374+
}
2375+
23662376
NJsonWriter::TBuf writer;
23672377
writer.SetIndentSpaces(2);
23682378
WriteCommonTablesInfo(writer, serializerCtx.Tables);
23692379

2370-
queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str()));
2380+
queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str(), queryStats));
23712381
}
23722382

23732383
void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& aggr, const TString& name) {
@@ -2707,7 +2717,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
27072717
return AddExecStatsToTxPlan(txPlanJson, stats, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
27082718
}
27092719

2710-
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
2720+
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId) {
27112721
TVector<const TString> txPlans;
27122722
for (const auto& execStats: queryStats.GetExecutions()) {
27132723
for (const auto& txPlan: execStats.GetTxPlansWithStats()) {
@@ -2731,7 +2741,10 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
27312741

27322742
writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
27332743
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
2734-
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
2744+
if (poolId) {
2745+
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
2746+
writer.WriteKey("ResourcePoolId").WriteString(poolId);
2747+
}
27352748
writer.EndObject();
27362749

27372750
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());

ydb/core/kqp/opt/kqp_query_plan.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const NYql::NNodes:
4444
*/
4545
TString AddExecStatsToTxPlan(const TString& txPlan, const NYql::NDqProto::TDqExecutionStats& stats);
4646

47-
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats);
47+
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId = "");
4848

4949
TString SerializeScriptPlan(const TVector<const TString>& queryPlans);
5050

0 commit comments

Comments
 (0)