Skip to content

Commit 46419b6

Browse files
table have to be readable with snapshot livetime (#12964)
1 parent b46a3cc commit 46419b6

File tree

6 files changed

+64
-40
lines changed

6 files changed

+64
-40
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
1515
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
1616
ACFL_DEBUG("event", "start_execute");
1717
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
18+
const auto minReadSnapshot = Self->GetMinReadSnapshot();
1819
for (auto&& pack : Packs) {
1920
const auto& writeMeta = pack.GetWriteMeta();
20-
AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
21+
AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
2122
AFL_VERIFY(!writeMeta.HasLongTxId());
2223
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
2324
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
4040
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
4141
ACFL_DEBUG("event", "start_execute");
4242
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
43+
const auto minReadSnapshot = Self->GetMinReadSnapshot();
4344
for (auto&& aggr : buffer.GetAggregations()) {
4445
const auto& writeMeta = aggr->GetWriteMeta();
45-
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
46+
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
4647
txc.DB.NoMoreReadsForTx();
4748
TWriteOperation::TPtr operation;
4849
if (writeMeta.HasLongTxId()) {

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
142142
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
143143
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);
144144

145-
if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
146-
ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
147-
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
148-
149-
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
150-
ctx.Send(writeMeta.GetSource(), result.release());
151-
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable);
152-
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
153-
continue;
154-
}
155-
156145
if (putResult.GetPutStatus() != NKikimrProto::OK) {
157146
Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
158147
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
@@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
238227
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
239228
}
240229

241-
if (!TablesManager.IsReadyForWrite(pathId)) {
230+
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
242231
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
243232
<< " at tablet " << TabletID());
244233

@@ -558,7 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
558547

559548
const auto pathId = operation.GetTableId().GetTableId();
560549

561-
if (!TablesManager.IsReadyForWrite(pathId)) {
550+
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
562551
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
563552
return;
564553
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() {
10511051
return;
10521052
}
10531053

1054-
auto changes =
1055-
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
1054+
const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot();
1055+
THashSet<ui64> pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot);
1056+
1057+
auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager);
10561058
if (!changes) {
10571059
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
10581060
return;
@@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() {
10771079
}
10781080

10791081
THashSet<ui64> pathIdsEmptyInInsertTable;
1080-
for (auto&& i : TablesManager.GetPathsToDrop()) {
1082+
for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) {
10811083
if (InsertTable->HasPathIdData(i)) {
10821084
continue;
10831085
}

ydb/core/tx/columnshard/tables_manager.cpp

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
6060
return false;
6161
}
6262
if (table.IsDropped()) {
63-
PathsToDrop.insert(table.GetPathId());
63+
AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second);
6464
}
6565

6666
AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);
@@ -201,19 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
201201
return true;
202202
}
203203

204-
bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const {
204+
bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional<NOlap::TSnapshot> minReadSnapshot) const {
205205
auto it = Tables.find(pathId);
206206
if (it == Tables.end()) {
207207
return false;
208208
}
209-
if (it->second.IsDropped()) {
209+
if (it->second.IsDropped(minReadSnapshot)) {
210210
return withDeleted;
211211
}
212212
return true;
213213
}
214214

215-
bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
216-
return HasPrimaryIndex() && HasTable(pathId);
215+
bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const {
216+
return HasPrimaryIndex() && HasTable(pathId, withDeleted);
217+
}
218+
219+
bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const {
220+
return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot);
217221
}
218222

219223
bool TTablesManager::HasPreset(const ui32 presetId) const {
@@ -237,7 +241,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
237241
AFL_VERIFY(Tables.contains(pathId));
238242
auto& table = Tables[pathId];
239243
table.SetDropVersion(version);
240-
PathsToDrop.insert(pathId);
244+
AFL_VERIFY(PathsToDrop[version].emplace(pathId).second);
241245
Ttl.erase(pathId);
242246
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
243247
}
@@ -363,27 +367,34 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
363367
}
364368

