Skip to content

Commit 1e497ce

Browse files
authored
Fix snapshots (#20652)
1 parent f29415d commit 1e497ce

File tree

3 files changed

+271
-12
lines changed

3 files changed

+271
-12
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -174,6 +174,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
174174
bool hasEffects = false;
175175
bool hasStreamLookup = false;
176176
bool hasSinkWrite = false;
177+
bool hasSinkInsert = false;
177178

178179
for (const auto &tx : physicalQuery.GetTransactions()) {
179180
switch (tx.GetType()) {
@@ -193,6 +194,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
193194
for (const auto &stage : tx.GetStages()) {
194195
hasSinkWrite |= !stage.GetSinks().empty();
195196

197+
for (const auto &sink : stage.GetSinks()) {
198+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink
199+
&& sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>())
200+
{
201+
NKikimrKqp::TKqpTableSinkSettings sinkSettings;
202+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&sinkSettings));
203+
if (sinkSettings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
204+
hasSinkInsert = true;
205+
}
206+
}
207+
}
208+
196209
for (const auto &input : stage.GetInputs()) {
197210
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
198211
}
@@ -228,19 +241,15 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
228241
return true;
229242
}
230243
// ReadOnly transaction here
231-
} else {
232-
// We don't want snapshot when there are effects at the moment,
233-
// because it hurts performance when there are multiple single-shard
234-
// reads and a single distributed commit. Taking snapshot costs
235-
// similar to an additional distributed transaction, and it's very
236-
// hard to predict when that happens, causing performance
237-
// degradation.
238-
if (hasEffects) {
239-
return false;
240-
}
241244
}
242245

243-
YQL_ENSURE(!hasEffects && !hasStreamLookup);
246+
if (hasSinkInsert && readPhases > 0) {
247+
YQL_ENSURE(hasEffects);
248+
// Insert operations create new read phases,
249+
// so in presence of other reads we have to acquire snapshot.
250+
// This is unique to INSERT operation, because it can fail.
251+
return true;
252+
}
244253

245254
// We need snapshot when there are multiple table read phases, most
246255
// likely it involves multiple tables and we would have to use a

ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp

Lines changed: 250 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
88
#include <ydb/core/kqp/host/kqp_host_impl.h>
99
#include <ydb/core/tx/data_events/events.h>
10+
#include <ydb/core/tx/datashard/datashard.h>
1011

1112
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
1213
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
@@ -290,6 +291,9 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
290291
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
291292
UNIT_ASSERT(evWrite->Record.OperationsSize() == 0);
292293
UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0);
294+
UNIT_ASSERT(evWrite->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
295+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
296+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
293297
writes.emplace_back(ev.Release());
294298
return TTestActorRuntime::EEventAction::DROP;
295299
}
@@ -332,6 +336,252 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
332336
runtime.Send(ev.release());
333337
}
334338

