Skip to content

Commit 620351c

Browse files
committed
YQ-3445 added feature flag for resource pools on sls (ydb-platform#6808)
1 parent e4e8421 commit 620351c

File tree

12 files changed

+65
-25
lines changed

12 files changed

+65
-25
lines changed

ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,19 +192,19 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::DoModify(const NYql::TO
192192
TResourcePoolManager::TAsyncStatus TResourcePoolManager::CreateResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
193193
NKqpProto::TKqpSchemeOperation schemeOperation;
194194
PrepareCreateResourcePool(schemeOperation, settings, context);
195-
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId);
195+
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kCreateResourcePool);
196196
}
197197

198198
TResourcePoolManager::TAsyncStatus TResourcePoolManager::AlterResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
199199
NKqpProto::TKqpSchemeOperation schemeOperation;
200200
PrepareAlterResourcePool(schemeOperation, settings, context);
201-
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId);
201+
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kAlterResourcePool);
202202
}
203203

204204
TResourcePoolManager::TAsyncStatus TResourcePoolManager::DropResourcePool(const NYql::TCreateObjectSettings& settings, TInternalModificationContext& context, ui32 nodeId) const {
205205
NKqpProto::TKqpSchemeOperation schemeOperation;
206206
PrepareDropResourcePool(schemeOperation, settings, context);
207-
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId);
207+
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context.GetExternalData(), nodeId, NKqpProto::TKqpSchemeOperation::kDropResourcePool);
208208
}
209209

210210
//// Deferred modification
@@ -271,11 +271,11 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecutePrepared(const N
271271
try {
272272
switch (schemeOperation.GetOperationCase()) {
273273
case NKqpProto::TKqpSchemeOperation::kCreateResourcePool:
274-
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId);
274+
return ExecuteSchemeRequest(schemeOperation.GetCreateResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
275275
case NKqpProto::TKqpSchemeOperation::kAlterResourcePool:
276-
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId);
276+
return ExecuteSchemeRequest(schemeOperation.GetAlterResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
277277
case NKqpProto::TKqpSchemeOperation::kDropResourcePool:
278-
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId);
278+
return ExecuteSchemeRequest(schemeOperation.GetDropResourcePool(), context, nodeId, schemeOperation.GetOperationCase());
279279
default:
280280
return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
281281
}
@@ -294,8 +294,13 @@ TResourcePoolManager::TAsyncStatus TResourcePoolManager::ChainFeatures(TAsyncSta
294294
});
295295
}
296296

297-
TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const {
298-
auto validationFuture = CheckFeatureFlag(context, nodeId);
297+
TResourcePoolManager::TAsyncStatus TResourcePoolManager::ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const {
298+
TAsyncStatus validationFuture = NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Success());
299+
if (operationCase != NKqpProto::TKqpSchemeOperation::kDropResourcePool) {
300+
validationFuture = ChainFeatures(validationFuture, [context, nodeId] {
301+
return CheckFeatureFlag(context, nodeId);
302+
});
303+
}
299304
return ChainFeatures(validationFuture, [schemeTx, context] {
300305
return SendSchemeRequest(schemeTx, context);
301306
});

ydb/core/kqp/gateway/behaviour/resource_pool/manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class TResourcePoolManager : public NMetadata::NModifications::IOperationsManage
3131
void PrepareDropResourcePool(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TDropObjectSettings& settings, TInternalModificationContext& context) const;
3232

3333
TAsyncStatus ChainFeatures(TAsyncStatus lastFeature, std::function<TAsyncStatus()> callback) const;
34-
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId) const;
34+
TAsyncStatus ExecuteSchemeRequest(const NKikimrSchemeOp::TModifyScheme& schemeTx, const TExternalModificationContext& context, ui32 nodeId, NKqpProto::TKqpSchemeOperation::OperationCase operationCase) const;
3535
};
3636

3737
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6060,11 +6060,15 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
60606060
auto db = kikimr.GetTableClient();
60616061
auto session = db.CreateSession().GetValueSync().GetSession();
60626062

6063-
auto checkDisabled = [&session](const TString& query) {
6063+
auto checkQuery = [&session](const TString& query, EStatus status, const TString& error) {
60646064
Cerr << "Check query:\n" << query << "\n";
60656065
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6066-
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::UNSUPPORTED);
6067-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
6066+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), status);
6067+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), error);
6068+
};
6069+
6070+
auto checkDisabled = [checkQuery](const TString& query) {
6071+
checkQuery(query, EStatus::UNSUPPORTED, "Resource pools are disabled. Please contact your system administrator to enable it");
60686072
};
60696073

