Skip to content

WM fixed bugs and performance #7254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
Expand Down Expand Up @@ -342,6 +343,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return Record.GetRequest().GetPoolId();
}

void SetPoolConfig(const NResourcePool::TPoolSettings& config) {
PoolConfig = config;
}

std::optional<NResourcePool::TPoolSettings> GetPoolConfig() const {
return PoolConfig;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand Down Expand Up @@ -370,6 +379,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
TDuration CancelAfter;
TIntrusivePtr<TUserRequestContext> UserRequestContext;
TDuration ProgressStatsPeriod;
std::optional<NResourcePool::TPoolSettings> PoolConfig;
};

struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,18 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
const NYql::TIssues Issues;
};

struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: Database(database)
, PoolId(poolId)
, Config(config)
, SecurityObject(securityObject)
{}

const TString Database;
const TString PoolId;
const std::optional<NResourcePool::TPoolSettings> Config;
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

} // NKikimr::NKqp::NWorkload
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NKikimr::NKqp {
TString CurrentExecutionId;
TString CustomerSuppliedId;
TString PoolId;
NResourcePool::TPoolSettings PoolConfig;
std::optional<NResourcePool::TPoolSettings> PoolConfig;

TUserRequestContext() = default;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ struct TKqpWorkloadServiceEvents {
EvContinueRequest,
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
};
};

Expand Down
33 changes: 31 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
return planJson.GetStringRobust();
}

TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);

Expand All @@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
writer.BeginObject();
writer.WriteKey("Node Type").WriteString("Query");
writer.WriteKey("PlanNodeType").WriteString("Query");

if (queryStats) {
NJson::TJsonValue queryStatsJson;
NJson::ReadJsonTree(queryStats, &queryStatsJson, true);

writer.WriteKey("Stats");
writer.WriteJsonValue(&queryStatsJson);
}

writer.WriteKey("Plans");
writer.BeginList();

Expand Down Expand Up @@ -2705,7 +2714,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
txPlans.push_back(txPlan);
}
}
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());

NJsonWriter::TBuf writer;
writer.BeginObject();

if (queryStats.HasCompilation()) {
const auto& compilation = queryStats.GetCompilation();

writer.WriteKey("Compilation");
writer.BeginObject();
writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
writer.EndObject();
}

writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
writer.EndObject();

return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
}

TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
Expand Down
52 changes: 47 additions & 5 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/core/cms/console/console.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/common/events/script_executions.h>
#include <ydb/core/kqp/common/events/workload_service.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
Expand Down Expand Up @@ -691,11 +692,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
LocalSessions->AttachQueryText(sessionInfo, ev->Get()->GetQuery());
}

if (!FeatureFlags.GetEnableResourcePools()) {
ev->Get()->SetPoolId("");
} else if (!ev->Get()->GetPoolId()) {
// TODO: do not use default pool if there is no limits
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
if (!TryFillPoolInfoFromCache(ev, requestId)) {
return;
}

TActorId targetId;
Expand Down Expand Up @@ -1348,6 +1346,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvKqp::TEvListSessionsRequest, Handle);
hFunc(TEvKqp::TEvListProxyNodesRequest, Handle);
hFunc(NWorkload::TEvUpdatePoolInfo, Handle);
default:
Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
Expand Down Expand Up @@ -1570,6 +1569,43 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}
}

bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) {
if (!FeatureFlags.GetEnableResourcePools()) {
ev->Get()->SetPoolId("");
return true;
}

if (!ev->Get()->GetPoolId()) {
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
}

const auto& poolId = ev->Get()->GetPoolId();
const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId);
if (!poolInfo) {
return true;
}

const auto& securityObject = poolInfo->SecurityObject;
const auto& userToken = ev->Get()->GetUserToken();
if (securityObject && userToken && !userToken->GetSerializedToken().empty()) {
if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) {
ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId);
return false;
}
if (!securityObject->CheckAccess(NACLib::EAccessRights::SelectRow, *userToken)) {
ReplyProcessError(Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You don't have access permissions for resource pool " << poolId, requestId);
return false;
}
}

