Skip to content

Commit 59d5eed

Browse files
committed
Fix snapshots (#20652)
1 parent 243a4e6 commit 59d5eed

File tree

3 files changed

+271
-12
lines changed

3 files changed

+271
-12
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
174174
bool hasEffects = false;
175175
bool hasStreamLookup = false;
176176
bool hasSinkWrite = false;
177+
bool hasSinkInsert = false;
177178

178179
for (const auto &tx : physicalQuery.GetTransactions()) {
179180
switch (tx.GetType()) {
@@ -193,6 +194,18 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
193194
for (const auto &stage : tx.GetStages()) {
194195
hasSinkWrite |= !stage.GetSinks().empty();
195196

197+
for (const auto &sink : stage.GetSinks()) {
198+
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink
199+
&& sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>())
200+
{
201+
NKikimrKqp::TKqpTableSinkSettings sinkSettings;
202+
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&sinkSettings));
203+
if (sinkSettings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
204+
hasSinkInsert = true;
205+
}
206+
}
207+
}
208+
196209
for (const auto &input : stage.GetInputs()) {
197210
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
198211
}
@@ -228,19 +241,15 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
228241
return true;
229242
}
230243
// ReadOnly transaction here
231-
} else {
232-
// We don't want snapshot when there are effects at the moment,
233-
// because it hurts performance when there are multiple single-shard
234-
// reads and a single distributed commit. Taking snapshot costs
235-
// similar to an additional distributed transaction, and it's very
236-
// hard to predict when that happens, causing performance
237-
// degradation.
238-
if (hasEffects) {
239-
return false;
240-
}
241244
}
242245

243-
YQL_ENSURE(!hasEffects && !hasStreamLookup);
246+
if (hasSinkInsert && readPhases > 0) {
247+
YQL_ENSURE(hasEffects);
248+
// Insert operations create new read phases,
249+
// so in presence of other reads we have to acquire snapshot.
250+
// This is unique to INSERT operation, because it can fail.
251+
return true;
252+
}
244253

245254
// We need snapshot when there are multiple table read phases, most
246255
// likely it involves multiple tables and we would have to use a

ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/kqp/gateway/kqp_metadata_loader.h>
88
#include <ydb/core/kqp/host/kqp_host_impl.h>
99
#include <ydb/core/tx/data_events/events.h>
10+
#include <ydb/core/tx/datashard/datashard.h>
1011

1112
#include <ydb-cpp-sdk/client/proto/accessor.h>
1213
#include <ydb-cpp-sdk/client/table/table.h>
@@ -283,6 +284,9 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
283284
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
284285
UNIT_ASSERT(evWrite->Record.OperationsSize() == 0);
285286
UNIT_ASSERT(evWrite->Record.GetLocks().GetLocks().size() != 0);
287+
UNIT_ASSERT(evWrite->Record.GetLocks().GetOp() == NKikimrDataEvents::TKqpLocks::Rollback);
288+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
289+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
286290
writes.emplace_back(ev.Release());
287291
return TTestActorRuntime::EEventAction::DROP;
288292
}
@@ -322,6 +326,252 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) {
322326
runtime.Send(ev.release());
323327
}
324328

