Skip to content

Commit d584a70

Browse files
committed
Remove legacy split to read and write versions, execute operations at a single mvcc version (#20448)
1 parent 0d24c4f commit d584a70

39 files changed

+194
-244
lines changed

ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op,
6363
TDataShardLocksDb locksDb(DataShard, txc);
6464
TSetupSysLocks guardLocks(op, DataShard, &locksDb);
6565

66-
tx->GetDataTx()->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
66+
tx->GetDataTx()->SetMvccVersion(DataShard.GetMvccVersion(tx));
6767
IEngineFlat *engine = tx->GetDataTx()->GetEngine();
6868
try {
6969
auto &outReadSets = op->OutReadSets();

ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ class TBuildDistributedEraseTxOutRSUnit : public TExecutionUnit {
104104

105105
const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds());
106106
auto now = TAppData::TimeProvider->Now();
107-
auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(tx);
107+
auto mvccVersion = DataShard.GetMvccVersion(tx);
108108
NMiniKQL::TEngineHostCounters engineHostCounters;
109-
TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), readVersion, writeVersion, engineHostCounters, now);
109+
TDataShardUserDb userDb(DataShard, txc.DB, op->GetGlobalTxId(), mvccVersion, engineHostCounters, now);
110110
bool pageFault = false;
111111

112112
TDynBitMap confirmedRows;

ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
9191
LOG_T("Operation " << *op << " (build_kqp_data_tx_out_rs) at " << tabletId
9292
<< " set memory limit " << (txc.GetMemoryLimit() - dataTx->GetTxSize()));
9393

94-
dataTx->SetReadVersion(DataShard.GetReadWriteVersions(tx).ReadVersion);
94+
dataTx->SetMvccVersion(DataShard.GetMvccVersion(tx));
9595

9696
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
9797
auto result = KqpRunTransaction(ctx, op->GetTxId(), useGenericReadSets, tasksRunner);

ydb/core/tx/datashard/change_collector_cdc_stream.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
279279
}
280280

281281
TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
282-
TArrayRef<const TTag> valueTags, TSelectStats& stats, const TMaybe<TRowVersion>& readVersion)
282+
TArrayRef<const TTag> valueTags, TSelectStats& stats, const TMaybe<TRowVersion>& snapshot)
283283
{
284284
TRowState row;
285-
const auto ready = UserDb.SelectRow(tableId, key, valueTags, row, stats, readVersion);
285+
const auto ready = UserDb.SelectRow(tableId, key, valueTags, row, stats, snapshot);
286286

287287
if (ready == EReady::Page) {
288288
return Nothing();
@@ -292,10 +292,10 @@ TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, T
292292
}
293293

294294
TMaybe<TRowState> TCdcStreamChangeCollector::GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
295-
TArrayRef<const TTag> valueTags, const TMaybe<TRowVersion>& readVersion)
295+
TArrayRef<const TTag> valueTags, const TMaybe<TRowVersion>& snapshot)
296296
{
297297
TSelectStats stats;
298-
return GetState(tableId, key, valueTags, stats, readVersion);
298+
return GetState(tableId, key, valueTags, stats, snapshot);
299299
}
300300

301301
TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowOp rop,

ydb/core/tx/datashard/change_collector_cdc_stream.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ namespace NDataShard {
88

99
class TCdcStreamChangeCollector: public TBaseChangeCollector {
1010
TMaybe<NTable::TRowState> GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
11-
TArrayRef<const NTable::TTag> valueTags, NTable::TSelectStats& stats, const TMaybe<TRowVersion>& readVersion = {});
11+
TArrayRef<const NTable::TTag> valueTags, NTable::TSelectStats& stats, const TMaybe<TRowVersion>& snapshot = {});
1212
TMaybe<NTable::TRowState> GetState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
13-
TArrayRef<const NTable::TTag> valueTags, const TMaybe<TRowVersion>& readVersion = {});
13+
TArrayRef<const NTable::TTag> valueTags, const TMaybe<TRowVersion>& snapshot = {});
1414
static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop,
1515
const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates);
1616

ydb/core/tx/datashard/datashard.cpp

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class TDataShardMiniKQLFactory : public NMiniKQL::TMiniKQLFactory {
6767
}
6868

6969
// Write user tables with a minimal safe version (avoiding snapshots)
70-
return Self->GetLocalReadWriteVersions().WriteVersion;
70+
return Self->GetLocalMvccVersion();
7171
}
7272

7373
TRowVersion GetReadVersion(const TTableId& tableId) const override {
@@ -83,7 +83,7 @@ class TDataShardMiniKQLFactory : public NMiniKQL::TMiniKQLFactory {
8383
return TRowVersion::Max();
8484
}
8585

86-
return Self->GetLocalReadWriteVersions().ReadVersion;
86+
return TRowVersion::Max();
8787
}
8888

8989
private:
@@ -2325,23 +2325,8 @@ bool TDataShard::AllowCancelROwithReadsets() const {
23252325
return CanCancelROWithReadSets;
23262326
}
23272327

