Skip to content

Commit 6ae8233

Browse files
authored
[stable-25-1] EvWrite & CTAS fixes (#19423)
2 parents 18b0ee4 + a92f682 commit 6ae8233

File tree

13 files changed

+217
-55
lines changed

13 files changed

+217
-55
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
335335
return false;
336336
}
337337

338-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) {
338+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
339339
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
340340
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
341341
};
@@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
402402
NKikimrKqp::TKqpTableSinkSettings settings;
403403
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
404404
modifiedTables.insert(getTable(settings.GetTable()));
405-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) {
405+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
406406
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
407407
return true;
408408
}
@@ -415,21 +415,5 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
415415
return false;
416416
}
417417

418-
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) {
419-
for (const auto &stage : tx->GetStages()) {
420-
for (const auto& sink : stage.GetSinks()) {
421-
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
422-
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
423-
NKikimrKqp::TKqpTableSinkSettings settings;
424-
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
425-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
426-
return true;
427-
}
428-
}
429-
}
430-
}
431-
return false;
432-
}
433-
434418
} // namespace NKqp
435419
} // namespace NKikimr

ydb/core/kqp/common/kqp_tx.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,7 @@ class TShardIdToTableInfo {
165165
};
166166
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
167167

168-
bool HasUncommittedChangesRead(
169-
THashSet<NKikimr::TTableId>& modifiedTables,
170-
const NKqpProto::TKqpPhyQuery& physicalQuery,
171-
const bool canUseVolatileTx,
172-
const bool commit);
168+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);
173169

174170
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
175171
public:
@@ -325,9 +321,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
325321
return true;
326322
}
327323

328-
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) {
324+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
329325
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
330-
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit);
326+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
331327
if (NeedUncommittedChangesFlush) {
332328
ModifiedTablesSinceLastFlush.clear();
333329
}
@@ -532,6 +528,4 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
532528
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
533529
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
534530

535-
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx);
536-
537531
} // namespace NKikimr::NKqp

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1635,7 +1635,13 @@ class TKqpHost : public IKqpHost {
16351635

16361636
YQL_ENSURE(ExprCtxStorage);
16371637

1638-
auto prepareData = PrepareRewrite(compileResult.QueryExpr, *ExprCtxStorage, *TypesCtx, SessionCtx, Cluster);
1638+
auto prepareData = PrepareRewrite(
1639+
compileResult.QueryExpr,
1640+
*ExprCtxStorage,
1641+
*TypesCtx,
1642+
SessionCtx,
1643+
*FuncRegistry,
1644+
Cluster);
16391645

16401646
return MakeIntrusive<TAsyncSplitQueryResult>(
16411647
compileResult.QueryExpr,

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace {
9898
NYql::TExprContext& exprCtx,
9999
NYql::TTypeAnnotationContext& typeCtx,
100100
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
101+
const NMiniKQL::IFunctionRegistry& funcRegistry,
101102
const TString& cluster) {
102103
NYql::NNodes::TExprBase expr(root);
103104
auto maybeWrite = expr.Maybe<NYql::NNodes::TCoWrite>();
@@ -135,6 +136,7 @@ namespace {
135136

136137
auto typeTransformer = NYql::TTransformationPipeline(&typeCtx)
137138
.AddServiceTransformers()
139+
.AddExpressionEvaluation(funcRegistry)
138140
.AddPreTypeAnnotation()
139141
.AddIOAnnotation()
140142
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(cluster, sessionCtx->TablesPtr(), typeCtx, sessionCtx->ConfigPtr()))
@@ -428,6 +430,7 @@ TPrepareRewriteInfo PrepareRewrite(
428430
NYql::TExprContext& exprCtx,
429431
NYql::TTypeAnnotationContext& typeCtx,
430432
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
433+
const NMiniKQL::IFunctionRegistry& funcRegistry,
431434
const TString& cluster) {
432435
// CREATE TABLE AS statement can be used only with perstatement execution.
433436
// Thus we assume that there is only one such statement. (it was checked in CheckRewrite)
@@ -440,7 +443,7 @@ TPrepareRewriteInfo PrepareRewrite(
440443
});
441444
YQL_ENSURE(createTableAsNode);
442445

443-
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, cluster);
446+
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, funcRegistry, cluster);
444447
}
445448

446449
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/host/kqp_statement_rewrite.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ TPrepareRewriteInfo PrepareRewrite(
2525
NYql::TExprContext& exprCtx,
2626
NYql::TTypeAnnotationContext& typeCtx,
2727
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
28+
const NMiniKQL::IFunctionRegistry& funcRegistry,
2829
const TString& cluster);
2930

3031
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,19 +1961,20 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
19611961
return Process();
19621962
}
19631963

