Skip to content

Commit 5f596a9

Browse files
authored
[stable-25-1] EvWrite & CTAS fixes (#19171)
2 parents 1cf95a5 + c16e951 commit 5f596a9

File tree

15 files changed

+272
-67
lines changed

15 files changed

+272
-67
lines changed

.github/config/muted_ya.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
4646
ydb/core/kqp/ut/olap [*/*] chunk chunk
4747
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable+ColumnStore
4848
ydb/core/kqp/ut/query KqpAnalyze.AnalyzeTable-ColumnStore
49+
ydb/core/kqp/ut/query KqpLimits.StreamWrite+Allowed
4950
ydb/core/kqp/ut/query KqpStats.DeferredEffects+UseSink
5051
ydb/core/kqp/ut/query KqpStats.SysViewClientLost
5152
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns

ydb/core/kqp/common/compilation/events.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
2020
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
2121
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
2222
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing(),
23-
bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
23+
bool split = false, std::shared_ptr<NYql::TExprContext> splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
2424
: UserToken(userToken)
2525
, ClientAddress(clientAddress)
2626
, Uid(uid)
@@ -39,8 +39,8 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
3939
, CollectDiagnostics(collectDiagnostics)
4040
, QueryAst(queryAst)
4141
, Split(split)
42-
, SplitCtx(splitCtx)
43-
, SplitExpr(splitExpr)
42+
, SplitCtx(std::move(splitCtx))
43+
, SplitExpr(std::move(splitExpr))
4444
{
4545
Y_ENSURE(Uid.Defined() != Query.Defined());
4646
}
@@ -70,7 +70,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
7070
TMaybe<TQueryAst> QueryAst;
7171
bool Split = false;
7272

73-
NYql::TExprContext* SplitCtx = nullptr;
73+
std::shared_ptr<NYql::TExprContext> SplitCtx = nullptr;
7474
NYql::TExprNode::TPtr SplitExpr = nullptr;
7575
};
7676

@@ -80,7 +80,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
8080
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
8181
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
8282
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
83-
bool split = false, NYql::TExprContext* splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
83+
bool split = false, std::shared_ptr<NYql::TExprContext> splitCtx = nullptr, NYql::TExprNode::TPtr splitExpr = nullptr)
8484
: UserToken(userToken)
8585
, ClientAddress(clientAddress)
8686
, Uid(uid)
@@ -96,8 +96,8 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
9696
, IntrestedInResult(std::move(intrestedInResult))
9797
, QueryAst(queryAst)
9898
, Split(split)
99-
, SplitCtx(splitCtx)
100-
, SplitExpr(splitExpr)
99+
, SplitCtx(std::move(splitCtx))
100+
, SplitExpr(std::move(splitExpr))
101101
{
102102
}
103103

@@ -121,7 +121,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
121121
TMaybe<TQueryAst> QueryAst;
122122
bool Split = false;
123123

124-
NYql::TExprContext* SplitCtx = nullptr;
124+
std::shared_ptr<NYql::TExprContext> SplitCtx = nullptr;
125125
NYql::TExprNode::TPtr SplitExpr = nullptr;
126126
};
127127

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
335335
return false;
336336
}
337337

338-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
338+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) {
339339
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
340340
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
341341
};
@@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
402402
NKikimrKqp::TKqpTableSinkSettings settings;
403403
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
404404
modifiedTables.insert(getTable(settings.GetTable()));
405-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
405+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) {
406406
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
407407
return true;
408408
}
@@ -415,5 +415,21 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
415415
return false;
416416
}
417417

418+
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) {
419+
for (const auto &stage : tx->GetStages()) {
420+
for (const auto& sink : stage.GetSinks()) {
421+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
422+
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
423+
NKikimrKqp::TKqpTableSinkSettings settings;
424+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
425+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
426+
return true;
427+
}
428+
}
429+
}
430+
}
431+
return false;
432+
}
433+
418434
} // namespace NKqp
419435
} // namespace NKikimr

ydb/core/kqp/common/kqp_tx.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,11 @@ class TShardIdToTableInfo {
165165
};
166166
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
167167

168-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);
168+
bool HasUncommittedChangesRead(
169+
THashSet<NKikimr::TTableId>& modifiedTables,
170+
const NKqpProto::TKqpPhyQuery& physicalQuery,
171+
const bool canUseVolatileTx,
172+
const bool commit);
169173

170174
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
171175
public:
@@ -321,9 +325,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
321325
return true;
322326
}
323327

324-
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
328+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) {
325329
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
326-
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
330+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit);
327331
if (NeedUncommittedChangesFlush) {
328332
ModifiedTablesSinceLastFlush.clear();
329333
}
@@ -528,4 +532,6 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
528532
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
529533
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
530534

535+
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx);
536+
531537
} // namespace NKikimr::NKqp

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class TKqpTransactionManager : public IKqpTransactionManager {
312312

313313
bool NeedCommit() const override {
314314
AFL_ENSURE(ActionsCount != 1 || IsSingleShard()); // ActionsCount == 1 then IsSingleShard()
315-
const bool dontNeedCommit = IsEmpty() || IsReadOnly() && ((ActionsCount == 1) || HasSnapshot());
315+
const bool dontNeedCommit = IsEmpty() || (IsReadOnly() && ((ActionsCount == 1) || HasSnapshot()));
316316
return !dontNeedCommit;
317317
}
318318

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
5454
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
5555
bool perStatementResult,
5656
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst,
57-
NYql::TExprContext* splitCtx,
57+
std::shared_ptr<NYql::TExprContext> splitCtx,
5858
NYql::TExprNode::TPtr splitExpr)
5959
: Owner(owner)
6060
, ModuleResolverState(moduleResolverState)
@@ -71,14 +71,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
7171
, Config(MakeIntrusive<TKikimrConfiguration>())
7272
, QueryServiceConfig(queryServiceConfig)
7373
, CompilationTimeout(TDuration::MilliSeconds(tableServiceConfig.GetCompileTimeoutMs()))
74+
, SplitCtx(std::move(splitCtx))
75+
, SplitExpr(std::move(splitExpr))
7476
, UserRequestContext(userRequestContext)
7577
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
7678
, TempTablesState(std::move(tempTablesState))
7779
, CollectFullDiagnostics(collectFullDiagnostics)
7880
, CompileAction(compileAction)
7981
, QueryAst(std::move(queryAst))
80-
, SplitCtx(splitCtx)
81-
, SplitExpr(splitExpr)
8282
{
8383
Config->Init(kqpSettings->DefaultSettings.GetDefaultSettings(), QueryId.Cluster, kqpSettings->Settings, false);
8484

@@ -270,15 +270,15 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
270270

271271
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
272272
prepareSettings.ConcurrentResults = false;
273-
AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings, SplitExpr);
273+
AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings, SplitExpr.get());
274274
break;
275275

276276
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
277-
AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings, SplitExpr);
277+
AsyncCompileResult = KqpHost->PrepareGenericQuery(QueryRef, prepareSettings, SplitExpr.get());
278278
break;
279279

280280
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
281-
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr);
281+
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr.get());
282282
break;
283283

284284
default:
@@ -319,7 +319,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
319319

320320
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
321321
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
322-
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);
322+
false, false, std::move(TempTablesState), nullptr, SplitCtx.get(), UserRequestContext);
323323

324324
IKqpHost::TPrepareSettings prepareSettings;
325325
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
@@ -600,6 +600,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
600600
TDuration CompileCpuTime;
601601
TInstant RecompileStartTime;
602602
TActorId TimeoutTimerActorId;
603+
std::shared_ptr<NYql::TExprContext> SplitCtx;
604+
NYql::TExprNode::TPtr SplitExpr;
603605
TIntrusivePtr<IKqpGateway> Gateway;
604606
TIntrusivePtr<IKqpHost> KqpHost;
605607
TIntrusivePtr<IKqpHost::IAsyncQueryResult> AsyncCompileResult;
@@ -617,9 +619,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
617619
bool PerStatementResult;
618620
ECompileActorAction CompileAction;
619621
TMaybe<TQueryAst> QueryAst;
620-
621-
NYql::TExprContext* SplitCtx = nullptr;
622-
NYql::TExprNode::TPtr SplitExpr = nullptr;
623622
};
624623

625624
void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConfig& serviceConfig) {
@@ -667,15 +666,15 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
667666
const TMaybe<TString>& applicationName, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
668667
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
669668
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
670-
bool perStatementResult, NYql::TExprContext* splitCtx, NYql::TExprNode::TPtr splitExpr)
669+
bool perStatementResult, std::shared_ptr<NYql::TExprContext> splitCtx, NYql::TExprNode::TPtr splitExpr)
671670
{
672671
return new TKqpCompileActor(owner, kqpSettings, tableServiceConfig, queryServiceConfig,
673672
moduleResolverState, counters, gUCSettings, applicationName,
674673
uid, query, userToken, clientAddress, dbCounters,
675674
federatedQuerySetup, userRequestContext,
676675
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
677676
perStatementResult, compileAction, std::move(queryAst),
678-
splitCtx, splitExpr);
677+
std::move(splitCtx), std::move(splitExpr));
679678
}
680679

681680
} // namespace NKqp

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ struct TKqpCompileRequest {
5252
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {}, NWilson::TSpan span = {},
5353
TKqpTempTablesState::TConstPtr tempTablesState = {},
5454
TMaybe<TQueryAst> queryAst = {},
55-
NYql::TExprContext* splitCtx = nullptr,
55+
std::shared_ptr<NYql::TExprContext> splitCtx = nullptr,
5656
NYql::TExprNode::TPtr splitExpr = nullptr)
5757
: Sender(sender)
5858
, Query(std::move(query))
@@ -70,8 +70,8 @@ struct TKqpCompileRequest {
7070
, TempTablesState(std::move(tempTablesState))
7171
, IntrestedInResult(std::move(intrestedInResult))
7272
, QueryAst(std::move(queryAst))
73-
, SplitCtx(splitCtx)
74-
, SplitExpr(splitExpr)
73+
, SplitCtx(std::move(splitCtx))
74+
, SplitExpr(std::move(splitExpr))
7575
{}
7676

7777
TActorId Sender;
@@ -93,7 +93,7 @@ struct TKqpCompileRequest {
9393
std::shared_ptr<std::atomic<bool>> IntrestedInResult;
9494
TMaybe<TQueryAst> QueryAst;
9595

96-
NYql::TExprContext* SplitCtx;
96+
std::shared_ptr<NYql::TExprContext> SplitCtx;
9797
NYql::TExprNode::TPtr SplitExpr;
9898

9999
bool FindInCache = true;

ydb/core/kqp/compile_service/kqp_compile_service.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
170170
TMaybe<TQueryAst> queryAst = {},
171171
bool collectFullDiagnostics = false,
172172
bool PerStatementResult = false,
173-
NYql::TExprContext* ctx = nullptr,
173+
std::shared_ptr<NYql::TExprContext> ctx = nullptr,
174174
NYql::TExprNode::TPtr expr = nullptr);
175175

176176
IActor* CreateKqpCompileRequestActor(const TActorId& owner, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TMaybe<TString>& uid,

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -354,20 +354,13 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
354354
ShardedWriteController->Close();
355355
}
356356

357-
void SetParentTraceId(NWilson::TTraceId traceId) {
358-
ParentTraceId = std::move(traceId);
357+
void CleanupClosedTokens() {
358+
YQL_ENSURE(ShardedWriteController);
359+
ShardedWriteController->CleanupClosedTokens();
359360
}
360361

361-
void UpdateShards() {
362-
// TODO: Maybe there are better ways to initialize new shards...
363-
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
364-
TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
365-
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
366-
if (shardInfo.HasRead) {
367-
flags |= IKqpTransactionManager::EAction::READ;
368-
}
369-
TxManager->AddAction(shardInfo.ShardId, flags);
370-
}
362+
void SetParentTraceId(NWilson::TTraceId traceId) {
363+
ParentTraceId = std::move(traceId);
371364
}
372365

373366
bool IsClosed() const {
@@ -878,18 +871,30 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
878871
}
879872
}
880873

874+
void UpdateShards() {
875+
for (const auto& shardInfo : ShardedWriteController->ExtractShardUpdates()) {
876+
TxManager->AddShard(shardInfo.ShardId, IsOlap, TablePath);
877+
IKqpTransactionManager::TActionFlags flags = IKqpTransactionManager::EAction::WRITE;
878+
if (shardInfo.HasRead) {
879+
flags |= IKqpTransactionManager::EAction::READ;
880+
}
881+
TxManager->AddAction(shardInfo.ShardId, flags);
882+
}
883+
}
884+
881885
void FlushBuffers() {
882886
ShardedWriteController->FlushBuffers();
883887
UpdateShards();
884888
}
885889

886-
bool Flush() {
887-
for (const auto& shardInfo : ShardedWriteController->GetPendingShards()) {
888-
if (!SendDataToShard(shardInfo.ShardId)) {
889-
return false;
890+
bool FlushToShards() {
891+
bool ok = true;
892+
ShardedWriteController->ForEachPendingShard([&](const auto& shardInfo) {
893+
if (ok && !SendDataToShard(shardInfo.ShardId)) {
894+
ok = false;
890895
}
891-
}
892-
return true;
896+
});
897+
return ok;
893898
}
894899

895900
bool SendDataToShard(const ui64 shardId) {
@@ -1516,7 +1521,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
15161521
}
15171522

15181523
if (Closed || outOfMemory) {
1519-
if (!WriteTableActor->Flush()) {
1524+
if (!WriteTableActor->FlushToShards()) {
15201525
return;
15211526
}
15221527
}
@@ -1933,7 +1938,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19331938
CA_LOG_D("Flush data");
19341939
for (auto& [_, info] : WriteInfos) {
19351940
if (info.WriteTableActor->IsReady()) {
1936-
if (!info.WriteTableActor->Flush()) {
1941+
if (!info.WriteTableActor->FlushToShards()) {
19371942
return false;
19381943
}
19391944
}
@@ -2807,6 +2812,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28072812
Y_ABORT_UNLESS(GetTotalMemory() == 0);
28082813

28092814
for (auto& [_, info] : WriteInfos) {
2815+
info.WriteTableActor->CleanupClosedTokens();
28102816
info.WriteTableActor->Unlink();
28112817
}
28122818
}

0 commit comments

Comments
 (0)