Skip to content

Commit 383217d

Browse files
committed
Use uncommitted changes for INSERT without volatile commit (#19133)
1 parent 3c4fc2c commit 383217d

File tree

5 files changed

+176
-9
lines changed

5 files changed

+176
-9
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
335335
return false;
336336
}
337337

338-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
338+
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) {
339339
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
340340
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
341341
};
@@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
402402
NKikimrKqp::TKqpTableSinkSettings settings;
403403
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
404404
modifiedTables.insert(getTable(settings.GetTable()));
405-
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
405+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) {
406406
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
407407
return true;
408408
}
@@ -415,5 +415,21 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
415415
return false;
416416
}
417417

418+
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) {
419+
for (const auto &stage : tx->GetStages()) {
420+
for (const auto& sink : stage.GetSinks()) {
421+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
422+
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
423+
NKikimrKqp::TKqpTableSinkSettings settings;
424+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
425+
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
426+
return true;
427+
}
428+
}
429+
}
430+
}
431+
return false;
432+
}
433+
418434
} // namespace NKqp
419435
} // namespace NKikimr

ydb/core/kqp/common/kqp_tx.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,11 @@ class TShardIdToTableInfo {
165165
};
166166
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;
167167

168-
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);
168+
bool HasUncommittedChangesRead(
169+
THashSet<NKikimr::TTableId>& modifiedTables,
170+
const NKqpProto::TKqpPhyQuery& physicalQuery,
171+
const bool canUseVolatileTx,
172+
const bool commit);
169173

170174
class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
171175
public:
@@ -321,9 +325,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
321325
return true;
322326
}
323327

324-
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
328+
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) {
325329
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
326-
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
330+
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit);
327331
if (NeedUncommittedChangesFlush) {
328332
ModifiedTablesSinceLastFlush.clear();
329333
}
@@ -528,4 +532,6 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
528532
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
529533
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
530534

535+
bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx);
536+
531537
} // namespace NKikimr::NKqp

ydb/core/kqp/session_actor/kqp_query_state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ class TKqpQueryState : public TNonCopyable {
348348
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
349349
}
350350

351-
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
351+
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx, const bool canUseVolatileTx) {
352352
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
353353
if (!Commit) {
354354
return false;
@@ -376,7 +376,7 @@ class TKqpQueryState : public TNonCopyable {
376376
}
377377

378378
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
379-
if (tx && tx->GetHasEffects()) {
379+
if (tx && tx->GetHasEffects() && (!HasSinkInsert(tx) || canUseVolatileTx)) {
380380
YQL_ENSURE(tx->ResultsSize() == 0);
381381
// commit can be applied to the last transaction with effects
382382
return CurrentTx + 1 == phyQuery.TransactionsSize();

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -950,7 +950,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
950950
}
951951

952952
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
953-
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit);
953+
QueryState->TxCtx->ApplyPhysicalQuery(
954+
phyQuery,
955+
CanUseVolatileTx(),
956+
QueryState->Commit);
954957
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
955958
EKikimrQueryType::Dml);
956959
if (!success) {
@@ -1192,7 +1195,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
11921195
ExecutePartitioned(tx);
11931196
} else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
11941197
ExecuteDeferredEffectsImmediately(tx);
1195-
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
1198+
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx, CanUseVolatileTx()); commit || tx) {
11961199
ExecutePhyTx(tx, commit);
11971200
} else {
11981201
ReplySuccess();
@@ -2897,6 +2900,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
28972900
}
28982901
}
28992902

