Skip to content

Commit af8199d

Browse files
authored
YQ supported compute databases in fqrun and in memory cp storage (#16821)
1 parent 14032f4 commit af8199d

33 files changed

+1607
-1038
lines changed

ydb/core/fq/libs/compute/common/config.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ class TComputeConfig {
159159
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
160160
return {};
161161
case NConfig::TYdbComputeControlPlane::kSingle:
162-
return controlPlane.GetSingle().GetWorkloadManagerConfig();
162+
if (controlPlane.GetSingle().HasWorkloadManagerConfig()) {
163+
return controlPlane.GetSingle().GetWorkloadManagerConfig();
164+
}
165+
return controlPlane.GetDefaultWorkloadManagerConfig();
163166
case NConfig::TYdbComputeControlPlane::kCms:
164167
return GetWorkloadManagerConfig(scope, controlPlane.GetCms().GetDatabaseMapping());
165168
case NConfig::TYdbComputeControlPlane::kYdbcp:
@@ -169,7 +172,10 @@ class TComputeConfig {
169172

170173
NFq::NConfig::TWorkloadManagerConfig GetWorkloadManagerConfig(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {
171174
auto computeDatabaseConfig = GetComputeDatabaseConfig(scope, databaseMapping);
172-
return computeDatabaseConfig.GetWorkloadManagerConfig();
175+
if (computeDatabaseConfig.HasWorkloadManagerConfig()) {
176+
return computeDatabaseConfig.GetWorkloadManagerConfig();
177+
}
178+
return ComputeConfig.GetYdb().GetControlPlane().GetDefaultWorkloadManagerConfig();
173179
}
174180

175181
NFq::NConfig::TComputeDatabaseConfig GetComputeDatabaseConfig(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {

ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
8989
LOG_T("Scope: " << Scope << " Single control plane mode has been chosen");
9090
const auto& singleConfig = Config.GetYdb().GetControlPlane().GetSingle();
9191
*Result.mutable_connection() = singleConfig.GetConnection();
92-
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, singleConfig.GetConnection(), singleConfig.GetWorkloadManagerConfig()});
92+
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, singleConfig.GetConnection(), GetWorkloadManagerConfig(singleConfig)});
9393
}
9494
break;
9595
case NConfig::TYdbComputeControlPlane::kCms:
@@ -163,7 +163,7 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
163163
}
164164

165165
if (response.IsExists) {
166-
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), client->Config.GetWorkloadManagerConfig()});
166+
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), GetWorkloadManagerConfig(client->Config)});
167167
} else {
168168
auto invalidateSynchronizationEvent = std::make_unique<TEvControlPlaneStorage::TEvModifyDatabaseRequest>(Request->Get()->CloudId, Scope);
169169
invalidateSynchronizationEvent->Synchronized = false;
@@ -199,7 +199,7 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
199199
}
200200

201201
if (ev->Cookie == OnlyDatabaseCreateCookie) {
202-
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), client->Config.GetWorkloadManagerConfig()});
202+
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), GetWorkloadManagerConfig(client->Config)});
203203
return;
204204
}
205205
Send(ControlPlaneStorageServiceActorId(), new TEvControlPlaneStorage::TEvCreateDatabaseRequest{Request->Get()->CloudId, Scope, Result});
@@ -266,7 +266,7 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
266266
return;
267267
}
268268

269-
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), client->Config.GetWorkloadManagerConfig()});
269+
Send(SynchronizationServiceActorId, new TEvYdbCompute::TEvSynchronizeRequest{Request.Get()->Get()->CloudId, Request.Get()->Get()->Scope, Result.connection(), GetWorkloadManagerConfig(client->Config)});
270270
}
271271

272272
void Handle(TEvYdbCompute::TEvSynchronizeResponse::TPtr& ev) {
@@ -298,11 +298,20 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
298298
}
299299
}
300300

301+
private:
302+
template <typename TComputeConfig>
303+
NConfig::TWorkloadManagerConfig GetWorkloadManagerConfig(const TComputeConfig& config) const {
304+
if (config.HasWorkloadManagerConfig()) {
305+
return config.GetWorkloadManagerConfig();
306+
}
307+
return Config.GetYdb().GetControlPlane().GetDefaultWorkloadManagerConfig();
308+
}
309+
301310
private:
302311
TString Scope;
303312
std::shared_ptr<TDatabaseClients> Clients;
304313
TActorId SynchronizationServiceActorId;
305-
NFq::NConfig::TComputeConfig Config;
314+
NConfig::TComputeConfig Config;
306315
TEvYdbCompute::TEvCreateDatabaseRequest::TPtr Request;
307316
FederatedQuery::Internal::ComputeDatabaseInternal Result;
308317

