Skip to content

Commit 4b451ac

Browse files
committed
Fix unexpected unique constraint violation errors from datashards (#20514)
1 parent 8e5812b commit 4b451ac

10 files changed

+243
-18
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
10281028
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
10291029
}
10301030
if (!settings.GetInconsistentTx()
1031-
&& TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION
10321031
&& GetSnapshot().IsValid()) {
10331032
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
10341033
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,14 @@ namespace {
137137
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
138138
}
139139
}
140+
141+
std::optional<NKikimrDataEvents::TMvccSnapshot> GetOptionalMvccSnapshot(const NKikimrKqp::TKqpTableSinkSettings& settings) {
142+
if (settings.HasMvccSnapshot()) {
143+
return settings.GetMvccSnapshot();
144+
} else {
145+
return std::nullopt;
146+
}
147+
}
140148
}
141149

142150

@@ -948,6 +956,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
948956

949957
if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) {
950958
YQL_ENSURE(MvccSnapshot);
959+
}
960+
961+
if (MvccSnapshot) {
951962
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
952963
}
953964
}
@@ -979,6 +990,20 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
979990
Counters->WriteActorImmediateWritesRetries->Inc();
980991
}
981992

993+
if (isPrepare && MvccSnapshot) {
994+
bool needMvccSnapshot = false;
995+
for (const auto& operation : evWrite->Record.GetOperations()) {
996+
if (operation.GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT) {
997+
// This operation may fail with an incorrect unique constraint violation otherwise
998+
needMvccSnapshot = true;
999+
break;
1000+
}
1001+
}
1002+
if (needMvccSnapshot) {
1003+
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
1004+
}
1005+
}
1006+
9821007
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "WriteActor");
9831008

9841009
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", isPrepare=" << isPrepare << ", isImmediateCommit=" << isImmediateCommit << ", TxId=" << evWrite->Record.GetTxId()
@@ -1378,7 +1403,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
13781403
Settings.GetIsOlap(),
13791404
std::move(keyColumnTypes),
13801405
Alloc,
1381-
Settings.GetMvccSnapshot(),
1406+
GetOptionalMvccSnapshot(Settings),
13821407
Settings.GetLockMode(),
13831408
nullptr,
13841409
TActorId{},
@@ -3090,7 +3115,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
30903115
.LockTxId = Settings.GetLockTxId(),
30913116
.LockNodeId = Settings.GetLockNodeId(),
30923117
.InconsistentTx = Settings.GetInconsistentTx(),
3093-
.MvccSnapshot = Settings.GetMvccSnapshot(),
3118+
.MvccSnapshot = GetOptionalMvccSnapshot(Settings),
30943119
.LockMode = Settings.GetLockMode(),
30953120
},
30963121
.Priority = Settings.GetPriority(),

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,7 +1686,8 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
16861686
{
16871687
const auto& rec = ev->Get()->Record;
16881688
TBasicOpInfo info(rec.GetTxId(), EOperationKind::WriteTx, NEvWrite::TConvertor::GetProposeFlags(rec.GetTxMode()), 0, receivedAt, tieBreakerIndex);
1689-
if (rec.HasMvccSnapshot()) {
1689+
// Uncommitted writes are performed over a consistent mvcc snapshot
1690+
if (rec.HasMvccSnapshot() && rec.GetLockTxId()) {
16901691
info.SetMvccSnapshot(TRowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId()),
16911692
rec.GetMvccSnapshot().GetRepeatableRead());
16921693
}
@@ -1700,9 +1701,9 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
17001701
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
17011702
};
17021703

