diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index f6022c112c52..b1ca942a79cd 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -174,6 +174,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig bool hasEffects = false; bool hasStreamLookup = false; bool hasSinkWrite = false; + bool hasSinkInsert = false; for (const auto &tx : physicalQuery.GetTransactions()) { switch (tx.GetType()) { @@ -193,6 +194,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig for (const auto &stage : tx.GetStages()) { hasSinkWrite |= !stage.GetSinks().empty(); + for (const auto &sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink + && sink.GetInternalSink().GetSettings().Is()) + { + NKikimrKqp::TKqpTableSinkSettings sinkSettings; + YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&sinkSettings)); + if (sinkSettings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) { + hasSinkInsert = true; + } + } + } + for (const auto &input : stage.GetInputs()) { hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup; } @@ -228,19 +241,15 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig return true; } // ReadOnly transaction here - } else { - // We don't want snapshot when there are effects at the moment, - // because it hurts performance when there are multiple single-shard - // reads and a single distributed commit. Taking snapshot costs - // similar to an additional distributed transaction, and it's very - // hard to predict when that happens, causing performance - // degradation. - if (hasEffects) { - return false; - } } - YQL_ENSURE(!hasEffects && !hasStreamLookup); + if (hasSinkInsert && readPhases > 0) { + YQL_ENSURE(hasEffects); + // Insert operations create new read phases, + // so in presence of other reads we have to acquire snapshot. + // This is unique to INSERT operation, because it can fail. + return true; + } // We need snapshot when there are multiple table read phases, most // likely it involves multiple tables and we would have to use a diff --git a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp index d93c66afd6d4..330ed536489f 100644 --- a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -283,6 +284,9 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { auto* evWrite = ev->Get(); UNIT_ASSERT(evWrite->Record.OperationsSize() == 0); UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0); + UNIT_ASSERT(evWrite->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0); writes.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; } @@ -322,6 +326,252 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { runtime.Send(ev.release()); } + auto result = runtime.WaitFuture(future); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(TestSnapshotIfInsertRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + + auto setting = NKikimrKqp::TKqpSetting(); + TKikimrSettings settings; + settings.SetAppConfig(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + const TString query(Q1_(R"( + SELECT Ensure("ok", false, "error") FROM `/Root/KeyValue2` WHERE Key = "10u"; + + INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "test"); + )")); + + std::vector> writes; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (writes.empty() && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) { + auto* evWrite = ev->Get(); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() != 0); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() != 0); + writes.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle&) { + return writes.size() > 0; + }); + + runtime.SetObserverFunc(grab); + + auto future = kikimr.RunInThreadPool([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync(); + }); + + runtime.DispatchEvents(opts); + UNIT_ASSERT(writes.size() > 0); + + { + const TString upsertQuery(Q1_(R"( + INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "other"); + INSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("10u", "other"); + )")); + + auto upsertResult = kikimr.RunCall([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync(); + }); + + UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString()); + } + + + for(auto& ev: writes) { + runtime.Send(ev.release()); + } + + auto result = runtime.WaitFuture(future); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(TestSecondaryIndexWithoutSnapshot) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + + auto setting = NKikimrKqp::TKqpSetting(); + TKikimrSettings settings; + settings.SetAppConfig(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; }); + + { + const TString query(Q1_(R"( + INSERT INTO `/Root/SecondaryKeys` (Key, Fk, Value) VALUES (10, 10, "test"); + )")); + + bool hasWrite = false; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) { + auto* evWrite = ev->Get(); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0); + UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0); + hasWrite = true; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle&) { + return hasWrite; + }); + + runtime.SetObserverFunc(grab); + + auto future = kikimr.RunInThreadPool([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync(); + }); + + auto result = runtime.WaitFuture(future); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + runtime.DispatchEvents(opts); + UNIT_ASSERT(hasWrite); + } + } + + Y_UNIT_TEST_TWIN(TestSnapshotWithDependentReads, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + + auto setting = NKikimrKqp::TKqpSetting(); + TKikimrSettings settings; + settings.SetAppConfig(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + { + const TString upsertQuery(Q1_(R"( + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "One"); + UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "expected"); + )")); + + auto upsertResult = kikimr.RunCall([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync(); + }); + + UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString()); + } + + { + const TString query(Q1_(R"( + $cnt1 = SELECT Value FROM `/Root/KeyValue` WHERE Key = 1u; + SELECT Ensure("ok", $cnt1="One", "first error"); + + $cnt2 = SELECT Value FROM `/Root/KeyValue2` WHERE Key = $cnt1; + SELECT Ensure("ok", $cnt2="expected", "second error"); + + UPSERT INTO KeyValueLargePartition (Key, Value) VALUES + (1000u, "test"); + )")); + + std::vector> reads; + bool hasRead = false; + bool allowAllReads = false; + bool hasResult = false; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) { + auto* evRead = ev->Get(); + UNIT_ASSERT(evRead->Record.GetSnapshot().GetStep() != 0); + UNIT_ASSERT(evRead->Record.GetSnapshot().GetTxId() != 0); + } + if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) { + // Block second read + if (hasRead) { + reads.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } else { + hasRead = true; + } + } else if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvReadResult::EventType) { + hasResult = true; + return TTestActorRuntime::EEventAction::PROCESS; + } + + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle&) { + return reads.size() > 0 && hasResult; + }); + + runtime.SetObserverFunc(grab); + + auto future = kikimr.RunInThreadPool([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync(); + }); + + runtime.DispatchEvents(opts); + UNIT_ASSERT(reads.size() > 0 && hasResult); + + { + const TString upsertQuery(Q1_(R"( + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "not expected"); + UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "not expected"); + )")); + + auto upsertResult = kikimr.RunCall([&]{ + auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync(); + }); + + UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString()); + } + + allowAllReads = true; + + for(auto& ev: reads) { + runtime.Send(ev.release()); + } + auto result = runtime.WaitFuture(future); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); } diff --git a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp index f6cffaf66c2b..323704823e53 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp @@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) { result = session.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/KV` WHERE Value = "New"; )"), TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT(result.IsSuccess()); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); auto commitResult = tx.Commit().ExtractValueSync();