60706074
// CREATE RESOURCE POOL
@@ -6083,7 +6087,9 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
60836087
)");
60846088

60856089
// DROP RESOURCE POOL
6086-
checkDisabled("DROP RESOURCE POOL MyResourcePool;");
6090+
checkQuery("DROP RESOURCE POOL MyResourcePool;",
6091+
EStatus::SCHEME_ERROR,
6092+
"Path does not exist");
60876093
}
60886094

60896095
Y_UNIT_TEST(ResourcePoolsValidation) {

ydb/core/kqp/workload_service/actors/actors.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ namespace NKikimr::NKqp::NWorkload {
99
NActors::IActor* CreatePoolHandlerActor(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters);
1010

1111
// Fetch pool and create default pool if needed
12-
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists);
12+
NActors::IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless);
1313

1414
// Fetch and create pool in scheme shard
15-
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken);
15+
NActors::IActor* CreatePoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless);
1616
NActors::IActor* CreatePoolCreatorActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl);
1717

1818
// Cpu load fetcher actor

ydb/core/kqp/workload_service/actors/scheme_actors.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ using namespace NActors;
2020

2121
class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
2222
public:
23-
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists)
23+
TPoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless)
2424
: Event(std::move(event))
25+
, EnableOnServerless(enableOnServerless)
2526
{
2627
if (!Event->Get()->PoolId) {
2728
Event->Get()->PoolId = NResourcePool::DEFAULT_POOL_ID;
@@ -36,7 +37,7 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
3637

3738
void StartPoolFetchRequest() const {
3839
LOG_D("Start pool fetching");
39-
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken));
40+
Register(CreatePoolFetcherActor(SelfId(), Event->Get()->Database, Event->Get()->PoolId, Event->Get()->UserToken, EnableOnServerless));
4041
}
4142

