Skip to content

Commit e49b360

Browse files
authored
25-1-2: Fix unexpected unique constraint violation errors from datashards (#20548)
2 parents ed9142d + 01f9cdc commit e49b360

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

46 files changed

+439 -262 lines changed

.github/config/muted_ya.txt

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -62,8 +62,10 @@ ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictReadWriteOltpNoSink
6262
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWriteOlap
6363
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWriteOltp
6464
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TConflictWriteOltpNoSink
65+
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TReadOnlyOlap
6566
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TReadOnlyOltp
6667
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TReadOnlyOltpNoSink
68+
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TSimpleOlap
6769
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TSimpleOltp
6870
ydb/core/kqp/ut/tx KqpSnapshotIsolation.TSimpleOltpNoSink
6971
ydb/core/persqueue/ut [*/*] chunk chunk

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1028,7 +1028,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
10281028
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
10291029
}
10301030
if (!settings.GetInconsistentTx()
1031-
&& TasksGraph.GetMeta().LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION
10321031
&& GetSnapshot().IsValid()) {
10331032
settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
10341033
settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -137,6 +137,14 @@ namespace {
137137
transaction.AddReceivingShards(*prepareSettings.ArbiterColumnShard);
138138
}
139139
}
140+
141+
std::optional<NKikimrDataEvents::TMvccSnapshot> GetOptionalMvccSnapshot(const NKikimrKqp::TKqpTableSinkSettings& settings) {
142+
if (settings.HasMvccSnapshot()) {
143+
return settings.GetMvccSnapshot();
144+
} else {
145+
return std::nullopt;
146+
}
147+
}
140148
}
141149

142150

@@ -948,6 +956,9 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
948956

949957
if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) {
950958
YQL_ENSURE(MvccSnapshot);
959+
}
960+
961+
if (MvccSnapshot) {
951962
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
952963
}
953964
}
@@ -979,6 +990,20 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
979990
Counters->WriteActorImmediateWritesRetries->Inc();
980991
}
981992

993+
if (isPrepare && MvccSnapshot) {
994+
bool needMvccSnapshot = false;
995+
for (const auto& operation : evWrite->Record.GetOperations()) {
996+
if (operation.GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT) {
997+
// This operation may fail with an incorrect unique constraint violation otherwise
998+
needMvccSnapshot = true;
999+
break;
1000+
}
1001+
}
1002+
if (needMvccSnapshot) {
1003+
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
1004+
}
1005+
}
1006+
9821007
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWrite->Record.GetTxId(), shardId, TlsActivationContext->AsActorContext(), "WriteActor");
9831008

9841009
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", isPrepare=" << isPrepare << ", isImmediateCommit=" << isImmediateCommit << ", TxId=" << evWrite->Record.GetTxId()
@@ -1378,7 +1403,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
13781403
Settings.GetIsOlap(),
13791404
std::move(keyColumnTypes),
13801405
Alloc,
1381-
Settings.GetMvccSnapshot(),
1406+
GetOptionalMvccSnapshot(Settings),
13821407
Settings.GetLockMode(),
13831408
nullptr,
13841409
TActorId{},
@@ -3090,7 +3115,7 @@ class TKqpForwardWriteActor : public TActorBootstrapped<TKqpForwardWriteActor>,
30903115
.LockTxId = Settings.GetLockTxId(),
30913116
.LockNodeId = Settings.GetLockNodeId(),
30923117
.InconsistentTx = Settings.GetInconsistentTx(),
3093-
.MvccSnapshot = Settings.GetMvccSnapshot(),
3118+
.MvccSnapshot = GetOptionalMvccSnapshot(Settings),
30943119
.LockMode = Settings.GetLockMode(),
30953120
},
30963121
.Priority = Settings.GetPriority(),

ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op,
6363
TDataShardLocksDb locksDb(DataShard, txc);
6464
TSetupSysLocks guardLocks(op, DataShard, &locksDb);
6565

66-
tx->GetDataTx()->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
66+
tx->GetDataTx()->SetMvccVersion(DataShard.GetMvccVersion(tx));
6767
IEngineFlat *engine = tx->GetDataTx()->GetEngine();
6868
try {
6969
auto &outReadSets = op->OutReadSets();

ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -104,9 +104,9 @@ class TBuildDistributedEraseTxOutRSUnit : public TExecutionUnit {
104104

105105
const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds());
106106
auto now = TAppData::TimeProvider->Now();
107-
auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(tx);
107+
auto mvccVersion = DataShard.GetMvccVersion(tx);
108108
NMiniKQL::TEngineHostCounters engineHostCounters;
109-
TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, now);
109+
TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), mvccVersion, engineHostCounters, now);
110110
bool pageFault = false;
111111

112112
TDynBitMap confirmedRows;

ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
9191
LOG_T("Operation " << *op << " (build_kqp_data_tx_out_rs) at " << tabletId
9292
<< " set memory limit " << (txc.GetMemoryLimit() - dataTx->GetTxSize()));
9393

94-
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
94+
dataTx->SetMvccVersion(DataShard.GetMvccVersion(tx));
9595

9696
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
9797
auto result = KqpRunTransaction(ctx, op->GetTxId(), useGenericReadSets, tasksRunner);

ydb/core/tx/datashard/change_collector_cdc_stream.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -279,10 +279,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
279279
}
280280

281281
TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
282-
TArrayRef<const TTag> valueTags, TSelectStats& stats, const TMaybe<TRowVersion>& readVersion)
282+
TArrayRef<const TTag> valueTags, TSelectStats& stats, const TMaybe<TRowVersion>& snapshot)
283283
{
284284
TRowState row;
285-
const auto ready = UserDb.SelectRow(tableId, key, valueTags, row, stats, readVersion);
285+
const auto ready = UserDb.SelectRow(tableId, key, valueTags, row, stats, snapshot);
286286

287287
if (ready == EReady::Page) {
288288
return Nothing();
@@ -292,10 +292,10 @@ TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, T
292292
}
293293

294294
TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
295-
TArrayRef<const TTag> valueTags, const TMaybe<TRowVersion>& readVersion)
295+
TArrayRef<const TTag> valueTags, const TMaybe<TRowVersion>& snapshot)
296296
{
297297
TSelectStats stats;
298-
return GetState(tableId, key, valueTags, stats, readVersion);
298+
return GetState(tableId, key, valueTags, stats, snapshot);
299299
}
300300

301301
TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowOp rop,

ydb/core/tx/datashard/change_collector_cdc_stream.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,9 @@ namespace NDataShard {
88

99
class TCdcStreamChangeCollector: public TBaseChangeCollector {
1010
TMaybe<NTable::TRowState> GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
11-
TArrayRef<const NTable::TTag> valueTags, NTable::TSelectStats& stats, const TMaybe<TRowVersion>& readVersion = {});
11+
TArrayRef<const NTable::TTag> valueTags, NTable::TSelectStats& stats, const TMaybe<TRowVersion>& snapshot = {});
1212
TMaybe<NTable::TRowState> GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
13-
TArrayRef<const NTable::TTag> valueTags, const TMaybe<TRowVersion>& readVersion = {});
13+
TArrayRef<const NTable::TTag> valueTags, const TMaybe<TRowVersion>& snapshot = {});
1414
static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop,
1515
const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates);
1616

ydb/core/tx/datashard/datashard.cpp

Lines changed: 9 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ class TDataShardMiniKQLFactory : public NMiniKQL::TMiniKQLFactory {
6767
}
6868

6969
// Write user tables with a minimal safe version (avoiding snapshots)
70-
return Self->GetLocalReadWriteVersions().WriteVersion;
70+
return Self->GetLocalMvccVersion();
7171
}
7272

7373
TRowVersion GetReadVersion(const TTableId& tableId) const override {
@@ -83,7 +83,7 @@ class TDataShardMiniKQLFactory : public NMiniKQL::TMiniKQLFactory {
8383
return TRowVersion::Max();
8484
}
8585

86-
return Self->GetLocalReadWriteVersions().ReadVersion;
86+
return TRowVersion::Max();
8787
}
8888

8989
private:
@@ -2325,23 +2325,8 @@ bool TDataShard::AllowCancelROwithReadsets() const {
23252325
return CanCancelROWithReadSets;
23262326
}
23272327

2328-
TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
2329-
if (IsFollower())
2330-
return {TRowVersion::Max(), TRowVersion::Max()};
2331-
2332-
TRowVersion edge = Max(
2333-
SnapshotManager.GetCompleteEdge(),
2334-
SnapshotManager.GetIncompleteEdge(),
2335-
SnapshotManager.GetUnprotectedReadEdge());
2336-
2337-
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2338-
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2339-
2340-
TRowVersion maxEdge(edge.Step, ::Max<ui64>());
2341-
2342-
TRowVersion writeVersion = Max(maxEdge, edge.Next(), SnapshotManager.GetImmediateWriteEdge());
2343-
2344-
return {TRowVersion::Max(), writeVersion};
2328+
TRowVersion TDataShard::GetLocalMvccVersion() const {
2329+
return GetMvccVersion();
23452330
}
23462331

23472332
TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
@@ -2436,17 +2421,17 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
24362421
Y_ABORT("unreachable");
24372422
}
24382423

2439-
TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
2424+
TRowVersion TDataShard::GetMvccVersion(TOperation* op) const {
24402425
if (IsFollower()) {
2441-
return {TRowVersion::Max(), TRowVersion::Max()};
2426+
return TRowVersion::Max();
24422427
}
24432428

24442429
if (op) {
2445-
if (!op->MvccReadWriteVersion) {
2446-
op->MvccReadWriteVersion = GetMvccTxVersion(op->IsReadOnly() ? EMvccTxMode::ReadOnly : EMvccTxMode::ReadWrite, op);
2430+
if (!op->CachedMvccVersion) {
2431+
op->CachedMvccVersion = GetMvccTxVersion(op->IsReadOnly() ? EMvccTxMode::ReadOnly : EMvccTxMode::ReadWrite, op);
24472432
}
24482433

2449-
return *op->MvccReadWriteVersion;
2434+
return *op->CachedMvccVersion;
24502435
}
24512436

24522437
return GetMvccTxVersion(EMvccTxMode::ReadWrite, nullptr);

ydb/core/tx/datashard/datashard__engine_host.cpp

Lines changed: 18 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -207,7 +207,7 @@ class TDataShardEngineHost final
207207
self->GetKeyAccessSampler()))
208208
, Self(self)
209209
, EngineBay(engineBay)
210-
, UserDb(*self, db, globalTxId, TRowVersion::Min(), TRowVersion::Max(), counters, now)
210+
, UserDb(*self, db, globalTxId, TRowVersion::Max(), counters, now)
211211
{
212212
}
213213

@@ -217,39 +217,37 @@ class TDataShardEngineHost final
217217
TArrayRef<const NTable::TTag> tags,
218218
NTable::TRowState& row,
219219
NTable::TSelectStats& stats,
220-
const TMaybe<TRowVersion>& readVersion) override
220+
const TMaybe<TRowVersion>& snapshot) override
221221
{
222-
return UserDb.SelectRow(tableId, key, tags, row, stats, readVersion);
222+
return UserDb.SelectRow(tableId, key, tags, row, stats, snapshot);
223223
}
224224

225225
NTable::EReady SelectRow(
226226
const TTableId& tableId,
227227
TArrayRef<const TRawTypeValue> key,
228228
TArrayRef<const NTable::TTag> tags,
229229
NTable::TRowState& row,
230-
const TMaybe<TRowVersion>& readVersion) override
230+
const TMaybe<TRowVersion>& snapshot) override
231231
{
232-
return UserDb.SelectRow(tableId, key, tags, row, readVersion);
232+
return UserDb.SelectRow(tableId, key, tags, row, snapshot);
233233
}
234234

235-
void SetWriteVersion(TRowVersion writeVersion) {
236-
UserDb.SetWriteVersion(writeVersion);
235+
void SetMvccVersion(TRowVersion mvccVersion) {
236+
UserDb.SetMvccVersion(mvccVersion);
237237
}
238238

239239
TRowVersion GetWriteVersion(const TTableId& tableId) const override {
240240
Y_UNUSED(tableId);
241-
Y_ABORT_UNLESS(!UserDb.GetWriteVersion().IsMax(), "Cannot perform writes without WriteVersion set");
242-
return UserDb.GetWriteVersion();
243-
}
244-
245-
void SetReadVersion(TRowVersion readVersion) {
246-
UserDb.SetReadVersion(readVersion);
241+
auto mvccVersion = UserDb.GetMvccVersion();
242+
Y_ABORT_UNLESS(!mvccVersion.IsMax(), "Cannot perform writes without the correct MvccVersion set");
243+
return mvccVersion;
247244
}
248245

249246
TRowVersion GetReadVersion(const TTableId& tableId) const override {
250247
Y_UNUSED(tableId);
251-
Y_ABORT_UNLESS(!UserDb.GetReadVersion().IsMin(), "Cannot perform reads without ReadVersion set");
252-
return UserDb.GetReadVersion();
248+
auto mvccVersion = UserDb.GetMvccVersion();
249+
Y_ABORT_UNLESS(!mvccVersion.IsMax(), "Cannot perform reads without the correct MvccVersion set");
250+
return mvccVersion;
253251
}
254252

255253
void SetVolatileTxId(ui64 txId) {
@@ -276,8 +274,8 @@ class TDataShardEngineHost final
276274
return UserDb.GetChangeCollector(tableId);
277275
}
278276

279-
void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) override {
280-
UserDb.CommitChanges(tableId, lockId, writeVersion);
277+
void CommitChanges(const TTableId& tableId, ui64 lockId) override {
278+
UserDb.CommitChanges(tableId, lockId);
281279
}
282280

283281
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
@@ -596,21 +594,14 @@ TEngineBay::TSizes TEngineBay::CalcSizes(bool needsTotalKeysSize) const {
596594
return outSizes;
597595
}
598596

599-
void TEngineBay::SetWriteVersion(TRowVersion writeVersion) {
600-
Y_ABORT_UNLESS(EngineHost);
601-
602-
auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
603-
host->SetWriteVersion(writeVersion);
604-
}
605-
606-
void TEngineBay::SetReadVersion(TRowVersion readVersion) {
597+
void TEngineBay::SetMvccVersion(TRowVersion mvccVersion) {
607598
Y_ABORT_UNLESS(EngineHost);
608599

609600
auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
610-
host->SetReadVersion(readVersion);
601+
host->SetMvccVersion(mvccVersion);
611602

612603
Y_ABORT_UNLESS(ComputeCtx);
613-
ComputeCtx->SetReadVersion(readVersion);
604+
ComputeCtx->SetMvccVersion(mvccVersion);
614605
}
615606

616607
void TEngineBay::SetVolatileTxId(ui64 txId) {

0 commit comments

Comments
 (0)