Skip to content

Commit 55b4a87

Browse files
committed
Fixes
1 parent 5e28ea4 commit 55b4a87

File tree

18 files changed

+165
-70
lines changed

18 files changed

+165
-70
lines changed

ydb/core/kqp/common/compilation/events.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace NKikimr::NKqp::NPrivateEvents {
1616

1717
struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
1818
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TMaybe<TString>& uid,
19-
TMaybe<TKqpQueryId>&& query, bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult, TInstant deadline,
19+
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
2020
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
2121
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
2222
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing(),
@@ -26,7 +26,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
2626
, Uid(uid)
2727
, Query(std::move(query))
2828
, KeepInCache(keepInCache)
29-
, QueryAction(queryAction)
29+
, IsQueryActionPrepare(isQueryActionPrepare)
3030
, PerStatementResult(perStatementResult)
3131
, Deadline(deadline)
3232
, DbCounters(dbCounters)
@@ -50,7 +50,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
5050
TMaybe<TString> Uid;
5151
TMaybe<TKqpQueryId> Query;
5252
bool KeepInCache = false;
53-
NKikimrKqp::EQueryAction QueryAction;
53+
bool IsQueryActionPrepare = false;
5454
bool PerStatementResult = false;
5555
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
5656
TInstant Deadline;
@@ -76,7 +76,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
7676

7777
struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
7878
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TString& uid,
79-
const TMaybe<TKqpQueryId>& query, NKikimrKqp::EQueryAction queryAction, TInstant deadline,
79+
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
8080
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
8181
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
8282
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
@@ -85,7 +85,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
8585
, ClientAddress(clientAddress)
8686
, Uid(uid)
8787
, Query(query)
88-
, QueryAction(queryAction)
88+
, IsQueryActionPrepare(isQueryActionPrepare)
8989
, Deadline(deadline)
9090
, DbCounters(dbCounters)
9191
, GUCSettings(gUCSettings)
@@ -105,7 +105,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
105105
TString ClientAddress;
106106
TString Uid;
107107
TMaybe<TKqpQueryId> Query;
108-
NKikimrKqp::EQueryAction QueryAction;
108+
bool IsQueryActionPrepare = false;
109109

110110
TInstant Deadline;
111111
TKqpDbCountersPtr DbCounters;
@@ -126,16 +126,14 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
126126
};
127127

128128
struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
129-
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}, const std::optional<TString>& replayMessage = std::nullopt)
129+
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
130130
: CompileResult(compileResult)
131-
, ReplayMessage(replayMessage)
132131
, Orbit(std::move(orbit)) {
133132
}
134133

135134
TKqpCompileResult::TConstPtr CompileResult;
136135
TKqpStatsCompile Stats;
137136
std::optional<TString> ReplayMessage;
138-
std::optional<TString> ReplayMessageUserView;
139137

140138
NLWTrace::TOrbit Orbit;
141139
};

ydb/core/kqp/common/compilation/result.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@ struct TKqpCompileResult {
1616

1717
TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
1818
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, TMaybe<TQueryAst> queryAst = {},
19-
bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
19+
bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
2020
: Status(status)
2121
, Issues(issues)
2222
, Query(std::move(query))
2323
, Uid(uid)
2424
, MaxReadType(maxReadType)
2525
, QueryAst(std::move(queryAst))
2626
, NeedToSplit(needToSplit)
27-
, CommandTagName(commandTagName) {}
27+
, CommandTagName(commandTagName)
28+
, ReplayMessageUserView(replayMessageUserView) {}
2829

2930
static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
3031
const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {},
31-
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
32+
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
3233
{
33-
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName);
34+
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName, replayMessageUserView);
3435
}
3536

3637
std::shared_ptr<NYql::TAstParseResult> GetAst() const;
@@ -47,6 +48,8 @@ struct TKqpCompileResult {
4748
bool NeedToSplit = false;
4849
TMaybe<TString> CommandTagName = {};
4950

51+
TMaybe<TString> ReplayMessageUserView;
52+
5053
std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
5154
};
5255

ydb/core/kqp/common/events/query.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
282282
}
283283

284284
bool GetCollectDiagnostics() const {
285-
return Record.MutableRequest()->GetCollectDiagnostics();
285+
return Record.GetRequest().GetCollectDiagnostics();
286286
}
287287

288288
ui32 CalculateSerializedSize() const override {

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
5252
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
5353
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
5454
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
55-
bool perStatementResult, NKikimrKqp::EQueryAction queryAction,
55+
bool perStatementResult,
5656
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst,
5757
NYql::TExprContext* splitCtx,
5858
NYql::TExprNode::TPtr splitExpr)
@@ -75,7 +75,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
7575
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
7676
, TempTablesState(std::move(tempTablesState))
7777
, CollectFullDiagnostics(collectFullDiagnostics)
78-
, QueryAction(queryAction)
7978
, CompileAction(compileAction)
8079
, QueryAst(std::move(queryAst))
8180
, SplitCtx(splitCtx)
@@ -365,9 +364,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
365364
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
366365
replayMessage.InsertValue("query_database", QueryId.Database);
367366
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
368-
if (QueryAction == NKikimrKqp::QUERY_ACTION_EXPLAIN) {
369-
replayMessage.InsertValue("query_plan", queryPlan);
370-
}
371367
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));
372368

373369
if (CollectFullDiagnostics) {
@@ -382,6 +378,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
382378
ReplayMessageUserView = NJson::WriteJson(replayMessage, /*formatOutput*/ false);
383379
}
384380