2903+
bool CanUseVolatileTx() const {
2904+
return AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions()
2905+
&& !QueryState->TxCtx->TopicOperations.HasOperations()
2906+
&& !QueryState->TxCtx->HasOlapTable;
2907+
}
2908+
29002909
private:
29012910
TActorId Owner;
29022911
TKqpQueryCachePtr QueryCache;

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6034,6 +6034,142 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
60346034
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
60356035
}
60366036
}
6037+
6038+
Y_UNIT_TEST(TableSinkHtapInsert) {
6039+
NKikimrConfig::TAppConfig appConfig;
6040+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
6041+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
6042+
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
6043+
auto settings = TKikimrSettings()
6044+
.SetAppConfig(appConfig)
6045+
.SetWithSampleTables(false);
6046+
TKikimrRunner kikimr(settings);
6047+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
6048+
6049+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
6050+
6051+
const TString query = R"(
6052+
CREATE TABLE `/Root/ColumnShard` (
6053+
Col1 Uint32 NOT NULL,
6054+
Col2 String NOT NULL,
6055+
PRIMARY KEY (Col1)
6056+
)
6057+
PARTITION BY HASH(Col1)
6058+
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64);
6059+
6060+
CREATE TABLE `/Root/DataShard` (
6061+
Col1 Uint32 NOT NULL,
6062+
Col2 String NOT NULL,
6063+
PRIMARY KEY (Col1)
6064+
)
6065+
WITH (UNIFORM_PARTITIONS = 64, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64);
6066+
)";
6067+
6068+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6069+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
6070+
6071+
auto client = kikimr.GetQueryClient();
6072+
6073+
{
6074+
auto result = client.ExecuteQuery(R"(
6075+
INSERT INTO `/Root/DataShard` (Col1, Col2) VALUES
6076+
(10u, "test1");
6077+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6078+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6079+
}
6080+
6081+
{
6082+
auto session = client.GetSession().GetValueSync().GetSession();
6083+
auto result = session.ExecuteQuery(R"(
6084+
INSERT INTO `/Root/ColumnShard` (Col1, Col2) VALUES
6085+
(1u, "test1"), (2u, "test2"), (4294967280u, "test3"), (4294967286u, "test");
6086+
)", NYdb::NQuery::TTxControl::BeginTx()).ExtractValueSync();
6087+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6088+
6089+
auto tx = result.GetTransaction();
6090+
UNIT_ASSERT(tx);
6091+
6092+
// non-volatile tx commit
6093+
auto result2 = session.ExecuteQuery(R"(
6094+
INSERT INTO `/Root/DataShard` (Col1, Col2) VALUES
6095+
(10u, "test1"), (20u, "test2"), (4294967280u, "test3"), (4294967286u, "test");
6096+
)", NYdb::NQuery::TTxControl::Tx(*tx).CommitTx()).ExtractValueSync();
6097+
UNIT_ASSERT_VALUES_EQUAL_C(result2.GetStatus(), EStatus::PRECONDITION_FAILED, result2.GetIssues().ToString());
6098+
}
6099+
6100+
{
6101+
auto result = client.ExecuteQuery(R"(
6102+
SELECT COUNT(*) FROM `/Root/DataShard`;
6103+
SELECT COUNT(*) FROM `/Root/ColumnShard`;
6104+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6105+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6106+
CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(0)));
6107+
CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(1)));
6108+
}
6109+
}
6110+
6111+
Y_UNIT_TEST(NonVolatileTxInsert) {
6112+
NKikimrConfig::TAppConfig appConfig;
6113+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
6114+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
6115+
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
6116+
appConfig.MutableFeatureFlags()->SetEnableDataShardVolatileTransactions(false);
6117+
auto settings = TKikimrSettings()
6118+
.SetAppConfig(appConfig)
6119+
.SetWithSampleTables(false);
6120+
TKikimrRunner kikimr(settings);
6121+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
6122+
6123+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
6124+
6125+
const TString query = R"(
6126+
CREATE TABLE `/Root/DataShard` (
6127+
Col1 Uint32 NOT NULL,
6128+
Col2 String NOT NULL,
6129+
PRIMARY KEY (Col1)
6130+
)
6131+
WITH (UNIFORM_PARTITIONS = 64, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 64);
6132+
)";
6133+
6134+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6135+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
6136+
6137+
auto client = kikimr.GetQueryClient();
6138+
6139+
{
6140+
auto result = client.ExecuteQuery(R"(
6141+
INSERT INTO `/Root/DataShard` (Col1, Col2) VALUES
6142+
(10u, "test1");
6143+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6144+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6145+
}
6146+
6147+
{
6148+
// non-volatile tx commit
6149+
auto result = client.ExecuteQuery(R"(
6150+
INSERT INTO `/Root/DataShard` (Col1, Col2) VALUES
6151+
(10u, "test1"), (20u, "test2"), (4294967280u, "test3"), (4294967286u, "test");
6152+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6153+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
6154+
}
6155+
6156+
{
6157+
auto result = client.ExecuteQuery(R"(
6158+
SELECT COUNT(*) FROM `/Root/DataShard`;
6159+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES
6160+
(10u, "test1"), (20u, "test2"), (4294967280u, "test3"), (4294967286u, "test");
6161+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6162+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6163+
}
6164+
6165+
{
6166+
auto result = client.ExecuteQuery(R"(
6167+
SELECT COUNT(*) FROM `/Root/DataShard`;
6168+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
6169+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6170+
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0)));
6171+
}
6172+
}
60376173
}
60386174

60396175
} // namespace NKqp

0 commit comments

Comments
 (0)