2328-
TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
2329-
if (IsFollower())
2330-
return {TRowVersion::Max(), TRowVersion::Max()};
2331-
2332-
TRowVersion edge = Max(
2333-
SnapshotManager.GetCompleteEdge(),
2334-
SnapshotManager.GetIncompleteEdge(),
2335-
SnapshotManager.GetUnprotectedReadEdge());
2336-
2337-
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
2338-
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
2339-
2340-
TRowVersion maxEdge(edge.Step, ::Max<ui64>());
2341-
2342-
TRowVersion writeVersion = Max(maxEdge, edge.Next(), SnapshotManager.GetImmediateWriteEdge());
2343-
2344-
return {TRowVersion::Max(), writeVersion};
2328+
TRowVersion TDataShard::GetLocalMvccVersion() const {
2329+
return GetMvccVersion();
23452330
}
23462331

23472332
TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
@@ -2436,17 +2421,17 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
24362421
Y_ABORT("unreachable");
24372422
}
24382423

2439-
TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
2424+
TRowVersion TDataShard::GetMvccVersion(TOperation* op) const {
24402425
if (IsFollower()) {
2441-
return {TRowVersion::Max(), TRowVersion::Max()};
2426+
return TRowVersion::Max();
24422427
}
24432428

24442429
if (op) {
2445-
if (!op->MvccReadWriteVersion) {
2446-
op->MvccReadWriteVersion = GetMvccTxVersion(op->IsReadOnly() ? EMvccTxMode::ReadOnly : EMvccTxMode::ReadWrite, op);
2430+
if (!op->CachedMvccVersion) {
2431+
op->CachedMvccVersion = GetMvccTxVersion(op->IsReadOnly() ? EMvccTxMode::ReadOnly : EMvccTxMode::ReadWrite, op);
24472432
}
24482433

2449-
return *op->MvccReadWriteVersion;
2434+
return *op->CachedMvccVersion;
24502435
}
24512436

24522437
return GetMvccTxVersion(EMvccTxMode::ReadWrite, nullptr);

ydb/core/tx/datashard/datashard__engine_host.cpp

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ class TDataShardEngineHost final
207207
self->GetKeyAccessSampler()))
208208
, Self(self)
209209
, EngineBay(engineBay)
210-
, UserDb(*self, db, globalTxId, TRowVersion::Min(), TRowVersion::Max(), counters, now)
210+
, UserDb(*self, db, globalTxId, TRowVersion::Max(), counters, now)
211211
{
212212
}
213213

@@ -217,39 +217,37 @@ class TDataShardEngineHost final
217217
TArrayRef<const NTable::TTag> tags,
218218
NTable::TRowState& row,
219219
NTable::TSelectStats& stats,
220-
const TMaybe<TRowVersion>& readVersion) override
220+
const TMaybe<TRowVersion>& snapshot) override
221221
{
222-
return UserDb.SelectRow(tableId, key, tags, row, stats, readVersion);
222+
return UserDb.SelectRow(tableId, key, tags, row, stats, snapshot);
223223
}
224224

225225
NTable::EReady SelectRow(
226226
const TTableId& tableId,
227227
TArrayRef<const TRawTypeValue> key,
228228
TArrayRef<const NTable::TTag> tags,
229229
NTable::TRowState& row,
230-
const TMaybe<TRowVersion>& readVersion) override
230+
const TMaybe<TRowVersion>& snapshot) override
231231
{
232-
return UserDb.SelectRow(tableId, key, tags, row, readVersion);
232+
return UserDb.SelectRow(tableId, key, tags, row, snapshot);
233233
}
234234