329+
auto result = runtime.WaitFuture(future);
330+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
331+
}
332+
}
333+
334+
Y_UNIT_TEST(TestSnapshotIfInsertRead) {
335+
NKikimrConfig::TAppConfig appConfig;
336+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
337+
338+
auto setting = NKikimrKqp::TKqpSetting();
339+
TKikimrSettings settings;
340+
settings.SetAppConfig(appConfig);
341+
settings.SetUseRealThreads(false);
342+
TKikimrRunner kikimr(settings);
343+
auto db = kikimr.GetTableClient();
344+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
345+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
346+
347+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
348+
NYdb::NTable::TExecDataQuerySettings execSettings;
349+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
350+
351+
{
352+
const TString query(Q1_(R"(
353+
SELECT Ensure("ok", false, "error") FROM `/Root/KeyValue2` WHERE Key = "10u";
354+
355+
INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "test");
356+
)"));
357+
358+
std::vector<std::unique_ptr<IEventHandle>> writes;
359+
360+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
361+
if (writes.empty() && ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
362+
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
363+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() != 0);
364+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() != 0);
365+
writes.emplace_back(ev.Release());
366+
return TTestActorRuntime::EEventAction::DROP;
367+
}
368+
369+
return TTestActorRuntime::EEventAction::PROCESS;
370+
};
371+
372+
TDispatchOptions opts;
373+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
374+
return writes.size() > 0;
375+
});
376+
377+
runtime.SetObserverFunc(grab);
378+
379+
auto future = kikimr.RunInThreadPool([&]{
380+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
381+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
382+
});
383+
384+
runtime.DispatchEvents(opts);
385+
UNIT_ASSERT(writes.size() > 0);
386+
387+
{
388+
const TString upsertQuery(Q1_(R"(
389+
INSERT INTO `/Root/KeyValue` (Key, Value) VALUES (10u, "other");
390+
INSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("10u", "other");
391+
)"));
392+
393+
auto upsertResult = kikimr.RunCall([&]{
394+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
395+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
396+
});
397+
398+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
399+
}
400+
401+
402+
for(auto& ev: writes) {
403+
runtime.Send(ev.release());
404+
}
405+
406+
auto result = runtime.WaitFuture(future);
407+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
408+
}
409+
}
410+
411+
Y_UNIT_TEST(TestSecondaryIndexWithoutSnapshot) {
412+
NKikimrConfig::TAppConfig appConfig;
413+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
414+
415+
auto setting = NKikimrKqp::TKqpSetting();
416+
TKikimrSettings settings;
417+
settings.SetAppConfig(appConfig);
418+
settings.SetUseRealThreads(false);
419+
TKikimrRunner kikimr(settings);
420+
auto db = kikimr.GetTableClient();
421+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
422+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
423+
424+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
425+
NYdb::NTable::TExecDataQuerySettings execSettings;
426+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
427+
428+
kikimr.RunCall([&]{ CreateSampleTablesWithIndex(session, false /* no need in table data */); return true; });
429+
430+
{
431+
const TString query(Q1_(R"(
432+
INSERT INTO `/Root/SecondaryKeys` (Key, Fk, Value) VALUES (10, 10, "test");
433+
)"));
434+
435+
bool hasWrite = false;
436+
437+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
438+
if (ev->GetTypeRewrite() == NKikimr::NEvents::TDataEvents::TEvWrite::EventType) {
439+
auto* evWrite = ev->Get<NKikimr::NEvents::TDataEvents::TEvWrite>();
440+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetStep() == 0);
441+
UNIT_ASSERT(evWrite->Record.GetMvccSnapshot().GetTxId() == 0);
442+
hasWrite = true;
443+
}
444+
445+
return TTestActorRuntime::EEventAction::PROCESS;
446+
};
447+
448+
TDispatchOptions opts;
449+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
450+
return hasWrite;
451+
});
452+
453+
runtime.SetObserverFunc(grab);
454+
455+
auto future = kikimr.RunInThreadPool([&]{
456+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
457+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
458+
});
459+
460+
auto result = runtime.WaitFuture(future);
461+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
462+
463+
464+
runtime.DispatchEvents(opts);
465+
UNIT_ASSERT(hasWrite);
466+
}
467+
}
468+
469+
Y_UNIT_TEST_TWIN(TestSnapshotWithDependentReads, UseSink) {
470+
NKikimrConfig::TAppConfig appConfig;
471+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
472+
473+
auto setting = NKikimrKqp::TKqpSetting();
474+
TKikimrSettings settings;
475+
settings.SetAppConfig(appConfig);
476+
settings.SetUseRealThreads(false);
477+
TKikimrRunner kikimr(settings);
478+
auto db = kikimr.GetTableClient();
479+
auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
480+
auto upsertSession = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); });
481+
482+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
483+
NYdb::NTable::TExecDataQuerySettings execSettings;
484+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
485+
486+
{
487+
const TString upsertQuery(Q1_(R"(
488+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "One");
489+
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "expected");
490+
)"));
491+
492+
auto upsertResult = kikimr.RunCall([&]{
493+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
494+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
495+
});
496+
497+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
498+
}
499+
500+
{
501+
const TString query(Q1_(R"(
502+
$cnt1 = SELECT Value FROM `/Root/KeyValue` WHERE Key = 1u;
503+
SELECT Ensure("ok", $cnt1="One", "first error");
504+
505+
$cnt2 = SELECT Value FROM `/Root/KeyValue2` WHERE Key = $cnt1;
506+
SELECT Ensure("ok", $cnt2="expected", "second error");
507+
508+
UPSERT INTO KeyValueLargePartition (Key, Value) VALUES
509+
(1000u, "test");
510+
)"));
511+
512+
std::vector<std::unique_ptr<IEventHandle>> reads;
513+
bool hasRead = false;
514+
bool allowAllReads = false;
515+
bool hasResult = false;
516+
517+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
518+
if (ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
519+
auto* evRead = ev->Get<NKikimr::TEvDataShard::TEvRead>();
520+
UNIT_ASSERT(evRead->Record.GetSnapshot().GetStep() != 0);
521+
UNIT_ASSERT(evRead->Record.GetSnapshot().GetTxId() != 0);
522+
}
523+
if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvRead::EventType) {
524+
// Block second read
525+
if (hasRead) {
526+
reads.emplace_back(ev.Release());
527+
return TTestActorRuntime::EEventAction::DROP;
528+
} else {
529+
hasRead = true;
530+
}
531+
} else if (!allowAllReads && ev->GetTypeRewrite() == NKikimr::TEvDataShard::TEvReadResult::EventType) {
532+
hasResult = true;
533+
return TTestActorRuntime::EEventAction::PROCESS;
534+
}
535+
536+
537+
return TTestActorRuntime::EEventAction::PROCESS;
538+
};
539+
540+
TDispatchOptions opts;
541+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
542+
return reads.size() > 0 && hasResult;
543+
});
544+
545+
runtime.SetObserverFunc(grab);
546+
547+
auto future = kikimr.RunInThreadPool([&]{
548+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
549+
return session.ExecuteDataQuery(query, txc, execSettings).ExtractValueSync();
550+
});
551+
552+
runtime.DispatchEvents(opts);
553+
UNIT_ASSERT(reads.size() > 0 && hasResult);
554+
555+
{
556+
const TString upsertQuery(Q1_(R"(
557+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1u, "not expected");
558+
UPSERT INTO `/Root/KeyValue2` (Key, Value) VALUES ("One", "not expected");
559+
)"));
560+
561+
auto upsertResult = kikimr.RunCall([&]{
562+
auto txc = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
563+
return upsertSession.ExecuteDataQuery(upsertQuery, txc, execSettings).ExtractValueSync();
564+
});
565+
566+
UNIT_ASSERT_VALUES_EQUAL_C(upsertResult.GetStatus(), EStatus::SUCCESS, upsertResult.GetIssues().ToString());
567+
}
568+
569+
allowAllReads = true;
570+
571+
for(auto& ev: reads) {
572+
runtime.Send(ev.release());
573+
}
574+
325575
auto result = runtime.WaitFuture(future);
326576
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
327577
}

ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) {
8282
result = session.ExecuteQuery(Q_(R"(
8383
SELECT * FROM `/Root/KV` WHERE Value = "New";
8484
)"), TTxControl::BeginTx(TTxSettings::OnlineRO()).CommitTx()).ExtractValueSync();
85-
UNIT_ASSERT(result.IsSuccess());
85+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
8686
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
8787

8888
auto commitResult = tx.Commit().ExtractValueSync();

0 commit comments

Comments
 (0)