Skip to content

merge to ydb stable YQ WM add database id #14276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ struct TEvKqp {
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
TEvScriptRequest() = default;

const TString& GetDatabase() const {
return Record.GetRequest().GetDatabase();
}

const TString& GetDatabaseId() const {
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
Record.MutableRequest()->SetDatabaseId(databaseId);
}

mutable NKikimrKqp::TEvQueryRequest Record;
TDuration ForgetAfter;
TDuration ResultsTtl;
Expand Down Expand Up @@ -164,6 +176,40 @@ struct TEvKqp {
return issues;
}
};

struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Database(database)
, Issues(std::move(issues))
{}

TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
: Status(Ydb::StatusIds::SUCCESS)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, Issues({})
{}

Ydb::StatusIds::StatusCode Status;
TString Database;
TString DatabaseId;
bool Serverless = false;
NYql::TIssues Issues;
};

struct TEvDelayedRequestError : public TEventLocal<TEvDelayedRequestError, TKqpEvents::EvDelayedRequestError> {
TEvDelayedRequestError(THolder<IEventHandle> requestEvent, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: RequestEvent(std::move(requestEvent))
, Status(status)
, Issues(std::move(issues))
{}

THolder<IEventHandle> RequestEvent;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
};

} // namespace NKikimr::NKqp
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,14 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return PoolConfig;
}

const TString& GetDatabaseId() const {
return DatabaseId ? DatabaseId : Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand All @@ -365,6 +373,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
TActorId RequestActorId;
TString Database;
TString DatabaseId;
TString SessionId;
TString YqlText;
TString QueryId;
Expand Down
52 changes: 34 additions & 18 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
FS_ROLLBACK,
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
template <typename TEv, ui32 TEventType>
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
TEventWithDatabaseId(const TString& database)
: Database(database)
, OperationId(id)
{}

const TString& GetDatabase() const {
return Database;
}

const TString& GetDatabaseId() const {
return DatabaseId;
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

const TString Database;
TString DatabaseId;
};

struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{}

const NOperationId::TOperationId OperationId;
};

Expand All @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
NYql::TIssues Issues;
};

struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down Expand Up @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TMaybe<google::protobuf::Any> Metadata;
};

struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
: Database(database)
: TEventWithDatabaseId(database)
, PageSize(pageSize)
, PageToken(pageToken)
{}

TString Database;
ui64 PageSize;
TString PageToken;
};
Expand Down Expand Up @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
};

struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down
39 changes: 24 additions & 15 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <ydb/library/aclib/aclib.h>
#include <ydb/library/actors/core/event_local.h>
Expand All @@ -13,24 +14,24 @@
namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
TEvSubscribeOnPoolChanges(const TString& databaseId, const TString& poolId)
: DatabaseId(databaseId)
, PoolId(poolId)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
TEvPlaceRequestIntoPool(const TString& databaseId, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, UserToken(userToken)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
TString PoolId; // Can be changed to default pool id
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
Expand All @@ -51,15 +52,15 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
};

struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: Database(database)
TEvCleanupRequest(const TString& databaseId, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
: DatabaseId(databaseId)
, SessionId(sessionId)
, PoolId(poolId)
, Duration(duration)
, CpuConsumed(cpuConsumed)
{}

const TString Database;
const TString DatabaseId;
const TString SessionId;
const TString PoolId;
const TDuration Duration;
Expand All @@ -77,27 +78,35 @@ struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqp
};

struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWorkloadServiceEvents::EvUpdatePoolInfo> {
TEvUpdatePoolInfo(const TString& database, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: Database(database)
TEvUpdatePoolInfo(const TString& databaseId, const TString& poolId, const std::optional<NResourcePool::TPoolSettings>& config, const std::optional<NACLib::TSecurityObject>& securityObject)
: DatabaseId(databaseId)
, PoolId(poolId)
, Config(config)
, SecurityObject(securityObject)
{}

const TString Database;
const TString DatabaseId;
const TString PoolId;
const std::optional<NResourcePool::TPoolSettings> Config;
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
struct TEvFetchDatabaseResponse : public NActors::TEventLocal<TEvFetchDatabaseResponse, TKqpWorkloadServiceEvents::EvFetchDatabaseResponse> {
TEvFetchDatabaseResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& databaseId, bool serverless, TPathId pathId, NYql::TIssues issues)
: Status(status)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, PathId(pathId)
, Issues(std::move(issues))
{}

const Ydb::StatusIds::StatusCode Status;
const TString Database;
const TString DatabaseId;
const bool Serverless;
const TPathId PathId;
const NYql::TIssues Issues;
};

} // NKikimr::NKqp::NWorkload
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ PEERDIR(
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/compilation
ydb/core/resource_pools
ydb/core/scheme

ydb/library/yql/dq/actors
ydb/public/api/protos
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_user_request_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace NKikimr::NKqp {

void TUserRequestContext::Out(IOutputStream& o) const {
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", DatabaseId: " << DatabaseId << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << ", PoolId: " << PoolId << "}";
}

void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
resultMap["TraceId"] = ctx.TraceId;
resultMap["Database"] = ctx.Database;
resultMap["DatabaseId"] = ctx.DatabaseId;
resultMap["SessionId"] = ctx.SessionId;
resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_user_request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NKikimr::NKqp {
struct TUserRequestContext : public TAtomicRefCount<TUserRequestContext> {
TString TraceId;
TString Database;
TString DatabaseId;
TString SessionId;
TString CurrentExecutionId;
TString CustomerSuppliedId;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ struct TKqpEvents {
EvListSessionsRequest,
EvListSessionsResponse,
EvListProxyNodesRequest,
EvListProxyNodesResponse
EvListProxyNodesResponse,
EvUpdateDatabaseInfo,
EvDelayedRequestError
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down Expand Up @@ -175,8 +177,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
EvFetchDatabaseResponse,
};
};

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/common/simple/query_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

namespace NKikimr::NKqp {

TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId::TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings)
: Cluster(cluster)
, Database(database)
, DatabaseId(databaseId)
, Text(text)
, Settings(settings)
, QueryParameterTypes(queryParameterTypes)
Expand Down Expand Up @@ -42,6 +43,7 @@ bool TKqpQueryId::IsSql() const {
bool TKqpQueryId::operator==(const TKqpQueryId& other) const {
if (!(Cluster == other.Cluster &&
Database == other.Database &&
DatabaseId == other.DatabaseId &&
UserSid == other.UserSid &&
Text == other.Text &&
Settings == other.Settings &&
Expand Down Expand Up @@ -79,6 +81,7 @@ TString TKqpQueryId::SerializeToString() const {
TStringBuilder result = TStringBuilder() << "{"
<< "Cluster: " << Cluster << ", "
<< "Database: " << Database << ", "
<< "DatabaseId: " << DatabaseId << ", "
<< "UserSid: " << UserSid << ", "
<< "Text: " << EscapeC(Text) << ", "
<< "Settings: " << Settings.SerializeToString() << ", ";
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/simple/query_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NKikimr::NKqp {
struct TKqpQueryId {
TString Cluster;
TString Database;
TString DatabaseId;
TString UserSid;
TString Text;
TKqpQuerySettings Settings;
Expand All @@ -21,7 +22,7 @@ struct TKqpQueryId {
TGUCSettings GUCSettings;

public:
TKqpQueryId(const TString& cluster, const TString& database, const TString& text,
TKqpQueryId(const TString& cluster, const TString& database, const TString& databaseId, const TString& text,
const TKqpQuerySettings& settings, std::shared_ptr<std::map<TString, Ydb::Type>> queryParameterTypes,
const TGUCSettings& gUCSettings);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, std::move(loader),
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);

Expand Down
Loading
Loading