1964-
bool Prepare(const ui64 txId, NWilson::TTraceId traceId) {
1964+
bool Prepare(std::optional<NWilson::TTraceId> traceId) {
19651965
UpdateTracingState("Commit", std::move(traceId));
19661966
OperationStartTime = TInstant::Now();
19671967

19681968
CA_LOG_D("Start prepare for distributed commit");
19691969
YQL_ENSURE(State == EState::WRITING);
1970+
YQL_ENSURE(!NeedToFlushBeforeCommit);
19701971
State = EState::PREPARING;
19711972
for (auto& [_, queue] : DataQueues) {
19721973
YQL_ENSURE(queue.empty());
19731974
}
1974-
TxId = txId;
1975+
YQL_ENSURE(TxId);
19751976
for (auto& [_, info] : WriteInfos) {
1976-
info.WriteTableActor->SetPrepare(txId);
1977+
info.WriteTableActor->SetPrepare(*TxId);
19771978
}
19781979
Close();
19791980
if (!Process()) {
@@ -2424,8 +2425,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
24242425
TxManager->StartExecute();
24252426
ImmediateCommit(std::move(ev->TraceId));
24262427
} else {
2427-
TxManager->StartPrepare();
2428-
Prepare(ev->Get()->TxId, std::move(ev->TraceId));
2428+
AFL_ENSURE(ev->Get()->TxId);
2429+
TxId = ev->Get()->TxId;
2430+
if (NeedToFlushBeforeCommit) {
2431+
Flush(std::move(ev->TraceId));
2432+
} else {
2433+
TxManager->StartPrepare();
2434+
Prepare(std::move(ev->TraceId));
2435+
}
24292436
}
24302437
}
24312438

@@ -2805,6 +2812,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28052812
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
28062813
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
28072814
State = EState::WRITING;
2815+
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
2816+
NeedToFlushBeforeCommit = false;
2817+
if (TxId) {
2818+
TxManager->StartPrepare();
2819+
Prepare(std::nullopt);
2820+
return;
2821+
}
2822+
28082823
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
28092824
BuildStats()
28102825
});
@@ -2846,8 +2861,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
28462861
ReplyErrorAndDieImpl(statusCode, std::move(issues));
28472862
}
28482863

2849-
void UpdateTracingState(const char* name, NWilson::TTraceId traceId) {
2850-
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId),
2864+
void UpdateTracingState(const char* name, std::optional<NWilson::TTraceId> traceId) {
2865+
if (!traceId) {
2866+
return;
2867+
}
2868+
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId),
28512869
name, NWilson::EFlags::AUTO_END);
28522870
if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) {
28532871
BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId());
@@ -2929,6 +2947,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
29292947

29302948
EState State;
29312949
bool HasError = false;
2950+
bool NeedToFlushBeforeCommit = false;
29322951
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;
29332952

29342953
struct TAckMessage {

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ class TKqpQueryState : public TNonCopyable {
348348
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
349349
}
350350

351-
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx, const bool canUseVolatileTx) {
351+
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
352352
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
353353
if (!Commit) {
354354
return false;
@@ -376,7 +376,7 @@ class TKqpQueryState : public TNonCopyable {
376376
}
377377

378378
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
379-
if (tx && tx->GetHasEffects() && (!HasSinkInsert(tx) || canUseVolatileTx)) {
379+
if (tx && tx->GetHasEffects()) {
380380
YQL_ENSURE(tx->ResultsSize() == 0);
381381
// commit can be applied to the last transaction with effects
382382
return CurrentTx + 1 == phyQuery.TransactionsSize();

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -950,10 +950,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
950950
}
951951

952952
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
953-
QueryState->TxCtx->ApplyPhysicalQuery(
954-
phyQuery,
955-
CanUseVolatileTx(),
956-
QueryState->Commit);
953+
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit);
957954
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
958955
EKikimrQueryType::Dml);
959956
if (!success) {
@@ -1195,7 +1192,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
11951192
ExecutePartitioned(tx);
11961193
} else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
11971194
ExecuteDeferredEffectsImmediately(tx);
1198-
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx, CanUseVolatileTx()); commit || tx) {
1195+
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
11991196
ExecutePhyTx(tx, commit);
12001197
} else {
12011198
ReplySuccess();
@@ -2900,12 +2897,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29002897
}
29012898
}
29022899

2903-
bool CanUseVolatileTx() const {
2904-
return AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions()
2905-
&& !QueryState->TxCtx->TopicOperations.HasOperations()
2906-
&& !QueryState->TxCtx->HasOlapTable;
2907-
}
2908-
29092900
private:
29102901
TActorId Owner;
29112902
TKqpQueryCachePtr QueryCache;

ydb/core/kqp/ut/effects/kqp_effects_ut.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,52 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
525525
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
526526
}
527527

528+
Y_UNIT_TEST_TWIN(EmptyUpdate, UseSink) {
529+
NKikimrConfig::TAppConfig appConfig;
530+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
531+
auto settings = TKikimrSettings()
532+
.SetAppConfig(appConfig)
533+
.SetWithSampleTables(false);
534+
TKikimrRunner kikimr(settings);
535+
auto db = kikimr.GetTableClient();
536+
auto session = db.CreateSession().GetValueSync().GetSession();
537+
538+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
539+
540+
{
541+
auto schemeResult = session.ExecuteSchemeQuery(R"(
542+
--!syntax_v1
543+
CREATE TABLE T1 (
544+
Key Uint32,
545+
Value Uint32,
546+
Timestamp Timestamp,
547+
PRIMARY KEY (Key)
548+
);
549+
CREATE TABLE T2 (
550+
Key Uint32,
551+
Value Uint32,
552+
PRIMARY KEY (Key)
553+
);
554+
)").ExtractValueSync();
555+
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
556+
}
557+
Cerr << "!!!UPDATE TABLE" << Endl;
558+
{
559+
auto result = session.ExecuteDataQuery(R"(
560+
--!syntax_v1
561+
$data = SELECT 1u AS Key, 1u AS Value;
562+
UPDATE T1 ON SELECT Key, Value FROM $data;
563+
DELETE FROM T2 WHERE Key = 1;
564+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
565+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
566+
}
567+
Cerr << "!!!DROP TABLE" << Endl;
568+
{
569+
auto schemeResult = session.DropTable("/Root/T1").ExtractValueSync();
570+
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
571+
}
572+
}
573+
528574
Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) {
529575
NKikimrConfig::TAppConfig appConfig;
530576
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);

0 commit comments

Comments
 (0)