Skip to content

Commit a622789

Browse files
committed
Revert "Use uncommitted changes for INSERT without volatile commit" (#19302)
1 parent 2ca9e56 commit a622789

File tree

4 files changed

+9
-40
lines changed

4 files changed

+9
-40
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
335335
return false;
336336
}
337337

338-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) {
338+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
339339
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
340340
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
341341
};
@@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
402402
NKikimrKqp::TKqpTableSinkSettings settings;
403403
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
404404
modifiedTables.insert(getTable(settings.GetTable()));
405-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) {
405+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
406406
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
407407
return true;
408408
}
@@ -415,21 +415,5 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
415415
return false;
416416
}
417417

418-
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) {
419-
for (const auto &stage : tx->GetStages()) {
420-
for (const auto& sink : stage.GetSinks()) {
421-
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
422-
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
423-
NKikimrKqp::TKqpTableSinkSettings settings;
424-
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
425-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
426-
return true;
427-
}
428-
}
429-
}
430-
}
431-
return false;
432-
}
433-
434418
} // namespace NKqp
435419
} // namespace NKikimr

ydb/core/kqp/common/kqp_tx.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,7 @@ class TShardIdToTableInfo {
165165
};
166166
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
167167

168-
bool HasUncommittedChangesRead(
169-
THashSet<NKikimr::TTableId>& modifiedTables,
170-
const NKqpProto::TKqpPhyQuery& physicalQuery,
171-
const bool canUseVolatileTx,
172-
const bool commit);
168+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);
173169

174170
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
175171
public:
@@ -325,9 +321,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
325321
return true;
326322
}
327323

328-
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) {
324+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
329325
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
330-
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit);
326+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
331327
if (NeedUncommittedChangesFlush) {
332328
ModifiedTablesSinceLastFlush.clear();
333329
}
@@ -532,6 +528,4 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
532528
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
533529
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
534530

535-
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx);
536-
537531
} // namespace NKikimr::NKqp

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ class TKqpQueryState : public TNonCopyable {
348348
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
349349
}
350350

351-
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx, const bool canUseVolatileTx) {
351+
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
352352
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
353353
if (!Commit) {
354354
return false;
@@ -376,7 +376,7 @@ class TKqpQueryState : public TNonCopyable {
376376
}
377377

378378
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
379-
if (tx && tx->GetHasEffects() && (!HasSinkInsert(tx) || canUseVolatileTx)) {
379+
if (tx && tx->GetHasEffects()) {
380380
YQL_ENSURE(tx->ResultsSize() == 0);
381381
// commit can be applied to the last transaction with effects
382382
return CurrentTx + 1 == phyQuery.TransactionsSize();

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -950,10 +950,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
950950
}
951951

952952
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
953-
QueryState->TxCtx->ApplyPhysicalQuery(
954-
phyQuery,
955-
CanUseVolatileTx(),
956-
QueryState->Commit);
953+
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit);
957954
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
958955
EKikimrQueryType::Dml);
959956
if (!success) {
@@ -1195,7 +1192,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
11951192
ExecutePartitioned(tx);
11961193
} else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
11971194
ExecuteDeferredEffectsImmediately(tx);
1198-
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx, CanUseVolatileTx()); commit || tx) {
1195+
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
11991196
ExecutePhyTx(tx, commit);
12001197
} else {
12011198
ReplySuccess();
@@ -2900,12 +2897,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
29002897
}
29012898
}
29022899

2903-
bool CanUseVolatileTx() const {
2904-
return AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions()
2905-
&& !QueryState->TxCtx->TopicOperations.HasOperations()
2906-
&& !QueryState->TxCtx->HasOlapTable;
2907-
}
2908-
29092900
private:
29102901
TActorId Owner;
29112902
TKqpQueryCachePtr QueryCache;

0 commit comments

Comments
 (0)