Skip to content

Stable-25-1-2 Fix snapshots #20707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: stable-25-1-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
bool hasEffects = false;
bool hasStreamLookup = false;
bool hasSinkWrite = false;
bool hasSinkInsert = false;

for (const auto &tx : physicalQuery.GetTransactions()) {
switch (tx.GetType()) {
Expand All @@ -193,6 +194,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
for (const auto &stage : tx.GetStages()) {
hasSinkWrite |= !stage.GetSinks().empty();

for (const auto &sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink
&& sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>())
{
NKikimrKqp::TKqpTableSinkSettings sinkSettings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&sinkSettings));
if (sinkSettings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
hasSinkInsert = true;
}
}
}

for (const auto &input : stage.GetInputs()) {
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
}
Expand Down Expand Up @@ -228,19 +241,15 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return true;
}
// ReadOnly transaction here
} else {
// We don't want snapshot when there are effects at the moment,
// because it hurts performance when there are multiple single-shard
// reads and a single distributed commit. Taking snapshot costs
// similar to an additional distributed transaction, and it's very
// hard to predict when that happens, causing performance
// degradation.
if (hasEffects) {
return false;
}
}

YQL_ENSURE(!hasEffects && !hasStreamLookup);
if (hasSinkInsert && readPhases > 0) {
YQL_ENSURE(hasEffects);
// Insert operations create new read phases,
// so in the presence of other reads we have to acquire a snapshot.
// This is unique to the INSERT operation, because it can fail.
return true;
}

// We need snapshot when there are multiple table read phases, most
// likely it involves multiple tables and we would have to use a
Expand Down
250 changes: 250 additions & 0 deletions ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
#include <ydb/core/kqp/host/kqp_host_impl.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/datashard/datashard.h>

#include <ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb-cpp-sdk/client/table/table.h>
Expand Down Expand Up @@ -283,6 +284,9 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
UNIT_ASSERT(evWrite->Record.OperationsSize() == 0);
UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0);
UNIT_ASSERT(evWrite->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
writes.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
Expand Down Expand Up @@ -322,6 +326,252 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
runtime.Send(ev.release());
}

auto result = runtime.WaitFuture(future);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

// Scenario: a single transaction performs a SELECT on `/Root/KeyValue2` followed by an
// INSERT into `/Root/KeyValue`, with the OLTP sink enabled.
//  1. The first TEvWrite sent on behalf of that transaction is intercepted; the test asserts
//     that it carries a non-zero MVCC snapshot (Step and TxId), then holds it back.
//  2. While the write is held, a second session commits INSERTs of the same keys into both tables.
//  3. The held write is re-sent and the original transaction is expected to finish with ABORTED.
Y_UNIT_TEST(TestSnapshotIfInsertRead) {
    NKikimrConfig::TAppConfig appConfig;
    appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);

    TKikimrSettings settings;
    settings.SetAppConfig(appConfig);
    settings.SetUseRealThreads(false);
    TKikimrRunner kikimr(settings);
    auto db = kikimr.GetTableClient();
    // Two independent sessions: one for the transaction under test, one for the conflicting writer.
    auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
    auto concurrentSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });

    auto& runtime = *kikimr.GetTestServer().GetRuntime();
    NYdb::NTable::TExecDataQuerySettings execSettings;
    execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);

    {
        // The raw SQL text below is intentionally kept flush-left so the query string is unchanged.
        const TString query(Q1_(R"(
SELECT Ensure("ok", false, "error") FROM `/Root/KeyValue2` WHERE Key = "10u";

INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "test");
)"));

        // TEvWrite events captured by the observer; only the first one is held back.
        std::vector<std::unique_ptr<IEventHandle>> writes;

        auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
            if (writes.empty() && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
                auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
                // The write must be issued with an MVCC snapshot attached.
                UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() != 0);
                UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() != 0);
                // Take ownership of the event and drop it from the runtime; it is re-sent later.
                writes.emplace_back(ev.Release());
                return TTestActorRuntime::EEventAction::DROP;
            }

            return TTestActorRuntime::EEventAction::PROCESS;
        };

        // Stop dispatching as soon as a write has been captured.
        TDispatchOptions opts;
        opts.FinalEvents.emplace_back([&](IEventHandle&) {
            return writes.size() > 0;
        });

        runtime.SetObserverFunc(grab);

        // Run the transaction under test asynchronously; it stalls once its write is held back.
        auto future = kikimr.RunInThreadPool([&]{
            auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
            return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
        });

        runtime.DispatchEvents(opts);
        UNIT_ASSERT(writes.size() > 0);

        {
            // Concurrent transaction: inserts the same keys into both tables and commits.
            const TString conflictingQuery(Q1_(R"(
INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "other");
INSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("10u", "other");
)"));

            auto conflictingResult = kikimr.RunCall([&]{
                auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
                return concurrentSession.ExecuteDataQuery(conflictingQuery, txc, execSettings).ExtractValueSync();
            });

            UNIT_ASSERT_VALUES_EQUAL_C(conflictingResult.GetStatus(), EStatus::SUCCESS, conflictingResult.GetIssues().ToString());
        }

        // Release the held write so the original transaction can proceed.
        for (auto& ev : writes) {
            runtime.Send(ev.release());
        }

        // The original transaction is expected to be aborted after the concurrent commit.
        auto result = runtime.WaitFuture(future);
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
    }
}

