Skip to content

Commit 9b1e630

Browse files
Normalizer insert records (#9010)
1 parent 19eab67 commit 9b1e630

File tree

7 files changed

+319
-37
lines changed

7 files changed

+319
-37
lines changed

ydb/core/tx/columnshard/columnshard_schema.cpp

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,44 +10,18 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
1010
}
1111

1212
while (!rowset.EndOfSet()) {
13-
EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>();
14-
const ui64 planStep = rowset.GetValue<InsertTable::PlanStep>();
15-
const ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>();
16-
const ui64 pathId = rowset.GetValue<InsertTable::PathId>();
17-
const TString dedupId = rowset.GetValue<InsertTable::DedupId>();
18-
const ui64 schemaVersion = rowset.HaveValue<InsertTable::SchemaVersion>() ? rowset.GetValue<InsertTable::SchemaVersion>() : 0;
13+
NOlap::TInsertTableRecordLoadContext constructor;
14+
constructor.ParseFromDatabase(rowset);
1915

20-
TString error;
21-
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(rowset.GetValue<InsertTable::BlobId>(), dsGroupSelector, error);
22-
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
23-
24-
NKikimrTxColumnShard::TLogicalMetadata meta;
25-
if (auto metaStr = rowset.GetValue<InsertTable::Meta>()) {
26-
Y_ABORT_UNLESS(meta.ParseFromString(metaStr));
27-
}
28-
29-
std::optional<ui64> rangeOffset;
30-
if (rowset.HaveValue<InsertTable::BlobRangeOffset>()) {
31-
rangeOffset = rowset.GetValue<InsertTable::BlobRangeOffset>();
32-
}
33-
std::optional<ui64> rangeSize;
34-
if (rowset.HaveValue<InsertTable::BlobRangeSize>()) {
35-
rangeSize = rowset.GetValue<InsertTable::BlobRangeSize>();
36-
}
37-
AFL_VERIFY(!!rangeOffset == !!rangeSize);
38-
39-
auto userData = std::make_shared<NOlap::TUserData>(pathId,
40-
NOlap::TBlobRange(blobId, rangeOffset.value_or(0), rangeSize.value_or(blobId.BlobSize())), meta, schemaVersion, std::nullopt);
41-
42-
switch (recType) {
43-
case EInsertTableIds::Inserted:
44-
insertTable.AddInserted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
16+
switch (constructor.GetRecType()) {
17+
case Schema::EInsertTableIds::Inserted:
18+
insertTable.AddInserted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
4519
break;
46-
case EInsertTableIds::Committed:
47-
insertTable.AddCommitted(NOlap::TCommittedData(userData, planStep, writeTxId, dedupId), true);
20+
case Schema::EInsertTableIds::Committed:
21+
insertTable.AddCommitted(constructor.BuildCommitted(dsGroupSelector), true);
4822
break;
49-
case EInsertTableIds::Aborted:
50-
insertTable.AddAborted(NOlap::TInsertedData((TInsertWriteId)writeTxId, userData), true);
23+
case Schema::EInsertTableIds::Aborted:
24+
insertTable.AddAborted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
5125
break;
5226
}
5327
if (!rowset.Next()) {

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -977,4 +977,124 @@ class TIndexChunkLoadContext {
977977
}
978978
};
979979

980-
}
980+
class TInsertTableRecordLoadContext {
981+
private:
982+
NColumnShard::Schema::EInsertTableIds RecType;
983+
ui64 PlanStep;
984+
ui64 WriteTxId;
985+
ui64 PathId;
986+
YDB_ACCESSOR_DEF(TString, DedupId);
987+
ui64 SchemaVersion;
988+
TString BlobIdString;
989+
std::optional<NOlap::TUnifiedBlobId> BlobId;
990+
TString MetadataString;
991+
std::optional<NKikimrTxColumnShard::TLogicalMetadata> Metadata;
992+
std::optional<ui64> RangeOffset;
993+
std::optional<ui64> RangeSize;
994+
995+
void Prepare(const IBlobGroupSelector* dsGroupSelector) {
996+
AFL_VERIFY(!PreparedFlag);
997+
PreparedFlag = true;
998+
TString error;
999+
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(BlobIdString, dsGroupSelector, error);
1000+
Y_ABORT_UNLESS(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
1001+
BlobId = blobId;
1002+
1003+
NKikimrTxColumnShard::TLogicalMetadata meta;
1004+
AFL_VERIFY(MetadataString);
1005+
Y_ABORT_UNLESS(meta.ParseFromString(MetadataString));
1006+
Metadata = std::move(meta);
1007+
AFL_VERIFY(!!RangeOffset == !!RangeSize);
1008+
}
1009+
1010+
bool PreparedFlag = false;
1011+
bool ParsedFlag = false;
1012+
1013+
public:
1014+
TInsertWriteId GetInsertWriteId() const {
1015+
AFL_VERIFY(ParsedFlag);
1016+
AFL_VERIFY(RecType != NColumnShard::Schema::EInsertTableIds::Committed);
1017+
return (TInsertWriteId)WriteTxId;
1018+
}
1019+
1020+
NColumnShard::Schema::EInsertTableIds GetRecType() const {
1021+
AFL_VERIFY(ParsedFlag);
1022+
return RecType;
1023+
}
1024+
1025+
ui64 GetPlanStep() const {
1026+
AFL_VERIFY(ParsedFlag);
1027+
return PlanStep;
1028+
}
1029+
1030+
void Remove(NIceDb::TNiceDb& db) const {
1031+
AFL_VERIFY(ParsedFlag);
1032+
db.Table<NColumnShard::Schema::InsertTable>().Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId).Delete();
1033+
}
1034+
1035+
void Upsert(NIceDb::TNiceDb& db) const {
1036+
AFL_VERIFY(ParsedFlag);
1037+
using namespace NColumnShard;
1038+
if (RangeOffset) {
1039+
db.Table<Schema::InsertTable>()
1040+
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1041+
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString),
1042+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeOffset>(*RangeOffset),
1043+
NIceDb::TUpdate<Schema::InsertTable::BlobRangeSize>(*RangeSize), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1044+
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1045+
} else {
1046+
db.Table<Schema::InsertTable>()
1047+
.Key((ui8)RecType, PlanStep, WriteTxId, PathId, DedupId)
1048+
.Update(NIceDb::TUpdate<Schema::InsertTable::BlobId>(BlobIdString), NIceDb::TUpdate<Schema::InsertTable::Meta>(MetadataString),
1049+
NIceDb::TUpdate<Schema::InsertTable::SchemaVersion>(SchemaVersion));
1050+
}
1051+
}
1052+
1053+
template <class TRowset>
1054+
void ParseFromDatabase(TRowset& rowset) {
1055+
AFL_VERIFY(!ParsedFlag)("problem", "duplication parsing");
1056+
ParsedFlag = true;
1057+
using namespace NColumnShard;
1058+
RecType = (Schema::EInsertTableIds)rowset.template GetValue<Schema::InsertTable::Committed>();
1059+
PlanStep = rowset.template GetValue<Schema::InsertTable::PlanStep>();
1060+
WriteTxId = rowset.template GetValueOrDefault<Schema::InsertTable::WriteTxId>();
1061+
AFL_VERIFY(WriteTxId);
1062+
1063+
PathId = rowset.template GetValue<Schema::InsertTable::PathId>();
1064+
DedupId = rowset.template GetValue<Schema::InsertTable::DedupId>();
1065+
SchemaVersion =
1066+
rowset.template HaveValue<Schema::InsertTable::SchemaVersion>() ? rowset.template GetValue<Schema::InsertTable::SchemaVersion>() : 0;
1067+
BlobIdString = rowset.template GetValue<Schema::InsertTable::BlobId>();
1068+
MetadataString = rowset.template GetValue<Schema::InsertTable::Meta>();
1069+
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeOffset>()) {
1070+
RangeOffset = rowset.template GetValue<Schema::InsertTable::BlobRangeOffset>();
1071+
}
1072+
if (rowset.template HaveValue<Schema::InsertTable::BlobRangeSize>()) {
1073+
RangeSize = rowset.template GetValue<Schema::InsertTable::BlobRangeSize>();
1074+
}
1075+
}
1076+
1077+
NOlap::TCommittedData BuildCommitted(const IBlobGroupSelector* dsGroupSelector) {
1078+
Prepare(dsGroupSelector);
1079+
using namespace NColumnShard;
1080+
AFL_VERIFY(RecType == Schema::EInsertTableIds::Committed);
1081+
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1082+
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1083+
AFL_VERIFY(!!DedupId);
1084+
AFL_VERIFY(PlanStep);
1085+
return NOlap::TCommittedData(userData, PlanStep, WriteTxId, DedupId);
1086+
}
1087+
1088+
NOlap::TInsertedData BuildInsertedOrAborted(const IBlobGroupSelector* dsGroupSelector) {
1089+
Prepare(dsGroupSelector);
1090+
using namespace NColumnShard;
1091+
AFL_VERIFY(RecType != Schema::EInsertTableIds::Committed);
1092+
auto userData = std::make_shared<NOlap::TUserData>(PathId,
1093+
NOlap::TBlobRange(*BlobId, RangeOffset.value_or(0), RangeSize.value_or(BlobId->BlobSize())), *Metadata, SchemaVersion, std::nullopt);
1094+
AFL_VERIFY(!DedupId);
1095+
AFL_VERIFY(!PlanStep);
1096+
return NOlap::TInsertedData((TInsertWriteId)WriteTxId, userData);
1097+
}
1098+
};
1099+
1100+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ enum class ENormalizerSequentialId: ui32 {
5757
PortionsMetadata,
5858
CleanGranuleId,
5959
EmptyPortionsCleaner,
60+
CleanInsertionDedup,
6061

6162
MAX
6263
};
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#include "broken_insertion_dedup.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
namespace NKikimr::NOlap::NInsertionDedup {
7+
8+
class TNormalizerRemoveChanges: public INormalizerChanges {
9+
private:
10+
std::vector<TInsertTableRecordLoadContext> Insertions;
11+
public:
12+
virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normalizationContext*/) const override {
13+
NIceDb::TNiceDb db(txc.DB);
14+
for (auto&& i : Insertions) {
15+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "remove_aborted_record")("write_id", i.GetInsertWriteId());
16+
i.Remove(db);
17+
}
18+
return true;
19+
}
20+
virtual void ApplyOnComplete(const TNormalizationController& /*normalizationContext*/) const override {
21+
22+
}
23+
24+
virtual ui64 GetSize() const override {
25+
return Insertions.size();
26+
}
27+
28+
TNormalizerRemoveChanges(const std::vector<TInsertTableRecordLoadContext>& insertions)
29+
: Insertions(insertions)
30+
{
31+
32+
}
33+
};
34+
35+
class TNormalizerCleanDedupChanges: public INormalizerChanges {
36+
private:
37+
mutable std::vector<TInsertTableRecordLoadContext> Insertions;
38+
39+
public:
40+
virtual bool ApplyOnExecute(
41+
NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /*normalizationContext*/) const override {
42+
NIceDb::TNiceDb db(txc.DB);
43+
for (auto&& i : Insertions) {
44+
AFL_VERIFY(i.GetDedupId());
45+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "correct_record")("dedup", i.GetDedupId());
46+
i.Remove(db);
47+
i.SetDedupId("");
48+
i.Upsert(db);
49+
}
50+
return true;
51+
}
52+
virtual void ApplyOnComplete(const TNormalizationController& /*normalizationContext*/) const override {
53+
}
54+
55+
virtual ui64 GetSize() const override {
56+
return Insertions.size();
57+
}
58+
59+
TNormalizerCleanDedupChanges(const std::vector<TInsertTableRecordLoadContext>& insertions)
60+
: Insertions(insertions) {
61+
}
62+
};
63+
64+
65+
class TCollectionStates {
66+
private:
67+
YDB_READONLY_DEF(std::optional<TInsertTableRecordLoadContext>, Inserted);
68+
YDB_READONLY_DEF(std::optional<TInsertTableRecordLoadContext>, Aborted);
69+
public:
70+
void SetInserted(const TInsertTableRecordLoadContext& context) {
71+
AFL_VERIFY(!Inserted);
72+
Inserted = context;
73+
}
74+
void SetAborted(const TInsertTableRecordLoadContext& context) {
75+
AFL_VERIFY(!Aborted);
76+
Aborted = context;
77+
}
78+
};
79+
80+
TConclusion<std::vector<INormalizerTask::TPtr>> TInsertionsDedupNormalizer::DoInit(
81+
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
82+
NIceDb::TNiceDb db(txc.DB);
83+
84+
using namespace NColumnShard;
85+
auto rowset = db.Table<NColumnShard::Schema::InsertTable>().Select();
86+
if (!rowset.IsReady()) {
87+
return TConclusionStatus::Fail("cannot read insertion info");
88+
}
89+
THashMap<TInsertWriteId, TCollectionStates> insertions;
90+
while (!rowset.EndOfSet()) {
91+
TInsertTableRecordLoadContext constructor;
92+
constructor.ParseFromDatabase(rowset);
93+
if (constructor.GetRecType() == NColumnShard::Schema::EInsertTableIds::Committed) {
94+
AFL_VERIFY(constructor.GetPlanStep());
95+
} else {
96+
AFL_VERIFY(!constructor.GetPlanStep());
97+
if (constructor.GetRecType() == NColumnShard::Schema::EInsertTableIds::Aborted) {
98+
insertions[constructor.GetInsertWriteId()].SetAborted(constructor);
99+
} else if (constructor.GetRecType() == NColumnShard::Schema::EInsertTableIds::Inserted) {
100+
insertions[constructor.GetInsertWriteId()].SetInserted(constructor);
101+
} else {
102+
AFL_VERIFY(false);
103+
}
104+
}
105+
if (!rowset.Next()) {
106+
return TConclusionStatus::Fail("cannot read insertion info");
107+
}
108+
}
109+
110+
std::vector<INormalizerTask::TPtr> result;
111+
std::vector<TInsertTableRecordLoadContext> toRemove;
112+
std::vector<TInsertTableRecordLoadContext> toCleanDedup;
113+
for (auto&& [id, i] : insertions) {
114+
if (i.GetInserted() && i.GetAborted()) {
115+
toRemove.emplace_back(*i.GetInserted());
116+
if (i.GetAborted()->GetDedupId()) {
117+
toCleanDedup.emplace_back(*i.GetAborted());
118+
}
119+
} else if (i.GetAborted()) {
120+
if (i.GetAborted()->GetDedupId()) {
121+
toCleanDedup.emplace_back(*i.GetAborted());
122+
}
123+
} else if (i.GetInserted()) {
124+
if (i.GetInserted()->GetDedupId()) {
125+
toCleanDedup.emplace_back(*i.GetInserted());
126+
}
127+
} else {
128+
AFL_VERIFY(false);
129+
}
130+
if (toCleanDedup.size() == 1000) {
131+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerCleanDedupChanges>(toCleanDedup)));
132+
toCleanDedup.clear();
133+
}
134+
if (toRemove.size() == 1000) {
135+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerRemoveChanges>(toRemove)));
136+
toRemove.clear();
137+
}
138+
}
139+
if (toCleanDedup.size()) {
140+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerCleanDedupChanges>(toCleanDedup)));
141+
toCleanDedup.clear();
142+
}
143+
if (toRemove.size()) {
144+
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TNormalizerRemoveChanges>(toRemove)));
145+
toRemove.clear();
146+
}
147+
148+
return result;
149+
}
150+
151+
} // namespace NKikimr::NOlap
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap::NInsertionDedup {
8+
9+
class TInsertionsDedupNormalizer: public TNormalizationController::INormalizerComponent {
10+
public:
11+
static TString GetClassNameStatic() {
12+
return "CleanInsertionDedup";
13+
}
14+
private:
15+
class TNormalizerResult;
16+
17+
static const inline INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer> Registrator =
18+
INormalizerComponent::TFactory::TRegistrator<TInsertionsDedupNormalizer>(GetClassNameStatic());
19+
20+
public:
21+
TInsertionsDedupNormalizer(const TNormalizationController::TInitContext&) {
22+
}
23+
24+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
25+
return ENormalizerSequentialId::CleanInsertionDedup;
26+
}
27+
28+
virtual TString GetClassName() const override {
29+
return GetClassNameStatic();
30+
}
31+
32+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
33+
};
34+
35+
}

ydb/core/tx/columnshard/normalizer/tablet/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
GLOBAL gc_counters.cpp
55
GLOBAL broken_txs.cpp
6+
GLOBAL broken_insertion_dedup.cpp
67
)
78

89
PEERDIR(

ydb/core/tx/columnshard/operations/manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class TLockFeatures: TMoveOnly {
7878

7979
void SetBroken() {
8080
SharingInfo->Broken = 1;
81-
SharingInfo->InternalGenerationCounter = (ui64)TSysTables::TLocksTable::TLock::ESetErrors::ErrorBroken;
81+
SharingInfo->InternalGenerationCounter = (i64)TSysTables::TLocksTable::TLock::ESetErrors::ErrorBroken;
8282
}
8383

8484
bool IsBroken() const {

0 commit comments

Comments
 (0)