339+
auto result = runtime.WaitFuture(future);
340+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
341+
}
342+
}
343+
344+
Y_UNIT_TEST(TestSnapshotIfInsertRead) {
345+
NKikimrConfig::TAppConfig appConfig;
346+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
347+
348+
auto setting = NKikimrKqp::TKqpSetting();
349+
TKikimrSettings settings;
350+
settings.SetAppConfig(appConfig);
351+
settings.SetUseRealThreads(false);
352+
TKikimrRunner kikimr(settings);
353+
auto db = kikimr.GetTableClient();
354+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
355+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
356+
357+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
358+
NYdb::NTable::TExecDataQuerySettings execSettings;
359+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
360+
361+
{
362+
const TString query(Q1_(R"(
363+
SELECT Ensure("ok", false, "error") FROM `/Root/KeyValue2` WHERE Key = "10u";
364+
365+
INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "test");
366+
)"));
367+
368+
std::vector<std::unique_ptr<IEventHandle>> writes;
369+
370+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
371+
if (writes.empty() && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
372+
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
373+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() != 0);
374+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() != 0);
375+
writes.emplace_back(ev.Release());
376+
return TTestActorRuntime::EEventAction::DROP;
377+
}
378+
379+
return TTestActorRuntime::EEventAction::PROCESS;
380+
};
381+
382+
TDispatchOptions opts;
383+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
384+
return writes.size() > 0;
385+
});
386+
387+
runtime.SetObserverFunc(grab);
388+
389+
auto future = kikimr.RunInThreadPool([&]{
390+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
391+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
392+
});
393+
394+
runtime.DispatchEvents(opts);
395+
UNIT_ASSERT(writes.size() > 0);
396+
397+
{
398+
const TString upsertQuery(Q1_(R"(
399+
INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "other");
400+
INSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("10u", "other");
401+
)"));
402+
403+
auto upsertResult = kikimr.RunCall([&]{
404+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
405+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
406+
});
407+
408+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
409+
}
410+
411+
412+
for(auto& ev: writes) {
413+
runtime.Send(ev.release());
414+
}
415+
416+
auto result = runtime.WaitFuture(future);
417+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
418+
}
419+
}
420+
421+
Y_UNIT_TEST(TestSecondaryIndexWithoutSnapshot) {
422+
NKikimrConfig::TAppConfig appConfig;
423+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
424+
425+
auto setting = NKikimrKqp::TKqpSetting();
426+
TKikimrSettings settings;
427+
settings.SetAppConfig(appConfig);
428+
settings.SetUseRealThreads(false);
429+
TKikimrRunner kikimr(settings);
430+
auto db = kikimr.GetTableClient();
431+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
432+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
433+
434+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
435+
NYdb::NTable::TExecDataQuerySettings execSettings;
436+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
437+
438+
kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
439+
440+
{
441+
const TString query(Q1_(R"(
442+
INSERT INTO `/Root/SecondaryKeys` (Key, Fk, Value) VALUES (10, 10, "test");
443+
)"));
444+
445+
bool hasWrite = false;
446+
447+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
448+
if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
449+
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
450+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
451+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
452+
hasWrite = true;
453+
}
454+
455+
return TTestActorRuntime::EEventAction::PROCESS;
456+
};
457+
458+
TDispatchOptions opts;
459+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
460+
return hasWrite;
461+
});
462+
463+
runtime.SetObserverFunc(grab);
464+
465+
auto future = kikimr.RunInThreadPool([&]{
466+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
467+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
468+
});
469+
470+
auto result = runtime.WaitFuture(future);
471+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
472+
473+
474+
runtime.DispatchEvents(opts);
475+
UNIT_ASSERT(hasWrite);
476+
}
477+
}
478+
479+
Y_UNIT_TEST_TWIN(TestSnapshotWithDependentReads, UseSink) {
480+
NKikimrConfig::TAppConfig appConfig;
481+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
482+
483+
auto setting = NKikimrKqp::TKqpSetting();
484+
TKikimrSettings settings;
485+
settings.SetAppConfig(appConfig);
486+
settings.SetUseRealThreads(false);
487+
TKikimrRunner kikimr(settings);
488+
auto db = kikimr.GetTableClient();
489+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
490+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
491+
492+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
493+
NYdb::NTable::TExecDataQuerySettings execSettings;
494+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
495+
496+
{
497+
const TString upsertQuery(Q1_(R"(
498+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "One");
499+
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "expected");
500+
)"));
501+
502+
auto upsertResult = kikimr.RunCall([&]{
503+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
504+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
505+
});
506+
507+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
508+
}
509+
510+
{
511+
const TString query(Q1_(R"(
512+
$cnt1 = SELECT Value FROM `/Root/KeyValue` WHERE Key = 1u;
513+
SELECT Ensure("ok", $cnt1="One", "first error");
514+
515+
$cnt2 = SELECT Value FROM `/Root/KeyValue2` WHERE Key = $cnt1;
516+
SELECT Ensure("ok", $cnt2="expected", "second error");
517+
518+
UPSERT INTO KeyValueLargePartition (Key, Value) VALUES
519+
(1000u, "test");
520+
)"));
521+
522+
std::vector<std::unique_ptr<IEventHandle>> reads;
523+
bool hasRead = false;
524+
bool allowAllReads = false;
525+
bool hasResult = false;
526+
527+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
528+
if (ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
529+
auto* evRead = ev->Get<NKikimr::TEvDataShard::TEvRead>();
530+
UNIT_ASSERT(evRead->Record.GetSnapshot().GetStep() != 0);
531+
UNIT_ASSERT(evRead->Record.GetSnapshot().GetTxId() != 0);
532+
}
533+
if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
534+
// Block second read
535+
if (hasRead) {
536+
reads.emplace_back(ev.Release());
537+
return TTestActorRuntime::EEventAction::DROP;
538+
} else {
539+
hasRead = true;
540+
}
541+
} else if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvReadResult::EventType) {
542+
hasResult = true;
543+
return TTestActorRuntime::EEventAction::PROCESS;
544+
}
545+
546+
547+
return TTestActorRuntime::EEventAction::PROCESS;
548+
};
549+
550+
TDispatchOptions opts;
551+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
552+
return reads.size() > 0 && hasResult;
553+
});
554+
555+
runtime.SetObserverFunc(grab);
556+
557+
auto future = kikimr.RunInThreadPool([&]{
558+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
559+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
560+
});
561+
562+
runtime.DispatchEvents(opts);
563+
UNIT_ASSERT(reads.size() > 0 && hasResult);
564+
565+
{
566+
const TString upsertQuery(Q1_(R"(
567+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "not expected");
568+
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "not expected");
569+
)"));
570+
571+
auto upsertResult = kikimr.RunCall([&]{
572+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
573+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
574+
});
575+
576+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
577+
}
578+
579+
allowAllReads = true;
580+
581+
for(auto& ev: reads) {
582+
runtime.Send(ev.release());
583+
}
584+
335585
auto result = runtime.WaitFuture(future);
336586
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
337587
}

ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) {
8282
result = session.ExecuteQuery(Q_(R"(
8383
SELECT * FROM `/Root/KV` WHERE Value = "New";
8484
)"), TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx()).ExtractValueSync();
85-
UNIT_ASSERT(result.IsSuccess());
85+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
8686
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
8787

8888
auto commitResult = tx.Commit().ExtractValueSync();

0 commit comments

Comments
 (0)