381+
replayMessage.InsertValue("query_plan", queryPlan);
385382
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
386383
replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false)));
387384
replayMessage.InsertValue("table_meta_serialization_type", EMetaSerializationType::EncodedProto);
@@ -404,10 +401,12 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
404401
<< ", issues: " << KqpCompileResult->Issues.ToString()
405402
<< ", uid: " << KqpCompileResult->Uid);
406403

404+
if (ReplayMessageUserView) {
405+
KqpCompileResult->ReplayMessageUserView = std::move(*ReplayMessageUserView);
406+
}
407407
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(KqpCompileResult);
408408

409409
responseEv->ReplayMessage = std::move(ReplayMessage);
410-
responseEv->ReplayMessageUserView = std::move(ReplayMessageUserView);
411410
ReplayMessage = std::nullopt;
412411
ReplayMessageUserView = std::nullopt;
413412
auto& stats = responseEv->Stats;
@@ -616,7 +615,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
616615
bool CollectFullDiagnostics;
617616

618617
bool PerStatementResult;
619-
NKikimrKqp::EQueryAction QueryAction;
620618
ECompileActorAction CompileAction;
621619
TMaybe<TQueryAst> QueryAst;
622620

@@ -666,7 +664,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
666664
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress,
667665
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings,
668666
const TMaybe<TString>& applicationName, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
669-
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, NKikimrKqp::EQueryAction queryAction,
667+
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
670668
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
671669
bool perStatementResult, NYql::TExprContext* splitCtx, NYql::TExprNode::TPtr splitExpr)
672670
{
@@ -675,7 +673,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
675673
uid, query, userToken, clientAddress, dbCounters,
676674
federatedQuerySetup, userRequestContext,
677675
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
678-
perStatementResult, queryAction, compileAction, std::move(queryAst),
676+
perStatementResult, compileAction, std::move(queryAst),
679677
splitCtx, splitExpr);
680678
}
681679

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@ using namespace NYql;
2929

3030

3131
struct TKqpCompileSettings {
32-
TKqpCompileSettings(bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult,
32+
TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
3333
const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
3434
: KeepInCache(keepInCache)
35-
, QueryAction(queryAction)
35+
, IsQueryActionPrepare(isQueryActionPrepare)
3636
, PerStatementResult(perStatementResult)
3737
, Deadline(deadline)
3838
, Action(action)
3939
{}
4040

4141
bool KeepInCache;
42-
NKikimrKqp::EQueryAction QueryAction;
42+
bool IsQueryActionPrepare;
4343
bool PerStatementResult;
4444
TInstant Deadline;
4545
ECompileActorAction Action;
@@ -461,7 +461,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
461461

462462
TKqpCompileSettings compileSettings(
463463
request.KeepInCache,
464-
request.QueryAction,
464+
request.IsQueryActionPrepare,
465465
request.PerStatementResult,
466466
request.Deadline,
467467
ev->Get()->Split
@@ -529,7 +529,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
529529

530530
TKqpCompileSettings compileSettings(
531531
true,
532-
request.QueryAction,
532+
request.IsQueryActionPrepare,
533533
false,
534534
request.Deadline,
535535
ev->Get()->Split
@@ -610,7 +610,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
610610

611611
if (compileResult->NeedToSplit) {
612612
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
613-
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
613+
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
614614
ProcessQueue(ctx);
615615
return;
616616
}
@@ -623,7 +623,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
623623
try {
624624
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
625625
if (!hasTempTablesNameClashes) {
626-
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.QueryAction == NKikimrKqp::QUERY_ACTION_PREPARE, isPerStatementExecution);
626+
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
627627
}
628628

629629
if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
@@ -635,7 +635,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
635635
for (auto& request : requests) {
636636
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
637637
Reply(request.Sender, compileResult, compileStats, ctx,
638-
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
638+
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan));
639639
}
640640
} else {
641641
if (!hasTempTablesNameClashes) {
@@ -647,7 +647,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
647647

648648
LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
649649
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
650-
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
650+
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
651651
}
652652
catch (const std::exception& e) {
653653
LogException("TEvCompileResponse", ev->Sender, e, ctx);
@@ -809,7 +809,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
809809
if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) {
810810
return false;
811811
}
812-
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
812+
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst,
813+
false, {}, compileResult->ReplayMessageUserView);
813814
newCompileResult->AllowCache = compileResult->AllowCache;
814815
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
815816
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
@@ -846,7 +847,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
846847
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
847848
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, ModuleResolverState, Counters,
848849
request.Uid, request.Query, request.UserToken, request.ClientAddress, FederatedQuerySetup, request.DbCounters, request.GUCSettings, request.ApplicationName, request.UserRequestContext,
849-
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.QueryAction, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
850+
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
850851
request.CompileSettings.PerStatementResult, request.SplitCtx, request.SplitExpr);
851852
auto compileActorId = ctx.Register(compileActor, TMailboxType::HTSwap,
852853
AppData(ctx)->UserPoolId);
@@ -865,7 +866,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
865866

866867
void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
867868
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
868-
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
869+
NLWTrace::TOrbit orbit, NWilson::TSpan span)
869870
{
870871
const auto& query = compileResult->Query;
871872
LWTRACK(KqpCompileServiceReply,
@@ -878,7 +879,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
878879
<< ", queryUid: " << compileResult->Uid
879880
<< ", status:" << compileResult->Status);
880881

881-
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
882+
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit));
882883
responseEv->Stats = compileStats;
883884

884885
if (span) {

ydb/core/kqp/compile_service/kqp_compile_service.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
166166
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
167167
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NWilson::TTraceId traceId = {},
168168
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
169-
NKikimrKqp::EQueryAction queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE,
170169
ECompileActorAction compileAction = ECompileActorAction::COMPILE,
171170
TMaybe<TQueryAst> queryAst = {},
172171
bool collectFullDiagnostics = false,

0 commit comments

Comments
 (0)