235-
void SetWriteVersion(TRowVersion writeVersion) {
236-
UserDb.SetWriteVersion(writeVersion);
235+
void SetMvccVersion(TRowVersion mvccVersion) {
236+
UserDb.SetMvccVersion(mvccVersion);
237237
}
238238

239239
TRowVersion GetWriteVersion(const TTableId& tableId) const override {
240240
Y_UNUSED(tableId);
241-
Y_ABORT_UNLESS(!UserDb.GetWriteVersion().IsMax(), "Cannot perform writes without WriteVersion set");
242-
return UserDb.GetWriteVersion();
243-
}
244-
245-
void SetReadVersion(TRowVersion readVersion) {
246-
UserDb.SetReadVersion(readVersion);
241+
auto mvccVersion = UserDb.GetMvccVersion();
242+
Y_ABORT_UNLESS(!mvccVersion.IsMax(), "Cannot perform writes without the correct MvccVersion set");
243+
return mvccVersion;
247244
}
248245

249246
TRowVersion GetReadVersion(const TTableId& tableId) const override {
250247
Y_UNUSED(tableId);
251-
Y_ABORT_UNLESS(!UserDb.GetReadVersion().IsMin(), "Cannot perform reads without ReadVersion set");
252-
return UserDb.GetReadVersion();
248+
auto mvccVersion = UserDb.GetMvccVersion();
249+
Y_ABORT_UNLESS(!mvccVersion.IsMax(), "Cannot perform reads without the correct MvccVersion set");
250+
return mvccVersion;
253251
}
254252

255253
void SetVolatileTxId(ui64 txId) {
@@ -276,8 +274,8 @@ class TDataShardEngineHost final
276274
return UserDb.GetChangeCollector(tableId);
277275
}
278276

279-
void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion) override {
280-
UserDb.CommitChanges(tableId, lockId, writeVersion);
277+
void CommitChanges(const TTableId& tableId, ui64 lockId) override {
278+
UserDb.CommitChanges(tableId, lockId);
281279
}
282280

283281
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const {
@@ -596,21 +594,14 @@ TEngineBay::TSizes TEngineBay::CalcSizes(bool needsTotalKeysSize) const {
596594
return outSizes;
597595
}
598596

599-
void TEngineBay::SetWriteVersion(TRowVersion writeVersion) {
600-
Y_ABORT_UNLESS(EngineHost);
601-
602-
auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
603-
host->SetWriteVersion(writeVersion);
604-
}
605-
606-
void TEngineBay::SetReadVersion(TRowVersion readVersion) {
597+
void TEngineBay::SetMvccVersion(TRowVersion mvccVersion) {
607598
Y_ABORT_UNLESS(EngineHost);
608599

609600
auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
610-
host->SetReadVersion(readVersion);
601+
host->SetMvccVersion(mvccVersion);
611602

612603
Y_ABORT_UNLESS(ComputeCtx);
613-
ComputeCtx->SetReadVersion(readVersion);
604+
ComputeCtx->SetMvccVersion(mvccVersion);
614605
}
615606

616607
void TEngineBay::SetVolatileTxId(ui64 txId) {

ydb/core/tx/datashard/datashard__engine_host.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,7 @@ class TEngineBay : TNonCopyable {
9797
const TValidationInfo& TxInfo() const { return KeyValidator.GetInfo(); }
9898
TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const;
9999

100-
void SetWriteVersion(TRowVersion writeVersion);
101-
void SetReadVersion(TRowVersion readVersion);
100+
void SetMvccVersion(TRowVersion mvccVersion);
102101
void SetVolatileTxId(ui64 txId);
103102
void SetIsImmediateTx();
104103
void SetUsesMvccSnapshot();

ydb/core/tx/datashard/datashard_active_transaction.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,7 @@ class TValidatedDataTx : TNonCopyable, public TValidatedTx {
185185
bool CanCancel();
186186
bool CheckCancelled(ui64 tabletId);
187187

188-
void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
189-
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
188+
void SetMvccVersion(TRowVersion mvccVersion) { EngineBay.SetMvccVersion(mvccVersion); }
190189
void SetVolatileTxId(ui64 txId) { EngineBay.SetVolatileTxId(txId); }
191190

192191
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); }

ydb/core/tx/datashard/datashard_change_receiving.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -307,18 +307,16 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
307307
return false;
308308
}
309309

310-
if (!UseStepTxId(record) && !MvccReadWriteVersion) {
311-
auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
312-
Y_DEBUG_ABORT_UNLESS(readVersion == writeVersion);
313-
MvccReadWriteVersion = writeVersion;
314-
Pipeline.AddCommittingOp(*MvccReadWriteVersion);
310+
if (!UseStepTxId(record) && !MvccVersion) {
311+
MvccVersion = Self->GetMvccVersion();
312+
Pipeline.AddCommittingOp(*MvccVersion);
315313
}
316314

317315
if (UseStepTxId(record)) {
318316
txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId()));
319317
} else {
320318
Self->SysLocksTable().BreakLocks(tableId, KeyCells.GetCells()); // probably redundant, we expect target table to be locked until complete restore
321-
txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, *MvccReadWriteVersion);
319+
txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, *MvccVersion);
322320
}
323321

324322
Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB);
@@ -408,8 +406,8 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
408406
void Complete(const TActorContext& ctx) override {
409407
Y_ABORT_UNLESS(Status);
410408

411-
if (MvccReadWriteVersion) {
412-
Pipeline.RemoveCommittingOp(*MvccReadWriteVersion);
409+
if (MvccVersion) {
410+
Pipeline.RemoveCommittingOp(*MvccVersion);
413411
}
414412

415413
if (Status->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_OK) {
@@ -425,7 +423,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
425423
TPipeline& Pipeline;
426424
TEvChangeExchange::TEvApplyRecords::TPtr Ev;
427425
THolder<TEvChangeExchange::TEvStatus> Status;
428-
std::optional<TRowVersion> MvccReadWriteVersion;
426+
std::optional<TRowVersion> MvccVersion;
429427

430428
TSerializedCellVec KeyCells;
431429
TSerializedCellVec ValueCells;

0 commit comments

Comments
 (0)