365369
bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
366-
auto itDrop = PathsToDrop.find(pathId);
370+
const auto& itTable = Tables.find(pathId);
371+
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
372+
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
367373
AFL_VERIFY(itDrop != PathsToDrop.end());
374+
AFL_VERIFY(itDrop->second.contains(pathId));
375+
368376
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
369377
NIceDb::TNiceDb db(dbTable);
370378
NColumnShard::Schema::EraseTableInfo(db, pathId);
371-
const auto& itTable = Tables.find(pathId);
372-
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
373379
for (auto&& tableVersion : itTable->second.GetVersions()) {
374380
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
375381
}
376382
return true;
377383
}
378384

379385
bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) {
380-
auto itDrop = PathsToDrop.find(pathId);
381-
AFL_VERIFY(itDrop != PathsToDrop.end());
382-
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
383-
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
384-
PathsToDrop.erase(itDrop);
385386
const auto& itTable = Tables.find(pathId);
386387
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
388+
{
389+
auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
390+
AFL_VERIFY(itDrop != PathsToDrop.end());
391+
AFL_VERIFY(itDrop->second.erase(pathId));
392+
if (itDrop->second.empty()) {
393+
PathsToDrop.erase(itDrop);
394+
}
395+
}
396+
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
397+
AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
387398
Tables.erase(itTable);
388399
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size());
389400
return true;

ydb/core/tx/columnshard/tables_manager.h

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,28 @@ class TTableInfo {
105105
return PathId;
106106
}
107107

108+
const NOlap::TSnapshot& GetDropVersionVerified() const {
109+
AFL_VERIFY(DropVersion);
110+
return *DropVersion;
111+
}
112+
108113
void SetDropVersion(const NOlap::TSnapshot& version) {
114+
AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString());
109115
DropVersion = version;
110116
}
111117

112118
void AddVersion(const NOlap::TSnapshot& snapshot) {
113119
Versions.insert(snapshot);
114120
}
115121

116-
bool IsDropped() const {
117-
return DropVersion.has_value();
122+
bool IsDropped(const std::optional<NOlap::TSnapshot>& minReadSnapshot = std::nullopt) const {
123+
if (!DropVersion) {
124+
return false;
125+
}
126+
if (!minReadSnapshot) {
127+
return true;
128+
}
129+
return *DropVersion < *minReadSnapshot;
118130
}
119131

120132
TTableInfo() = default;
@@ -139,7 +151,7 @@ class TTablesManager {
139151
THashMap<ui64, TTableInfo> Tables;
140152
THashSet<ui32> SchemaPresetsIds;
141153
THashMap<ui32, NKikimrSchemeOp::TColumnTableSchema> ActualSchemaForPreset;
142-
THashSet<ui64> PathsToDrop;
154+
std::map<NOlap::TSnapshot, THashSet<ui64>> PathsToDrop;
143155
THashMap<ui64, NOlap::TTiering> Ttl;
144156
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
145157
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
@@ -166,12 +178,19 @@ class TTablesManager {
166178
return Ttl;
167179
}
168180

169-
const THashSet<ui64>& GetPathsToDrop() const {
181+
const std::map<NOlap::TSnapshot, THashSet<ui64>>& GetPathsToDrop() const {
170182
return PathsToDrop;
171183
}
172184

173-
THashSet<ui64>& MutablePathsToDrop() {
174-
return PathsToDrop;
185+
THashSet<ui64> GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const {
186+
THashSet<ui64> result;
187+
for (auto&& i : PathsToDrop) {
188+
if (minReadSnapshot < i.first) {
189+
break;
190+
}
191+
result.insert(i.second.begin(), i.second.end());
192+
}
193+
return result;
175194
}
176195

177196
const THashMap<ui64, TTableInfo>& GetTables() const {
@@ -236,8 +255,9 @@ class TTablesManager {
236255
const TTableInfo& GetTable(const ui64 pathId) const;
237256
ui64 GetMemoryUsage() const;
238257

239-
bool HasTable(const ui64 pathId, bool withDeleted = false) const;
240-
bool IsReadyForWrite(const ui64 pathId) const;
258+
bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional<NOlap::TSnapshot> minReadSnapshot = std::nullopt) const;
259+
bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const;
260+
bool IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const;
241261
bool HasPreset(const ui32 presetId) const;
242262

243263
void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);

0 commit comments

Comments
 (0)