Skip to content

Commit fc0c41d

Browse files
authored
Remove unused table versions along with schema versions in TSchemaVersionNormalizer (#10058)
1 parent 0211059 commit fc0c41d

File tree

2 files changed

+76
-6
lines changed

2 files changed

+76
-6
lines changed

ydb/core/tx/columnshard/normalizer/schema_version/version.cpp

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,44 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
2323
}
2424
};
2525

26+
class TTableKey {
27+
public:
28+
ui64 PathId;
29+
ui64 Step;
30+
ui64 TxId;
31+
ui64 Version;
32+
33+
public:
34+
TTableKey(ui64 pathId, ui64 step, ui64 txId, ui64 version)
35+
: PathId(pathId)
36+
, Step(step)
37+
, TxId(txId)
38+
, Version(version)
39+
{
40+
}
41+
};
42+
2643
std::vector<TKey> VersionsToRemove;
44+
std::vector<TTableKey> TableVersionsToRemove;
2745

2846
public:
29-
TNormalizerResult(std::vector<TKey>&& versions)
47+
TNormalizerResult(std::vector<TKey>&& versions, std::vector<TTableKey>&& tableVersions)
3048
: VersionsToRemove(versions)
49+
, TableVersionsToRemove(tableVersions)
3150
{
3251
}
3352

3453
// Deletes every row collected in VersionsToRemove and TableVersionsToRemove
// through the database of the given transaction context. Always returns true.
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
    using namespace NColumnShard;
    NIceDb::TNiceDb localDb(txc.DB);

    // Schema preset versions first, keyed by (Id, Step, TxId).
    for (const auto& schemaKey : VersionsToRemove) {
        LOG_S_DEBUG("Removing schema version in TSchemaVersionNormalizer " << schemaKey.Version);
        localDb.Table<Schema::SchemaPresetVersionInfo>()
            .Key(schemaKey.Id, schemaKey.Step, schemaKey.TxId)
            .Delete();
    }

    // Then table versions, keyed by (PathId, Step, TxId).
    for (const auto& tableKey : TableVersionsToRemove) {
        LOG_S_DEBUG("Removing table version in TSchemaVersionNormalizer " << tableKey.Version << " pathId " << tableKey.PathId);
        localDb.Table<Schema::TableVersionInfo>()
            .Key(tableKey.PathId, tableKey.Step, tableKey.TxId)
            .Delete();
    }

    return true;
}
4266

@@ -78,6 +102,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
78102
}
79103

80104
std::vector<TKey> unusedSchemaIds;
105+
std::vector<TTableKey> unusedTableSchemaIds;
81106
std::optional<ui64> maxVersion;
82107
std::vector<INormalizerChanges::TPtr> changes;
83108

@@ -107,18 +132,57 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
107132
}
108133
}
109134

135+
{
136+
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
137+
if (!rowset.IsReady()) {
138+
return std::nullopt;
139+
}
140+
141+
while (!rowset.EndOfSet()) {
142+
const ui64 pathId = rowset.GetValue<Schema::TableVersionInfo::PathId>();
143+
144+
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
145+
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
146+
if (versionInfo.HasSchema()) {
147+
ui64 version = versionInfo.GetSchema().GetVersion();
148+
if (!usedSchemaVersions.contains(version)) {
149+
unusedTableSchemaIds.emplace_back(pathId, rowset.GetValue<Schema::TableVersionInfo::SinceStep>(), rowset.GetValue<Schema::TableVersionInfo::SinceTxId>(), version);
150+
}
151+
}
152+
153+
if (!rowset.Next()) {
154+
return std::nullopt;
155+
}
156+
}
157+
}
158+
159+
std::vector<TTableKey> tablePortion;
110160
std::vector<TKey> portion;
161+
tablePortion.reserve(10000);
111162
portion.reserve(10000);
163+
auto addPortion = [&]() {
164+
if (portion.size() + tablePortion.size() >= 10000) {
165+
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion), std::move(tablePortion)));
166+
portion = std::vector<TKey>();
167+
tablePortion = std::vector<TTableKey>();
168+
}
169+
};
112170
for (const auto& id: unusedSchemaIds) {
113171
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
114172
portion.push_back(id);
115-
if (portion.size() >= 10000) {
116-
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
117-
}
173+
addPortion();
174+
}
175+
}
176+
177+
for (const auto& id: unusedTableSchemaIds) {
178+
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
179+
tablePortion.push_back(id);
180+
addPortion();
118181
}
119182
}
120-
if (portion.size() > 0) {
121-
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
183+
184+
if (portion.size() + tablePortion.size() > 0) {
185+
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion), std::move(tablePortion)));
122186
}
123187
return changes;
124188
}

ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier {
176176
Y_ABORT_UNLESS(info.SerializeToString(&serialized));
177177
db.Table<Schema::SchemaPresetVersionInfo>().Key(11, 1, 1).Update(NIceDb::TUpdate<Schema::SchemaPresetVersionInfo::InfoProto>(serialized));
178178

179+
// Add an invalid orphaned table version; if SchemaVersionCleaner does not erase it, the test will fail
180+
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
181+
versionInfo.MutableSchema()->SetVersion(minVersion - 1);
182+
Y_ABORT_UNLESS(versionInfo.SerializeToString(&serialized));
183+
db.Table<Schema::TableVersionInfo>().Key(1, 1, 1).Update(NIceDb::TUpdate<Schema::TableVersionInfo::InfoProto>(serialized));
184+
179185
db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default"));
180186

181187
}

0 commit comments

Comments
 (0)