1703-
if (rec.HasMvccSnapshot() && !rec.GetLockTxId()) {
1704+
if (rec.GetLockMode() != NKikimrDataEvents::OPTIMISTIC) {
17041705
badRequest(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
1705-
"MvccSnapshot without LockTxId is not implemented");
1706+
"Only OPTIMISTIC lock mode is currently implemented");
17061707
return writeOp;
17071708
}
17081709

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,19 @@ NTable::EReady TDataShardUserDb::SelectRow(
3636

3737
SetPerformedUserReads(true);
3838

39-
return Db.Select(tid, key, tags, row, stats, /* readFlags */ 0,
39+
NTable::EReady ready = Db.Select(tid, key, tags, row, stats, /* readFlags */ 0,
4040
MvccVersion,
4141
GetReadTxMap(tableId),
4242
GetReadTxObserver(tableId));
43+
44+
if (stats.InvisibleRowSkips > 0) {
45+
if (LockTxId) {
46+
Self.SysLocksTable().BreakSetLocks();
47+
}
48+
MvccReadConflict = true;
49+
}
50+
51+
return ready;
4352
}
4453

4554
NTable::EReady TDataShardUserDb::SelectRow(
@@ -439,11 +448,12 @@ class TReadTxObserver: public NTable::ITransactionObserver {
439448
// We already use InvisibleRowSkips for these
440449
}
441450

442-
void OnApplyCommitted(const TRowVersion&) override {
443-
// Not needed
451+
void OnApplyCommitted(const TRowVersion& rowVersion) override {
452+
ConflictChecker.CheckReadConflict(rowVersion);
444453
}
445454

446-
void OnApplyCommitted(const TRowVersion&, ui64 txId) override {
455+
void OnApplyCommitted(const TRowVersion& rowVersion, ui64 txId) override {
456+
ConflictChecker.CheckReadConflict(rowVersion);
447457
ConflictChecker.CheckReadDependency(txId);
448458
}
449459

@@ -756,6 +766,7 @@ NTable::ITransactionObserverPtr TDataShardUserDb::GetReadTxObserver(const TTable
756766
Y_ABORT_UNLESS(localTableId != 0, "Unexpected GetReadTxObserver for an unknown table");
757767

758768
bool needObserver = (
769+
SnapshotVersion < MvccVersion ||
759770
// We need observer when there are waiting changes in the tx map
760771
Self.GetVolatileTxManager().GetTxMap() ||
761772
// We need observer for locked reads when there are active write locks
@@ -789,16 +800,23 @@ void TDataShardUserDb::AddReadConflict(ui64 txId) {
789800
}
790801

791802
void TDataShardUserDb::CheckReadConflict(const TRowVersion& rowVersion) {
792-
Y_ABORT_UNLESS(LockTxId);
793-
794803
if (rowVersion > MvccVersion) {
795804
// We are reading from snapshot at MvccVersion and should not normally
796805
// observe changes with a version above that. However, if we have an
797806
// uncommitted change, that we fake as committed for our own changes
798807
// visibility, we might shadow some change that happened after a
799808
// snapshot. This is a clear indication of a conflict between read
800809
// and that future change, hence we must break locks and abort.
801-
Self.SysLocksTable().BreakSetLocks();
810+
if (LockTxId) {
811+
Self.SysLocksTable().BreakSetLocks();
812+
}
813+
MvccReadConflict = true;
814+
} else if (rowVersion > SnapshotVersion) {
815+
// During commit we read at the current mvcc version, however we may
816+
// notice there have been changes between the snapshot and current
817+
// commit version. This is not necessarily an error, but indicates that
818+
// if this read had been performed under a lock, it would have been broken.
819+
SnapshotReadConflict = true;
802820
}
803821
}
804822

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,25 @@ class TDataShardUserDb final
216216

217217
YDB_ACCESSOR_DEF(bool, PerformedUserReads);
218218

219+
// Becomes true when user-visible reads detect changes over MvccVersion, i.e.
220+
// if we would have performed this read under a lock, it would have been broken.
221+
YDB_READONLY(bool, MvccReadConflict, false);
222+
223+
// At commit time we have MvccVersion equal to the commit version, however
224+
// when transaction has a snapshot it should behave as if all reads are
225+
// performed at the snapshot version. This snapshot version is not used
226+
// for reads (we optimistically read from commit version at commit time to
227+
// minimize conflicts). However, if an error prevents the transaction from
228+
// committing and there were conflicts with the snapshot, the transaction
229+
// should behave as if an imaginary lock was broken instead.
230+
YDB_ACCESSOR(TRowVersion, SnapshotVersion, TRowVersion::Max());
231+
// Becomes true when reads detect there have been committed changes between
232+
// the snapshot version and the commit version.
233+
YDB_READONLY(bool, SnapshotReadConflict, false);
234+
// Becomes true when writes detect there have been committed changes between
235+
// the snapshot version and the commit version.
236+
YDB_READONLY(bool, SnapshotWriteConflict, false);
237+
219238
NMiniKQL::TEngineHostCounters& Counters;
220239
};
221240

ydb/core/tx/datashard/datashard_ut_common_kqp.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ namespace NKqpHelpers {
224224
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
225225
}
226226

227-
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
228-
auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
227+
inline TString KqpSimpleWaitCommit(TTestActorRuntime& runtime, NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> future) {
228+
auto response = AwaitResponse(runtime, std::move(future));
229229
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
230230
return TStringBuilder() << "ERROR: " << response.operation().status();
231231
}
@@ -235,6 +235,10 @@ namespace NKqpHelpers {
235235
return FormatResult(result);
236236
}
237237

238+
inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
239+
return KqpSimpleWaitCommit(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
240+
}
241+
238242
inline Ydb::Table::ExecuteSchemeQueryRequest MakeSchemeRequestRPC(
239243
const TString& sql, const TString& sessionId)
240244
{

ydb/core/tx/datashard/datashard_ut_write.cpp

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,5 +1778,146 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
17781778
"{ items { int32_value: 11 } items { int32_value: 12 } items { int32_value: 12 } items { int32_value: 12 } }");
17791779
}
17801780

1781+
Y_UNIT_TEST(WriteUniqueRowsInsertDuplicateBeforeCommit) {
1782+
TPortManager pm;
1783+
NKikimrConfig::TAppConfig app;
1784+
app.MutableTableServiceConfig()->SetEnableOltpSink(true);
1785+
TServerSettings serverSettings(pm.GetPort(2134));
1786+
serverSettings.SetDomainName("Root")
1787+
.SetUseRealThreads(false)
1788+
.SetAppConfig(app);
1789+
1790+
auto [runtime, server, sender] = TestCreateServer(serverSettings);
1791+
1792+
TDisableDataShardLogBatching disableDataShardLogBatching;
1793+
1794+
UNIT_ASSERT_VALUES_EQUAL(
1795+
KqpSchemeExec(runtime, R"(
1796+
CREATE TABLE `/Root/counts` (key int, rows int, PRIMARY KEY (key));
1797+
CREATE TABLE `/Root/rows` (key int, subkey int, value int, PRIMARY KEY (key, subkey));
1798+
)"),
1799+
"SUCCESS"
1800+
);
1801+
1802+
ExecSQL(server, sender, R"(
1803+
UPSERT INTO `/Root/counts` (key, rows) VALUES
1804+
(42, 2);
1805+
UPSERT INTO `/Root/rows` (key, subkey, value) VALUES
1806+
(42, 1, 101),
1807+
(42, 2, 102);
1808+
)");
1809+
1810+
TString sessionId1, txId1;
1811+
UNIT_ASSERT_VALUES_EQUAL(
1812+
KqpSimpleBegin(runtime, sessionId1, txId1, R"(
1813+
UPDATE `/Root/counts` SET rows = rows + 1 WHERE key = 42;
1814+
SELECT rows FROM `/Root/counts` WHERE key = 42;
1815+
)"),
1816+
"{ items { int32_value: 3 } }"
1817+
);
1818+
1819+
TString sessionId2, txId2;
1820+
UNIT_ASSERT_VALUES_EQUAL(
1821+
KqpSimpleBegin(runtime, sessionId2, txId2, R"(
1822+
UPDATE `/Root/counts` SET rows = rows + 1 WHERE key = 42;
1823+
SELECT rows FROM `/Root/counts` WHERE key = 42;
1824+
)"),
1825+
"{ items { int32_value: 3 } }"
1826+
);
1827+
1828+
UNIT_ASSERT_VALUES_EQUAL(
1829+
KqpSimpleCommit(runtime, sessionId2, txId2, R"(
1830+
INSERT INTO `/Root/rows` (key, subkey, value) VALUES
1831+
(42, 3, 203);
1832+
)"),
1833+
"<empty>"
1834+
);
1835+
1836+
UNIT_ASSERT_VALUES_EQUAL(
1837+
KqpSimpleContinue(runtime, sessionId1, txId1, R"(
1838+
INSERT INTO `/Root/rows` (key, subkey, value) VALUES
1839+
(42, 3, 303);
1840+
)"),
1841+
"ERROR: ABORTED"
1842+
);
1843+
}
1844+
1845+
Y_UNIT_TEST(WriteUniqueRowsInsertDuplicateAtCommit) {
1846+
TPortManager pm;
1847+
NKikimrConfig::TAppConfig app;
1848+
app.MutableTableServiceConfig()->SetEnableOltpSink(true);
1849+
TServerSettings serverSettings(pm.GetPort(2134));
1850+
serverSettings.SetDomainName("Root")
1851+
.SetUseRealThreads(false)
1852+
.SetAppConfig(app);
1853+
1854+
auto [runtime, server, sender] = TestCreateServer(serverSettings);
1855+
1856+
TDisableDataShardLogBatching disableDataShardLogBatching;
1857+
1858+
UNIT_ASSERT_VALUES_EQUAL(
1859+
KqpSchemeExec(runtime, R"(
1860+
CREATE TABLE `/Root/counts` (key int, rows int, PRIMARY KEY (key));
1861+
CREATE TABLE `/Root/rows` (key int, subkey int, value int, PRIMARY KEY (key, subkey));
1862+
)"),
1863+
"SUCCESS"
1864+
);
1865+
1866+
ExecSQL(server, sender, R"(
1867+
UPSERT INTO `/Root/counts` (key, rows) VALUES
1868+
(42, 2);
1869+
UPSERT INTO `/Root/rows` (key, subkey, value) VALUES
1870+
(42, 1, 101),
1871+
(42, 2, 102);
1872+
)");
1873+
1874+
TString sessionId1, txId1;
1875+
UNIT_ASSERT_VALUES_EQUAL(
1876+
KqpSimpleBegin(runtime, sessionId1, txId1, R"(
1877+
UPDATE `/Root/counts` SET rows = rows + 1 WHERE key = 42;
1878+
SELECT rows FROM `/Root/counts` WHERE key = 42;
1879+
)"),
1880+
"{ items { int32_value: 3 } }"
1881+
);
1882+
1883+
TString sessionId2, txId2;
1884+
UNIT_ASSERT_VALUES_EQUAL(
1885+
KqpSimpleBegin(runtime, sessionId2, txId2, R"(
1886+
UPDATE `/Root/counts` SET rows = rows + 1 WHERE key = 42;
1887+
SELECT rows FROM `/Root/counts` WHERE key = 42;
1888+
)"),
1889+
"{ items { int32_value: 3 } }"
1890+
);
1891+
1892+
UNIT_ASSERT_VALUES_EQUAL(
1893+
KqpSimpleCommit(runtime, sessionId2, txId2, R"(
1894+
INSERT INTO `/Root/rows` (key, subkey, value) VALUES
1895+
(42, 3, 203);
1896+
)"),
1897+
"<empty>"
1898+
);
1899+
1900+
TBlockEvents<NEvents::TDataEvents::TEvWriteResult> blockedLocksBroken(runtime, [&](auto& ev) {
1901+
auto* msg = ev->Get();
1902+
if (msg->Record.GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN) {
1903+
return true;
1904+
}
1905+
return false;
1906+
});
1907+
1908+
auto commitFuture = KqpSimpleSendCommit(runtime, sessionId1, txId1, R"(
1909+
INSERT INTO `/Root/rows` (key, subkey, value) VALUES
1910+
(42, 3, 303);
1911+
)");
1912+
1913+
runtime.SimulateSleep(TDuration::MilliSeconds(1));
1914+
blockedLocksBroken.Stop().Unblock();
1915+
1916+
UNIT_ASSERT_VALUES_EQUAL(
1917+
KqpSimpleWaitCommit(runtime, std::move(commitFuture)),
1918+
"ERROR: ABORTED"
1919+
);
1920+
}
1921+
17811922
} // Y_UNIT_TEST_SUITE(DataShardWrite)
17821923
} // namespace NKikimr

ydb/core/tx/datashard/datashard_write_operation.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant
5757
LockNodeId = record.GetLockNodeId();
5858
}
5959

60+
if (record.HasMvccSnapshot()) {
61+
MvccSnapshot.emplace(record.GetMvccSnapshot().GetStep(), record.GetMvccSnapshot().GetTxId());
62+
}
63+
6064
OverloadSubscribe = record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional<ui64>{};
6165

6266
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;

ydb/core/tx/datashard/datashard_write_operation.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
131131
YDB_READONLY_DEF(TInstant, ReceivedAt);
132132
YDB_READONLY_DEF(std::optional<ui64>, OverloadSubscribe);
133133
YDB_READONLY_DEF(bool, MvccSnapshotRead);
134+
YDB_READONLY_DEF(std::optional<TRowVersion>, MvccSnapshot);
134135

135136
YDB_READONLY_DEF(ui64, TxSize);
136137

0 commit comments

Comments
 (0)