Skip to content

Commit c89ab17

Browse files
committed
YQ-3689 added database id for workload manager (ydb-platform#9768)
1 parent a6127fb commit c89ab17

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+530
-356
lines changed

ydb/core/kqp/common/events/workload_service.h

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
namespace NKikimr::NKqp::NWorkload {
1515

1616
struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
17-
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
18-
: Database(database)
17+
TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId)
18+
: DatabaseId(databaseId)
1919
, PoolId(poolId)
2020
{}
2121

22-
const TString Database;
22+
const TString DatabaseId;
2323
const TString PoolId;
2424
};
2525

2626
struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
27-
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
28-
: Database(database)
27+
TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
28+
: DatabaseId(databaseId)
2929
, SessionId(sessionId)
3030
, PoolId(poolId)
3131
, UserToken(userToken)
3232
{}
3333

34-
const TString Database;
34+
const TString DatabaseId;
3535
const TString SessionId;
3636
TString PoolId; // Can be changed to default pool id
3737
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
@@ -52,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
5252
};
5353

5454
struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
55-
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
56-
: Database(database)
55+
TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
56+
: DatabaseId(databaseId)
5757
, SessionId(sessionId)
5858
, PoolId(poolId)
5959
, Duration(duration)
6060
, CpuConsumed(cpuConsumed)
6161
{}
6262

63-
const TString Database;
63+
const TString DatabaseId;
6464
const TString SessionId;
6565
const TString PoolId;
6666
const TDuration Duration;
@@ -78,30 +78,32 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
7878
};
7979

8080
struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
81-
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
82-
: Database(database)
81+
TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
82+
: DatabaseId(databaseId)
8383
, PoolId(poolId)
8484
, Config(config)
8585
, SecurityObject(securityObject)
8686
{}
8787

88-
const TString Database;
88+
const TString DatabaseId;
8989
const TString PoolId;
9090
const std::optional<NResourcePool::TPoolSettings> Config;
9191
const std::optional<NACLib::TSecurityObject> SecurityObject;
9292
};
9393

9494
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
95-
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, bool serverless, TPathId pathId, NYql::TIssues issues)
95+
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues)
9696
: Status(status)
9797
, Database(database)
98+
, DatabaseId(databaseId)
9899
, Serverless(serverless)
99100
, PathId(pathId)
100101
, Issues(std::move(issues))
101102
{}
102103

103104
const Ydb::StatusIds::StatusCode Status;
104105
const TString Database;
106+
const TString DatabaseId;
105107
const bool Serverless;
106108
const TPathId PathId;
107109
const NYql::TIssues Issues;

ydb/core/kqp/common/kqp_user_request_context.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
namespace NKikimr::NKqp {
44

55
void TUserRequestContext::Out(IOutputStream& o) const {
6-
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
6+
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", DatabaseId: " << DatabaseId << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
77
}
88

99
void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
1010
resultMap["TraceId"] = ctx.TraceId;
1111
resultMap["Database"] = ctx.Database;
12+
resultMap["DatabaseId"] = ctx.DatabaseId;
1213
resultMap["SessionId"] = ctx.SessionId;
1314
resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
1415
resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace NKikimr::NKqp {
1111
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
1212
TString TraceId;
1313
TString Database;
14+
TString DatabaseId;
1415
TString SessionId;
1516
TString CurrentExecutionId;
1617
TString CustomerSuppliedId;

ydb/core/kqp/common/simple/query_id.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@
1010

1111
namespace NKikimr::NKqp {
1212

13-
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
13+
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
1414
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
1515
const TGUCSettings& gUCSettings)
1616
: Cluster(cluster)
1717
, Database(database)
18+
, DatabaseId(databaseId)
1819
, Text(text)
1920
, Settings(settings)
2021
, QueryParameterTypes(queryParameterTypes)
@@ -42,6 +43,7 @@ bool TKqpQueryId::IsSql() const {
4243
bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
4344
if (!(Cluster == other.Cluster &&
4445
Database == other.Database &&
46+
DatabaseId == other.DatabaseId &&
4547
UserSid == other.UserSid &&
4648
Text == other.Text &&
4749
Settings == other.Settings &&
@@ -79,6 +81,7 @@ TString TKqpQueryId::SerializeToString() const {
7981
TStringBuilder result = TStringBuilder() << "{"
8082
<< "Cluster: " << Cluster << ", "
8183
<< "Database: " << Database << ", "
84+
<< "DatabaseId: " << DatabaseId << ", "
8285
<< "UserSid: " << UserSid << ", "
8386
<< "Text: " << EscapeC(Text) << ", "
8487
<< "Settings: " << Settings.SerializeToString() << ", ";

ydb/core/kqp/common/simple/query_id.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace NKikimr::NKqp {
1313
struct TKqpQueryId {
1414
TString Cluster;
1515
TString Database;
16+
TString DatabaseId;
1617
TString UserSid;
1718
TString Text;
1819
TKqpQuerySettings Settings;
@@ -21,7 +22,7 @@ struct TKqpQueryId {
2122
TGUCSettings GUCSettings;
2223

2324
public:
24-
TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
25+
TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
2526
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
2627
const TGUCSettings& gUCSettings);
2728

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
266266
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
267267
std::make_shared<TKqpTableMetadataLoader>(
268268
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
269-
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
269+
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
270270
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
271271
Gateway->SetToken(QueryId.Cluster, UserToken);
272272

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ class TKqpQueryCache {
153153
}
154154
}
155155
}
156-
return TKqpQueryId{query.Cluster, query.Database, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
156+
return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
157157
}
158158

159159
TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
236236

237237
request.SetSchedulerGroup(UserRequestContext->PoolId);
238238
request.SetDatabase(Database);
239+
request.SetDatabaseId(UserRequestContext->DatabaseId);
239240
if (UserRequestContext->PoolConfig.has_value()) {
240241
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
241242
request.SetPoolMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
7272
, RequestType(requestType)
7373
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
7474
{
75+
YQL_ENSURE(RequestContext);
7576
YQL_ENSURE(PhyTx);
7677
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);
7778

@@ -402,6 +403,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
402403

403404
NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
404405
context.SetDatabase(Database);
406+
context.SetDatabaseId(RequestContext->DatabaseId);
405407
context.SetActorSystem(actorSystem);
406408
if (UserToken) {
407409
context.SetUserToken(*UserToken);

ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
4242
[[maybe_unused]]
4343
TIntrusivePtr<IKqpGateway> MakeIcGateway(const TKikimrRunner& kikimr) {
4444
auto actorSystem = kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem();
45-
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", TKqpGatewaySettings(),
45+
return CreateKikimrIcGateway(TString(DefaultKikimrClusterName), "/Root", "/Root", TKqpGatewaySettings(),
4646
actorSystem, kikimr.GetTestServer().GetRuntime()->GetNodeId(0),
4747
TAlignedPagePoolCounters(), kikimr.GetTestServer().GetSettings().AppConfig->GetQueryServiceConfig());
4848
}

0 commit comments

Comments
 (0)