4243
void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) {
@@ -107,18 +108,20 @@ class TPoolResolverActor : public TActorBootstrapped<TPoolResolverActor> {
107108

108109
private:
109110
TEvPlaceRequestIntoPool::TPtr Event;
111+
const bool EnableOnServerless;
110112
bool CanCreatePool = false;
111113
bool DefaultPoolCreated = false;
112114
};
113115

114116

115117
class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
116118
public:
117-
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
119+
TPoolFetcherActor(const NActors::TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless)
118120
: ReplyActorId(replyActorId)
119121
, Database(database)
120122
, PoolId(poolId)
121123
, UserToken(userToken)
124+
, EnableOnServerless(enableOnServerless)
122125
{}
123126

124127
void DoBootstrap() {
@@ -133,6 +136,11 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
133136
}
134137

135138
const auto& result = results[0];
139+
if (!EnableOnServerless && result.DomainInfo && result.DomainInfo->IsServerless()) {
140+
Reply(Ydb::StatusIds::UNSUPPORTED, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
141+
return;
142+
}
143+
136144
switch (result.Status) {
137145
case EStatus::Unknown:
138146
case EStatus::PathNotTable:
@@ -222,6 +230,7 @@ class TPoolFetcherActor : public TSchemeActorBase<TPoolFetcherActor> {
222230
const TString Database;
223231
const TString PoolId;
224232
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
233+
const bool EnableOnServerless;
225234

226235
NResourcePool::TPoolSettings PoolConfig;
227236
NKikimrProto::TPathID PathId;
@@ -365,12 +374,12 @@ class TPoolCreatorActor : public TSchemeActorBase<TPoolCreatorActor> {
365374

366375
} // anonymous namespace
367376

368-
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists) {
369-
return new TPoolResolverActor(std::move(event), defaultPoolExists);
377+
IActor* CreatePoolResolverActor(TEvPlaceRequestIntoPool::TPtr event, bool defaultPoolExists, bool enableOnServerless) {
378+
return new TPoolResolverActor(std::move(event), defaultPoolExists, enableOnServerless);
370379
}
371380

372-
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken) {
373-
return new TPoolFetcherActor(replyActorId, database, poolId, userToken);
381+
IActor* CreatePoolFetcherActor(const TActorId& replyActorId, const TString& database, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken, bool enableOnServerless) {
382+
return new TPoolFetcherActor(replyActorId, database, poolId, userToken, enableOnServerless);
374383
}
375384

376385
IActor* CreatePoolCreatorActor(const TActorId& replyActorId, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NACLibProto::TDiffACL diffAcl) {

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
5858
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
5959

6060
EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
61+
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
6162
if (EnabledResourcePools) {
6263
InitializeWorkloadService();
6364
}
@@ -84,6 +85,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
8485
const auto& event = ev->Get()->Record;
8586

8687
EnabledResourcePools = event.GetConfig().GetFeatureFlags().GetEnableResourcePools();
88+
EnabledResourcePoolsOnServerless = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsOnServerless();
8789
if (EnabledResourcePools) {
8890
LOG_I("Resource pools was enanbled");
8991
InitializeWorkloadService();
@@ -135,7 +137,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
135137

136138
LOG_D("Recieved new request from " << workerActorId << ", Database: " << ev->Get()->Database << ", PoolId: " << ev->Get()->PoolId << ", SessionId: " << ev->Get()->SessionId);
137139
bool hasDefaultPool = DatabasesWithDefaultPool.contains(CanonizePath(ev->Get()->Database));
138-
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool));
140+
Register(CreatePoolResolverActor(std::move(ev), hasDefaultPool, EnabledResourcePoolsOnServerless));
139141
}
140142

141143
void Handle(TEvCleanupRequest::TPtr& ev) {
@@ -520,6 +522,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
520522
NMonitoring::TDynamicCounterPtr Counters;
521523

522524
bool EnabledResourcePools = false;
525+
bool EnabledResourcePoolsOnServerless = false;
523526
bool ServiceInitialized = false;
524527
bool IdleChecksStarted = false;
525528
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;

ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ TEvPrivate::TEvFetchPoolResponse::TPtr FetchPool(TIntrusivePtr<IYdbSetup> ydb, c
1616
auto runtime = ydb->GetRuntime();
1717
const auto& edgeActor = runtime->AllocateEdgeActor();
1818

19-
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{})));
19+
runtime->Register(CreatePoolFetcherActor(edgeActor, settings.DomainName_, poolId ? poolId : settings.PoolId_, MakeIntrusive<NACLib::TUserToken>(userSID, TVector<NACLib::TSID>{}), true));
2020
return runtime->GrabEdgeEvent<TEvPrivate::TEvFetchPoolResponse>(edgeActor, FUTURE_WAIT_TIMEOUT);
2121
}
2222

ydb/core/protos/feature_flags.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ message TFeatureFlags {
141141
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
142142
optional bool EnableDbMetadataCache = 127 [default = false];
143143
optional bool EnableTableDatetime64 = 128 [default = false];
144-
optional bool EnableResourcePools = 129 [default = false];
144+
optional bool EnableResourcePools = 129 [default = false];
145145
optional bool EnableColumnStatistics = 130 [default = false];
146146
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
147+
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
147148
}

ydb/core/tx/schemeshard/schemeshard__operation_alter_resource_pool.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ class TAlterResourcePool : public TSubOperation {
129129
static_cast<ui64>(OperationId.GetTxId()),
130130
static_cast<ui64>(context.SS->SelfTabletId()));
131131

132+
if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
133+
if (!context.SS->EnableResourcePoolsOnServerless) {
134+
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
135+
return result;
136+
}
137+
}
138+
132139
const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
133140
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));
134141

ydb/core/tx/schemeshard/schemeshard__operation_create_resource_pool.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ class TCreateResourcePool : public TSubOperation {
155155
static_cast<ui64>(OperationId.GetTxId()),
156156
static_cast<ui64>(context.SS->SelfTabletId()));
157157

158+
if (context.SS->IsServerlessDomain(TPath::Init(context.SS->RootPathId(), context.SS))) {
159+
if (!context.SS->EnableResourcePoolsOnServerless) {
160+
result->SetError(NKikimrScheme::StatusPreconditionFailed, "Resource pools are disabled for serverless domains. Please contact your system administrator to enable it");
161+
return result;
162+
}
163+
}
164+
158165
const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
159166
RETURN_RESULT_UNLESS(NResourcePool::IsParentPathValid(result, parentPath));
160167

0 commit comments

Comments
 (0)