const auto& poolConfig = poolInfo->Config;
if (!NWorkload::IsWorkloadServiceRequired(poolConfig)) {
ev->Get()->SetPoolConfig(poolConfig);
}

return true;
}

void UpdateYqlLogLevels() {
const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL);
for (auto &entry : LogConfig.GetEntry()) {
Expand Down Expand Up @@ -1755,6 +1791,10 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Send(ev->Sender, result.release(), 0, ev->Cookie);
}

void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) {
ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject);
}

private:
NKikimrConfig::TLogConfig LogConfig;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
Expand Down Expand Up @@ -1816,6 +1856,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
std::deque<TDelayedEvent> DelayedEventsQueue;
bool IsLookupByRmScheduled = false;
TActorId KqpTempTablesAgentActor;

TResourcePoolsCache ResourcePoolsCache;
};

} // namespace
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/path.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
Expand Down Expand Up @@ -415,4 +416,40 @@ class TLocalSessionsRegistry {
}
};

class TResourcePoolsCache {
struct TPoolInfo {
NResourcePool::TPoolSettings Config;
std::optional<NACLib::TSecurityObject> SecurityObject;
};

public:
std::optional<TPoolInfo> GetPoolInfo(const TString& database, const TString& poolId) const {
auto it = PoolsCache.find(GetPoolKey(database, poolId));
if (it == PoolsCache.end()) {
return std::nullopt;
}
return it->second;
}

void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject) {
const TString& poolKey = GetPoolKey(database, poolId);
if (!config) {
PoolsCache.erase(poolKey);
return;
}

auto& poolInfo = PoolsCache[poolKey];
poolInfo.Config = *config;
poolInfo.SecurityObject = securityObject;
}

private:
static TString GetPoolKey(const TString& database, const TString& poolId) {
return CanonizePath(TStringBuilder() << database << "/" << poolId);
}

private:
std::unordered_map<TString, TPoolInfo> PoolsCache;
};

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TKqpQueryState : public TNonCopyable {
UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId);
}
UserRequestContext->PoolId = RequestEv->GetPoolId();
UserRequestContext->PoolConfig = RequestEv->GetPoolConfig();
}

// the monotonously growing counter, the ordinal number of the query,
Expand Down Expand Up @@ -114,6 +115,7 @@ class TKqpQueryState : public TNonCopyable {
bool IsDocumentApiRestricted_ = false;

TInstant StartTime;
TInstant ContinueTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;
TKqpQueryStats QueryStats;
bool KeepSession = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
NKqpProto::TKqpStatsQuery result;
result.SetDurationUs(DurationUs);
result.SetQueuedTimeUs(QueuedTimeUs);

if (Compilation) {
result.MutableCompilation()->SetFromCache(Compilation->FromCache);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NKqp {

struct TKqpQueryStats {
ui64 DurationUs = 0;
ui64 QueuedTimeUs = 0;
std::optional<TKqpStatsCompile> Compilation;

ui64 WorkerCpuTimeUs = 0;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

void PassRequestToResourcePool() {
if (QueryState->UserRequestContext->PoolConfig) {
LOG_D("request placed into pool from cache: " << QueryState->UserRequestContext->PoolId);
CompileQuery();
return;
}

Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvPlaceRequestIntoPool(
QueryState->Database,
SessionId,
Expand Down Expand Up @@ -475,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
YQL_ENSURE(QueryState);
QueryState->ContinueTime = TInstant::Now();

if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
LOG_T("Failed to place request in resource pool, feature flag is disabled");
Expand Down Expand Up @@ -1552,6 +1559,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
if (const auto continueTime = QueryState->ContinueTime) {
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
}
if (QueryState->CompileResult) {
stats->Compilation.emplace();
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/workload_service/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload {
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);

// Fetch pool and create default pool if needed
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);

// Fetch and create pool in scheme shard
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);

// Checks that database is serverless
NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database);

// Cpu load fetcher actor
NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId);

Expand Down
Loading
Loading