ydb/core/fq/libs/config/protos/compute.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ message TYdbComputeControlPlane {
8686
}
8787
string DatabasePrefix = 5;
8888
string DatabasesCacheReloadPeriod = 6;
89+
TWorkloadManagerConfig DefaultWorkloadManagerConfig = 7;
8990
}
9091

9192
message TYdbCompute {

ydb/core/fq/libs/control_plane_storage/events/events.h

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -680,31 +680,36 @@ struct TEvControlPlaneStorage {
680680
};
681681

682682
struct TEvCreateDatabaseRequest : NActors::TEventLocal<TEvCreateDatabaseRequest, EvCreateDatabaseRequest> {
683+
using TProto = FederatedQuery::Internal::ComputeDatabaseInternal;
684+
683685
TEvCreateDatabaseRequest() = default;
684686

685687
explicit TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope, const FederatedQuery::Internal::ComputeDatabaseInternal& record)
686688
: CloudId(cloudId)
687689
, Scope(scope)
688-
, Record(record)
690+
, Request(record)
689691
{}
690692

691693
size_t GetByteSize() const {
692694
return sizeof(*this)
693695
+ CloudId.size()
694696
+ Scope.size()
695-
+ Record.ByteSizeLong();
697+
+ Request.ByteSizeLong();
696698
}
697699

698700
TString CloudId;
699701
TString Scope;
700-
FederatedQuery::Internal::ComputeDatabaseInternal Record;
702+
FederatedQuery::Internal::ComputeDatabaseInternal Request;
701703
};
702704

703705
struct TEvCreateDatabaseResponse : NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> {
706+
using TProto = google::protobuf::Empty;
707+
704708
static constexpr bool Auditable = false;
705709

706-
explicit TEvCreateDatabaseResponse()
707-
{}
710+
explicit TEvCreateDatabaseResponse(const google::protobuf::Empty& response = {}) {
711+
Y_UNUSED(response);
712+
}
708713

709714
explicit TEvCreateDatabaseResponse(const NYql::TIssues& issues)
710715
: Issues(issues)
@@ -721,6 +726,7 @@ struct TEvControlPlaneStorage {
721726
};
722727

723728
struct TEvDescribeDatabaseRequest : NActors::TEventLocal<TEvDescribeDatabaseRequest, EvDescribeDatabaseRequest> {
729+
using TProto = google::protobuf::Empty;
724730

725731
TEvDescribeDatabaseRequest() = default;
726732

@@ -744,6 +750,8 @@ struct TEvControlPlaneStorage {
744750
struct TEvDescribeDatabaseResponse : NActors::TEventLocal<TEvDescribeDatabaseResponse, EvDescribeDatabaseResponse> {
745751
static constexpr bool Auditable = false;
746752

753+
using TProto = FederatedQuery::Internal::ComputeDatabaseInternal;
754+
747755
explicit TEvDescribeDatabaseResponse(const FederatedQuery::Internal::ComputeDatabaseInternal& record)
748756
: Record(record)
749757
{}
@@ -767,6 +775,8 @@ struct TEvControlPlaneStorage {
767775
};
768776

769777
struct TEvModifyDatabaseRequest : NActors::TEventLocal<TEvModifyDatabaseRequest, EvModifyDatabaseRequest> {
778+
using TProto = google::protobuf::Empty;
779+
770780
TEvModifyDatabaseRequest() = default;
771781

772782
explicit TEvModifyDatabaseRequest(const TString& cloudId, const TString& scope)
@@ -776,10 +786,12 @@ struct TEvControlPlaneStorage {
776786

777787
size_t GetByteSize() const {
778788
return sizeof(*this)
789+
+ Request.ByteSizeLong()
779790
+ CloudId.size()
780791
+ Scope.size();
781792
}
782793

794+
google::protobuf::Empty Request;
783795
TString CloudId;
784796
TString Scope;
785797
TMaybe<bool> Synchronized;
@@ -788,10 +800,13 @@ struct TEvControlPlaneStorage {
788800
};
789801

790802
struct TEvModifyDatabaseResponse : NActors::TEventLocal<TEvModifyDatabaseResponse, EvModifyDatabaseResponse> {
803+
using TProto = google::protobuf::Empty;
804+
791805
static constexpr bool Auditable = false;
792806

793-
explicit TEvModifyDatabaseResponse()
794-
{}
807+
explicit TEvModifyDatabaseResponse(const google::protobuf::Empty& response = {}) {
808+
Y_UNUSED(response);
809+
}
795810

796811
explicit TEvModifyDatabaseResponse(const NYql::TIssues& issues)
797812
: Issues(issues)

0 commit comments

Comments
 (0)