// Scenario: a lone INSERT into `/Root/SecondaryKeys` (created by CreateSampleTablesWithIndex)
// with the OLTP sink enabled. Every TEvWrite observed after the observer is installed must
// carry a zero MVCC snapshot (Step == 0 and TxId == 0), and the query must succeed.
Y_UNIT_TEST(TestSecondaryIndexWithoutSnapshot) {
    NKikimrConfig::TAppConfig appConfig;
    appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);

    TKikimrSettings settings;
    settings.SetAppConfig(appConfig);
    settings.SetUseRealThreads(false);
    TKikimrRunner kikimr(settings);
    auto db = kikimr.GetTableClient();
    auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
    // NOTE(review): upsertSession is never used in this test; it is kept because creating a
    // session has side effects on the test runtime — confirm whether it can be dropped.
    auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });

    auto& runtime = *kikimr.GetTestServer().GetRuntime();
    NYdb::NTable::TExecDataQuerySettings execSettings;
    execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);

    kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* table data is not needed */); return true; });

    {
        // The raw SQL text below is intentionally kept flush-left so the query string is unchanged.
        const TString query(Q1_(R"(
INSERT INTO `/Root/SecondaryKeys` (Key, Fk, Value) VALUES (10, 10, "test");
)"));

        // Set once at least one TEvWrite has passed through the observer.
        bool hasWrite = false;

        auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
            if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
                auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
                // No snapshot is expected on writes for this query.
                UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
                UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
                hasWrite = true;
            }

            // Events are only inspected, never dropped.
            return TTestActorRuntime::EEventAction::PROCESS;
        };

        TDispatchOptions opts;
        opts.FinalEvents.emplace_back([&](IEventHandle&) {
            return hasWrite;
        });

        runtime.SetObserverFunc(grab);

        auto future = kikimr.RunInThreadPool([&]{
            auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
            return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
        });

        auto result = runtime.WaitFuture(future);
        UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

        // Dispatch is performed after the query completes, then the test confirms that
        // at least one write was actually observed (and therefore checked above).
        runtime.DispatchEvents(opts);
        UNIT_ASSERT(hasWrite);
    }
}

Y_UNIT_TEST_TWIN(TestSnapshotWithDependentReads, UseSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);

auto setting = NKikimrKqp::TKqpSetting();
TKikimrSettings settings;
settings.SetAppConfig(appConfig);
settings.SetUseRealThreads(false);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });

auto& runtime = *kikimr.GetTestServer().GetRuntime();
NYdb::NTable::TExecDataQuerySettings execSettings;
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);

{
const TString upsertQuery(Q1_(R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "One");
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "expected");
)"));

auto upsertResult = kikimr.RunCall([&]{
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
});

UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
}

{
const TString query(Q1_(R"(
$cnt1 = SELECT Value FROM `/Root/KeyValue` WHERE Key = 1u;
SELECT Ensure("ok", $cnt1="One", "first error");

$cnt2 = SELECT Value FROM `/Root/KeyValue2` WHERE Key = $cnt1;
SELECT Ensure("ok", $cnt2="expected", "second error");

UPSERT INTO KeyValueLargePartition (Key, Value) VALUES
(1000u, "test");
)"));

std::vector<std::unique_ptr<IEventHandle>> reads;
bool hasRead = false;
bool allowAllReads = false;
bool hasResult = false;

auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
if (ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
auto* evRead = ev->Get<NKikimr::TEvDataShard::TEvRead>();
UNIT_ASSERT(evRead->Record.GetSnapshot().GetStep() != 0);
UNIT_ASSERT(evRead->Record.GetSnapshot().GetTxId() != 0);
}
if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
// Block second read
if (hasRead) {
reads.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
} else {
hasRead = true;
}
} else if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvReadResult::EventType) {
hasResult = true;
return TTestActorRuntime::EEventAction::PROCESS;
}


return TTestActorRuntime::EEventAction::PROCESS;
};

TDispatchOptions opts;
opts.FinalEvents.emplace_back([&](IEventHandle&) {
return reads.size() > 0 && hasResult;
});

runtime.SetObserverFunc(grab);

auto future = kikimr.RunInThreadPool([&]{
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
});

runtime.DispatchEvents(opts);
UNIT_ASSERT(reads.size() > 0 && hasResult);

{
const TString upsertQuery(Q1_(R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "not expected");
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "not expected");
)"));

auto upsertResult = kikimr.RunCall([&]{
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
});

UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
}

allowAllReads = true;

for(auto& ev: reads) {
runtime.Send(ev.release());
}

auto result = runtime.WaitFuture(future);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) {
result = session.ExecuteQuery(Q_(R"(
SELECT * FROM `/Root/KV` WHERE Value = "New";
)"), TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));

auto commitResult = tx.Commit().ExtractValueSync();
Expand Down
Loading