diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 93f3ebb5b0dd..18818e958182 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -342,6 +343,14 @@ struct TEvQueryRequest: public NActors::TEventLocal GetPoolConfig() const { + return PoolConfig; + } + mutable NKikimrKqp::TEvQueryRequest Record; private: @@ -370,6 +379,7 @@ struct TEvQueryRequest: public NActors::TEventLocal UserRequestContext; TDuration ProgressStatsPeriod; + std::optional PoolConfig; }; struct TEvDataQueryStreamPart: public TEventPB { + TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject) + : Database(database) + , PoolId(poolId) + , Config(config) + , SecurityObject(securityObject) + {} + + const TString Database; + const TString PoolId; + const std::optional Config; + const std::optional SecurityObject; +}; + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 1d5a966bd0fb..1aa4a0574d67 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -15,7 +15,7 @@ namespace NKikimr::NKqp { TString CurrentExecutionId; TString CustomerSuppliedId; TString PoolId; - NResourcePool::TPoolSettings PoolConfig; + std::optional PoolConfig; TUserRequestContext() = default; diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index de1be6b95043..b0002f332bd2 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -174,6 +174,7 @@ struct TKqpWorkloadServiceEvents { EvContinueRequest, EvCleanupRequest, EvCleanupResponse, + EvUpdatePoolInfo, }; }; diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index d9ce0c8f5345..bf0e642eb5d9 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr& txPlans, TIntrusivePtr optCtx, const TString commonPlanInfo = "") { +TString SerializeTxPlans(const TVector& txPlans, TIntrusivePtr optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") { NJsonWriter::TBuf writer; writer.SetIndentSpaces(2); @@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector& txPlans, TIntrusivePtr()); + + NJsonWriter::TBuf writer; + writer.BeginObject(); + + if (queryStats.HasCompilation()) { + const auto& compilation = queryStats.GetCompilation(); + + writer.WriteKey("Compilation"); + writer.BeginObject(); + writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache()); + writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs()); + writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs()); + writer.EndObject(); + } + + writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs()); + writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs()); + writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs()); + writer.EndObject(); + + return SerializeTxPlans(txPlans, TIntrusivePtr(), "", writer.Str()); } TString SerializeScriptPlan(const TVector& queryPlans) { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 58a74ee9e2c8..ef26f3b6db75 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -691,11 +692,8 @@ class TKqpProxyService : public TActorBootstrapped { LocalSessions->AttachQueryText(sessionInfo, ev->Get()->GetQuery()); } - if (!FeatureFlags.GetEnableResourcePools()) { - ev->Get()->SetPoolId(""); - } else if (!ev->Get()->GetPoolId()) { - // TODO: do not use default pool if there is no limits - ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID); + if (!TryFillPoolInfoFromCache(ev, requestId)) { + return; } TActorId targetId; @@ -1348,6 +1346,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); + hFunc(NWorkload::TEvUpdatePoolInfo, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); @@ -1570,6 +1569,43 @@ class TKqpProxyService : public TActorBootstrapped { } } + bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) { + if (!FeatureFlags.GetEnableResourcePools()) { + ev->Get()->SetPoolId(""); + return true; + } + + if (!ev->Get()->GetPoolId()) { + ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID); + } + + const auto& poolId = ev->Get()->GetPoolId(); + const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId); + if (!poolInfo) { + return true; + } + + const auto& securityObject = poolInfo->SecurityObject; + const auto& userToken = ev->Get()->GetUserToken(); + if (securityObject && userToken && !userToken->GetSerializedToken().empty()) { + if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) { + ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId); + return false; + } + if (!securityObject->CheckAccess(NACLib::EAccessRights::SelectRow, *userToken)) { + ReplyProcessError(Ydb::StatusIds::UNAUTHORIZED, TStringBuilder() << "You don't have access permissions for resource pool " << poolId, requestId); + return false; + } + } + + const auto& poolConfig = poolInfo->Config; + if (!NWorkload::IsWorkloadServiceRequired(poolConfig)) { + ev->Get()->SetPoolConfig(poolConfig); + } + + return true; + } + void UpdateYqlLogLevels() { const auto& kqpYqlName = NKikimrServices::EServiceKikimr_Name(NKikimrServices::KQP_YQL); for (auto &entry : LogConfig.GetEntry()) { @@ -1755,6 +1791,10 @@ class TKqpProxyService : public TActorBootstrapped { Send(ev->Sender, result.release(), 0, ev->Cookie); } + void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) { + ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject); + } + private: NKikimrConfig::TLogConfig LogConfig; NKikimrConfig::TTableServiceConfig TableServiceConfig; @@ -1816,6 +1856,8 @@ class TKqpProxyService : public TActorBootstrapped { std::deque DelayedEventsQueue; bool IsLookupByRmScheduled = false; TActorId KqpTempTablesAgentActor; + + TResourcePoolsCache ResourcePoolsCache; }; } // namespace diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index 1ca66b5f019a..e5023b09ed46 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -415,4 +416,40 @@ class TLocalSessionsRegistry { } }; +class TResourcePoolsCache { + struct TPoolInfo { + NResourcePool::TPoolSettings Config; + std::optional SecurityObject; + }; + +public: + std::optional GetPoolInfo(const TString& database, const TString& poolId) const { + auto it = PoolsCache.find(GetPoolKey(database, poolId)); + if (it == PoolsCache.end()) { + return std::nullopt; + } + return it->second; + } + + void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject) { + const TString& poolKey = GetPoolKey(database, poolId); + if (!config) { + PoolsCache.erase(poolKey); + return; + } + + auto& poolInfo = PoolsCache[poolKey]; + poolInfo.Config = *config; + poolInfo.SecurityObject = securityObject; + } + +private: + static TString GetPoolKey(const TString& database, const TString& poolId) { + return CanonizePath(TStringBuilder() << database << "/" << poolId); + } + +private: + std::unordered_map PoolsCache; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 30b313fe9a7d..62c42af41072 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -85,6 +85,7 @@ class TKqpQueryState : public TNonCopyable { UserRequestContext = MakeIntrusive(RequestEv->GetTraceId(), Database, sessionId); } UserRequestContext->PoolId = RequestEv->GetPoolId(); + UserRequestContext->PoolConfig = RequestEv->GetPoolConfig(); } // the monotonously growing counter, the ordinal number of the query, @@ -114,6 +115,7 @@ class TKqpQueryState : public TNonCopyable { bool IsDocumentApiRestricted_ = false; TInstant StartTime; + TInstant ContinueTime; NYql::TKikimrQueryDeadlines QueryDeadlines; TKqpQueryStats QueryStats; bool KeepSession = false; diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.cpp b/ydb/core/kqp/session_actor/kqp_query_stats.cpp index e26d6b5e7b8f..922b788419ea 100644 --- a/ydb/core/kqp/session_actor/kqp_query_stats.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_stats.cpp @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) { NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const { NKqpProto::TKqpStatsQuery result; result.SetDurationUs(DurationUs); + result.SetQueuedTimeUs(QueuedTimeUs); if (Compilation) { result.MutableCompilation()->SetFromCache(Compilation->FromCache); diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.h b/ydb/core/kqp/session_actor/kqp_query_stats.h index f73ce6316f07..9cda3417beb9 100644 --- a/ydb/core/kqp/session_actor/kqp_query_stats.h +++ b/ydb/core/kqp/session_actor/kqp_query_stats.h @@ -8,6 +8,7 @@ namespace NKikimr::NKqp { struct TKqpQueryStats { ui64 DurationUs = 0; + ui64 QueuedTimeUs = 0; std::optional Compilation; ui64 WorkerCpuTimeUs = 0; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f763fb6b976e..9e12ffb19ae9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -242,6 +242,12 @@ class TKqpSessionActor : public TActorBootstrapped { } void PassRequestToResourcePool() { + if (QueryState->UserRequestContext->PoolConfig) { + LOG_D("request placed into pool from cache: " << QueryState->UserRequestContext->PoolId); + CompileQuery(); + return; + } + Send(MakeKqpWorkloadServiceId(SelfId().NodeId()), new NWorkload::TEvPlaceRequestIntoPool( QueryState->Database, SessionId, @@ -475,6 +481,7 @@ class TKqpSessionActor : public TActorBootstrapped { void Handle(NWorkload::TEvContinueRequest::TPtr& ev) { YQL_ENSURE(QueryState); + QueryState->ContinueTime = TInstant::Now(); if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) { LOG_T("Failed to place request in resource pool, feature flag is disabled"); @@ -1552,6 +1559,9 @@ class TKqpSessionActor : public TActorBootstrapped { stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds()); stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds()); + if (const auto continueTime = QueryState->ContinueTime) { + stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds(); + } if (QueryState->CompileResult) { stats->Compilation.emplace(); stats->Compilation->FromCache = (QueryState->CompileStats.FromCache); diff --git a/ydb/core/kqp/workload_service/actors/actors.h b/ydb/core/kqp/workload_service/actors/actors.h index 770867a58f1c..c575842faf78 100644 --- a/ydb/core/kqp/workload_service/actors/actors.h +++ b/ydb/core/kqp/workload_service/actors/actors.h @@ -9,12 +9,15 @@ namespace NKikimr::NKqp::NWorkload { NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters); // Fetch pool and create default pool if needed -NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless); +NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists); // Fetch and create pool in scheme shard -NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless); +NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken); NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl); +// Checks that database is serverless +NActors::IActor* CreateDatabaseFetcherActor(const NActors::TActorId& replyActorId, const TString& database); + // Cpu load fetcher actor NActors::IActor* CreateCpuLoadFetcherActor(const NActors::TActorId& replyActorId); diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index f0f11628a068..921944a0db41 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor { UpdateConfigCounters(poolConfig); } + void CollectRequestLatency(TInstant continueTime) { + if (continueTime) { + RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds()); + } + } + void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) { InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0)); QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0)); @@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor { const TActorId WorkerActorId; const TString SessionId; const TInstant StartTime = TInstant::Now(); + TInstant ContinueTime; EState State = EState::Pending; bool Started = false; // after TEvContinueRequest success @@ -141,7 +148,7 @@ class TPoolHandlerActorBase : public TActor { // Schemeboard events hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); - IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted); + hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); IgnoreFunc(TEvTxProxySchemeCache::TEvWatchNotifyUnavailable); ) @@ -150,6 +157,8 @@ class TPoolHandlerActorBase : public TActor { this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); } + SendPoolInfoUpdate(std::nullopt, std::nullopt); + Counters.OnCleanup(); TBase::PassAway(); @@ -254,6 +263,24 @@ class TPoolHandlerActorBase : public TActor { NResourcePool::TPoolSettings poolConfig; ParsePoolSettings(result->GetPathDescription().GetResourcePoolDescription(), poolConfig); UpdatePoolConfig(poolConfig); + + const auto& pathDescription = result->GetPathDescription().GetSelf(); + NACLib::TSecurityObject object(pathDescription.GetOwner(), false); + if (object.MutableACL()->ParseFromString(pathDescription.GetEffectiveACL())) { + SendPoolInfoUpdate(poolConfig, object); + } else { + SendPoolInfoUpdate(poolConfig, std::nullopt); + } + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { + if (ev->Get()->Key != WatchKey) { + // Skip old paths watch notifications + return; + } + + LOG_D("Got delete notification"); + SendPoolInfoUpdate(std::nullopt, std::nullopt); } public: @@ -267,6 +294,7 @@ class TPoolHandlerActorBase : public TActor { if (status == Ydb::StatusIds::SUCCESS) { LocalInFlight++; request->Started = true; + request->ContinueTime = TInstant::Now(); Counters.LocalInFly->Inc(); Counters.ContinueOk->Inc(); Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); @@ -318,6 +346,10 @@ class TPoolHandlerActorBase : public TActor { RemoveRequest(request); } + void SendPoolInfoUpdate(const std::optional& config, const std::optional& securityObject) const { + this->Send(MakeKqpProxyID(this->SelfId().NodeId()), new TEvUpdatePoolInfo(Database, PoolId, config, securityObject)); + } + protected: virtual bool ShouldResign() const = 0; virtual void OnScheduleRequest(TRequest* request) = 0; @@ -387,7 +419,7 @@ class TPoolHandlerActorBase : public TActor { if (status == Ydb::StatusIds::SUCCESS) { Counters.CleanupOk->Inc(); - Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.CollectRequestLatency(request->ContinueTime); LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } else { Counters.CleanupError->Inc(); @@ -401,7 +433,7 @@ class TPoolHandlerActorBase : public TActor { this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release()); Counters.Cancelled->Inc(); - Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.CollectRequestLatency(request->ContinueTime); LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 55b2cd3085d7..7647bc4f348d 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -22,9 +22,8 @@ using namespace NActors; class TPoolResolverActor : public TActorBootstrapped { public: - TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) + TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) : Event(std::move(event)) - , EnableOnServerless(enableOnServerless) { if (!Event->Get()->PoolId) { Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID; @@ -39,7 +38,7 @@ class TPoolResolverActor : public TActorBootstrapped { void StartPoolFetchRequest() const { LOG_D("Start pool fetching"); - Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless)); + Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken)); } void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) { @@ -116,7 +115,6 @@ class TPoolResolverActor : public TActorBootstrapped { private: TEvPlaceRequestIntoPool::TPtr Event; - const bool EnableOnServerless; bool CanCreatePool = false; bool DefaultPoolCreated = false; }; @@ -124,12 +122,11 @@ class TPoolResolverActor : public TActorBootstrapped { class TPoolFetcherActor : public TSchemeActorBase { public: - TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) + TPoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) : ReplyActorId(replyActorId) , Database(database) , PoolId(poolId) , UserToken(userToken) - , EnableOnServerless(enableOnServerless) {} void DoBootstrap() { @@ -144,11 +141,6 @@ class TPoolFetcherActor : public TSchemeActorBase { } const auto& result = results[0]; - if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) { - Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it"); - return; - } - switch (result.Status) { case EStatus::Unknown: case EStatus::PathNotTable: @@ -238,7 +230,6 @@ class TPoolFetcherActor : public TSchemeActorBase { const TString Database; const TString PoolId; const TIntrusiveConstPtr UserToken; - const bool EnableOnServerless; NResourcePool::TPoolSettings PoolConfig; NKikimrProto::TPathID PathId; @@ -451,18 +442,113 @@ class TPoolCreatorActor : public TSchemeActorBase { TActorId SchemePipeActorId; }; + +class TDatabaseFetcherActor : public TSchemeActorBase { +public: + TDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) + : ReplyActorId(replyActorId) + , Database(database) + {} + + void DoBootstrap() { + Become(&TDatabaseFetcherActor::StateFunc); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& results = ev->Get()->Request->ResultSet; + if (results.size() != 1) { + Reply(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected scheme cache response"); + return; + } + + const auto& result = results[0]; + switch (result.Status) { + case EStatus::Unknown: + case EStatus::PathNotTable: + case EStatus::PathNotPath: + case EStatus::RedirectLookupError: + case EStatus::AccessDenied: + case EStatus::RootUnknown: + case EStatus::PathErrorUnknown: + Reply(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Database " << Database << " not found or you don't have access permissions"); + return; + case EStatus::LookupError: + case EStatus::TableCreationNotComplete: + if (!ScheduleRetry(TStringBuilder() << "Retry error " << result.Status)) { + Reply(Ydb::StatusIds::UNAVAILABLE, TStringBuilder() << "Retry limit exceeded on scheme error: " << result.Status); + } + return; + case EStatus::Ok: + Serverless = result.DomainInfo && result.DomainInfo->IsServerless(); + Reply(Ydb::StatusIds::SUCCESS); + return; + } + } + + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + StateFuncBase(ev); + } + } + +protected: + void StartRequest() override { + LOG_D("Start database fetching"); + auto event = NTableCreator::BuildSchemeCacheNavigateRequest({{}}, Database, nullptr); + event->ResultSet[0].Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(event.Release()), IEventHandle::FlagTrackDelivery); + } + + void OnFatalError(Ydb::StatusIds::StatusCode status, NYql::TIssue issue) override { + Reply(status, {std::move(issue)}); + } + + TString LogPrefix() const override { + return TStringBuilder() << "[TDatabaseFetcherActor] ActorId: " << SelfId() << ", Database: " << Database << ", "; + } + +private: + void Reply(Ydb::StatusIds::StatusCode status, const TString& message) { + Reply(status, {NYql::TIssue(message)}); + } + + void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) { + if (status == Ydb::StatusIds::SUCCESS) { + LOG_D("Database info successfully fetched"); + } else { + LOG_W("Failed to fetch database info, " << status << ", issues: " << issues.ToOneLineString()); + } + + Issues.AddIssues(std::move(issues)); + Send(ReplyActorId, new TEvPrivate::TEvFetchDatabaseResponse(status, Database, Serverless, std::move(Issues))); + PassAway(); + } + +private: + const TActorId ReplyActorId; + const TString Database; + + bool Serverless = false; +}; + } // anonymous namespace -IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) { - return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless); +IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) { + return new TPoolResolverActor(std::move(event), defaultPoolExists); } -IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken, bool enableOnServerless) { - return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless); +IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr userToken) { + return new TPoolFetcherActor(replyActorId, database, poolId, userToken); } IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr userToken, NACLibProto::TDiffACL diffAcl) { return new TPoolCreatorActor(replyActorId, database, poolId, poolConfig, userToken, diffAcl); } +IActor* CreateDatabaseFetcherActor(const TActorId& replyActorId, const TString& database) { + return new TDatabaseFetcherActor(replyActorId, database); +} + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index c32f4cd4f4d5..a0db39a644b5 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -22,6 +22,7 @@ struct TEvPrivate { EvRefreshPoolState = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvResolvePoolResponse, EvFetchPoolResponse, + EvFetchDatabaseResponse, EvCreatePoolResponse, EvPrepareTablesRequest, EvPlaceRequestIntoPoolResponse, @@ -85,6 +86,20 @@ struct TEvPrivate { const NYql::TIssues Issues; }; + struct TEvFetchDatabaseResponse : public NActors::TEventLocal { + TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, NYql::TIssues issues) + : Status(status) + , Database(database) + , Serverless(serverless) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status; + const TString Database; + const bool Serverless; + const NYql::TIssues Issues; + }; + struct TEvCreatePoolResponse : public NActors::TEventLocal { TEvCreatePoolResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Status(status) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 66a6aaaaf64b..295e536d2499 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -27,6 +27,23 @@ using namespace NActors; class TKqpWorkloadService : public TActorBootstrapped { + struct TCounters { + const NMonitoring::TDynamicCounterPtr Counters; + + NMonitoring::TDynamicCounters::TCounterPtr ActivePools; + + TCounters(NMonitoring::TDynamicCounterPtr counters) + : Counters(counters) + { + Register(); + } + + private: + void Register() { + ActivePools = Counters->GetCounter("ActivePools", false); + } + }; + enum class ETablesCreationStatus { Cleanup, NotStarted, @@ -43,9 +60,7 @@ class TKqpWorkloadService : public TActorBootstrapped { public: explicit TKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters) : Counters(counters) - { - RegisterCounters(); - } + {} void Bootstrap() { Become(&TKqpWorkloadService::MainState); @@ -55,7 +70,7 @@ class TKqpWorkloadService : public TActorBootstrapped { (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem }), IEventHandle::FlagTrackDelivery); - CpuQuotaManager = std::make_unique(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); + CpuQuotaManager = std::make_unique(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager")); EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools(); EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless(); @@ -132,9 +147,9 @@ class TKqpWorkloadService : public TActorBootstrapped { return; } - LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); - bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database)); - Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless)); + const TString& database = ev->Get()->Database; + LOG_D("Recieved new request from " << workerActorId << ", Database: " << database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId); + GetOrCreateDatabaseState(database)->DoPlaceRequest(std::move(ev)); } void Handle(TEvCleanupRequest::TPtr& ev) { @@ -177,6 +192,7 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvCleanupRequest, Handle); hFunc(TEvents::TEvWakeup, Handle); + hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); hFunc(TEvPrivate::TEvNodesInfoRequest, Handle); @@ -191,11 +207,15 @@ class TKqpWorkloadService : public TActorBootstrapped { ) private: + void Handle(TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); + } + void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { const auto& event = ev->Get()->Event; const TString& database = event->Get()->Database; if (ev->Get()->DefaultPoolCreated) { - DatabasesWithDefaultPool.insert(CanonizePath(database)); + GetOrCreateDatabaseState(database)->HasDefaultPool = true; } const TString& poolId = event->Get()->PoolId; @@ -211,10 +231,10 @@ class TKqpWorkloadService : public TActorBootstrapped { TString poolKey = GetPoolKey(database, poolId); LOG_I("Creating new handler for pool " << poolKey); - auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters)); + auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters.Counters)); poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; - ActivePools->Inc(); + Counters.ActivePools->Inc(); ScheduleIdleCheck(); } @@ -392,7 +412,7 @@ class TKqpWorkloadService : public TActorBootstrapped { } IdleChecksStarted = true; - Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup()); + Schedule(IDLE_DURATION / 2, new TEvents::TEvWakeup(static_cast(EWakeUp::IdleCheck))); } void RunIdleCheck() { @@ -409,7 +429,7 @@ class TKqpWorkloadService : public TActorBootstrapped { } for (const auto& poolKey : poolsToDelete) { PoolIdToState.erase(poolKey); - ActivePools->Dec(); + Counters.ActivePools->Dec(); } if (!PoolIdToState.empty()) { @@ -446,7 +466,7 @@ class TKqpWorkloadService : public TActorBootstrapped { } void ScheduleNodeInfoRequest() const { - Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast(EWakeUp::StartCpuLoadRequest))); + Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast(EWakeUp::StartNodeInfoRequest))); } void RunNodeInfoRequest() const { @@ -472,6 +492,14 @@ class TKqpWorkloadService : public TActorBootstrapped { Send(replyActorId, new TEvCleanupResponse(status, {NYql::TIssue(message)})); } + TDatabaseState* GetOrCreateDatabaseState(const TString& database) { + auto databaseIt = DatabaseToState.find(database); + if (databaseIt != DatabaseToState.end()) { + return &databaseIt->second; + } + return &DatabaseToState.insert({database, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; + } + TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } @@ -492,12 +520,8 @@ class TKqpWorkloadService : public TActorBootstrapped { return "[Service] "; } - void RegisterCounters() { - ActivePools = Counters->GetCounter("ActivePools", false); - } - private: - NMonitoring::TDynamicCounterPtr Counters; + TCounters Counters; bool EnabledResourcePools = false; bool EnabledResourcePoolsOnServerless = false; @@ -506,16 +530,18 @@ class TKqpWorkloadService : public TActorBootstrapped { ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup; std::unordered_set PendingHandlers; - std::unordered_set DatabasesWithDefaultPool; + std::unordered_map DatabaseToState; std::unordered_map PoolIdToState; std::unique_ptr CpuQuotaManager; ui32 NodeCount = 0; - - NMonitoring::TDynamicCounters::TCounterPtr ActivePools; }; } // anonymous namespace +bool IsWorkloadServiceRequired(const NResourcePool::TPoolSettings& config) { + return config.ConcurrentQueryLimit != -1 || config.DatabaseLoadCpuThreshold >= 0.0 || config.QueryCancelAfter; +} + } // namespace NWorkload IActor* CreateKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters) { diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.h b/ydb/core/kqp/workload_service/kqp_workload_service.h index 33371c97a249..5b6015328dd9 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service.h @@ -1,10 +1,18 @@ #pragma once +#include + #include namespace NKikimr::NKqp { +namespace NWorkload { + +bool IsWorkloadServiceRequired(const NResourcePool::TPoolSettings& config); + +} // namespace NWorkload + NActors::IActor* CreateKqpWorkloadService(NMonitoring::TDynamicCounterPtr counters); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 9ee91f077720..8503a4fb7949 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -2,14 +2,70 @@ #include +#include #include #include +#include namespace NKikimr::NKqp::NWorkload { constexpr TDuration IDLE_DURATION = TDuration::Seconds(60); + +struct TDatabaseState { + NActors::TActorContext ActorContext; + bool& EnabledResourcePoolsOnServerless; + + std::vector PendingRequersts = {}; + bool HasDefaultPool = false; + bool Serverless = false; + + TInstant LastUpdateTime = TInstant::Zero(); + + void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) { + TString database = ev->Get()->Database; + PendingRequersts.emplace_back(std::move(ev)); + + if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) { + ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, database)); + } else { + StartPendingRequests(); + } + } + + void UpdateDatabaseInfo(const TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { + if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + ReplyContinueError(ev->Get()->Status, GroupIssues(ev->Get()->Issues, "Failed to fetch database info")); + return; + } + + LastUpdateTime = TInstant::Now(); + Serverless = ev->Get()->Serverless; + StartPendingRequests(); + } + +private: + void StartPendingRequests() { + if (!EnabledResourcePoolsOnServerless && Serverless) { + ReplyContinueError(Ydb::StatusIds::UNSUPPORTED, {NYql::TIssue("Resource pools are disabled for serverless domains. Please contact your system administrator to enable it")}); + return; + } + + for (auto& ev : PendingRequersts) { + ActorContext.Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool)); + } + PendingRequersts.clear(); + } + + void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { + for (const auto& ev : PendingRequersts) { + ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues)); + } + PendingRequersts.clear(); + } +}; + struct TPoolState { NActors::TActorId PoolHandler; NActors::TActorContext ActorContext; diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 271d7accbbfd..8d6880d3eb58 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -18,7 +18,7 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr ydb, c auto userToken = MakeIntrusive(userSID, TVector{}); userToken->SaveSerializationInfo(); - runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken, true)); + runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, userToken)); return runtime->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); } diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index a750be18bd79..5dda602ba3fc 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -260,6 +260,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(NResourcePool::DEFAULT_POOL_ID))); ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(95)); + + // Check pool creation after cleanup + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); } } diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto index 24e09f2dcd90..df70faff1a13 100644 --- a/ydb/core/protos/kqp_stats.proto +++ b/ydb/core/protos/kqp_stats.proto @@ -73,6 +73,7 @@ message TKqpExecutionExtraStats { message TKqpStatsQuery { // Basic stats uint64 DurationUs = 1; + uint64 QueuedTimeUs = 9; TKqpStatsCompile Compilation = 2; reserved 3; // repeated TKqpStatsExecution Executions = 3;