Skip to content

Commit 20ba22b

Browse files
azevaykinnikvas0
authored andcommitted
If there are no commitTxIds we should clear AwaitingDecisions (#18997)
1 parent 900a13d commit 20ba22b

File tree

4 files changed

+52
-5
lines changed

4 files changed

+52
-5
lines changed

ydb/core/kqp/ut/effects/kqp_effects_ut.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,52 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
525525
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
526526
}
527527

528+
Y_UNIT_TEST_TWIN(EmptyUpdate, UseSink) {
529+
NKikimrConfig::TAppConfig appConfig;
530+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
531+
auto settings = TKikimrSettings()
532+
.SetAppConfig(appConfig)
533+
.SetWithSampleTables(false);
534+
TKikimrRunner kikimr(settings);
535+
auto db = kikimr.GetTableClient();
536+
auto session = db.CreateSession().GetValueSync().GetSession();
537+
538+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
539+
540+
{
541+
auto schemeResult = session.ExecuteSchemeQuery(R"(
542+
--!syntax_v1
543+
CREATE TABLE T1 (
544+
Key Uint32,
545+
Value Uint32,
546+
Timestamp Timestamp,
547+
PRIMARY KEY (Key)
548+
);
549+
CREATE TABLE T2 (
550+
Key Uint32,
551+
Value Uint32,
552+
PRIMARY KEY (Key)
553+
);
554+
)").ExtractValueSync();
555+
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
556+
}
557+
Cerr << "!!!UPDATE TABLE" << Endl;
558+
{
559+
auto result = session.ExecuteDataQuery(R"(
560+
--!syntax_v1
561+
$data = SELECT 1u AS Key, 1u AS Value;
562+
UPDATE T1 ON SELECT Key, Value FROM $data;
563+
DELETE FROM T2 WHERE Key = 1;
564+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
565+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
566+
}
567+
Cerr << "!!!DROP TABLE" << Endl;
568+
{
569+
auto schemeResult = session.DropTable("/Root/T1").ExtractValueSync();
570+
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
571+
}
572+
}
573+
528574
Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) {
529575
NKikimrConfig::TAppConfig appConfig;
530576
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);

ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
348348
// Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
349349
// Such transactions would have no participants and become immediately committed
350350
auto commitTxIds = dataTx->GetVolatileCommitTxIds();
351-
if (commitTxIds) {
351+
if (commitTxIds || isArbiter) {
352352
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
353353
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
354354
txId,
@@ -360,6 +360,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
360360
dataTx->GetVolatileCommitOrdered(),
361361
isArbiter,
362362
txc);
363+
} else {
364+
awaitingDecisions.clear();
363365
}
364366

365367
if (dataTx->GetPerformedUserReads()) {

ydb/core/tx/datashard/execute_write_unit.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ class TExecuteWriteUnit : public TExecutionUnit {
433433
// Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
434434
// Such transactions would have no participants and become immediately committed
435435
auto commitTxIds = userDb.GetVolatileCommitTxIds();
436-
if (commitTxIds) {
436+
if (commitTxIds || isArbiter) {
437437
TVector<ui64> participants(awaitingDecisions.begin(), awaitingDecisions.end());
438438
DataShard.GetVolatileTxManager().PersistAddVolatileTx(
439439
userDb.GetVolatileTxId(),
@@ -446,6 +446,8 @@ class TExecuteWriteUnit : public TExecutionUnit {
446446
isArbiter,
447447
txc
448448
);
449+
} else {
450+
awaitingDecisions.clear();
449451
}
450452

451453
if (userDb.GetPerformedUserReads()) {

ydb/core/tx/datashard/volatile_tx.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,6 @@ namespace NKikimr::NDataShard {
519519
{
520520
using Schema = TDataShard::Schema;
521521

522-
Y_VERIFY_S(!commitTxIds.empty(),
523-
"Unexpected volatile txId# " << txId << " @" << version << " without commits");
524-
525522
auto res = VolatileTxs.insert(
526523
std::make_pair(txId, std::make_unique<TVolatileTxInfo>()));
527524
Y_VERIFY_S(res.second, "Cannot add volatile txId# " << txId << " @" << version

0 commit comments

Comments
 (0)