Skip to content

Commit 3f0791d

Browse files
portions index simplification (#12414)
1 parent 9afb07b commit 3f0791d

File tree

7 files changed

+50
-359
lines changed

7 files changed

+50
-359
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -240,24 +240,13 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
240240
}
241241

242242
{
243-
auto& portionsIndex =
244-
TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>().GetGranuleVerified(writeMeta.GetTableId()).GetPortionsIndex();
245-
{
246-
const ui64 minMemoryRead = portionsIndex.GetMinRawMemoryRead();
247-
if (NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit < minMemoryRead) {
248-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "overlimit")("reason", "read_raw_memory")("current", minMemoryRead)(
249-
"limit", NOlap::TGlobalLimits::DefaultReduceMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
250-
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::OverlimitReadRawMemory);
251-
}
252-
}
253-
254-
{
255-
const ui64 minMemoryRead = portionsIndex.GetMinBlobMemoryRead();
256-
if (NOlap::TGlobalLimits::DefaultBlobsMemoryIntervalLimit < minMemoryRead) {
257-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "overlimit")("reason", "read_blob_memory")("current", minMemoryRead)(
258-
"limit", NOlap::TGlobalLimits::DefaultBlobsMemoryIntervalLimit)("table_id", writeMeta.GetTableId());
259-
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::OverlimitReadBlobMemory);
260-
}
243+
auto status = TablesManager.GetPrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>()
244+
.GetGranuleVerified(writeMeta.GetTableId())
245+
.GetOptimizerPlanner()
246+
.CheckWriteData();
247+
if (status.IsFail()) {
248+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage());
249+
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria);
261250
}
262251
}
263252

@@ -298,10 +287,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
298287
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
299288
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
300289

301-
NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager,
302-
Counters.GetIndexationCounters().SplitterCounters, Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
303-
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
304-
BufferizationWriteActorId, std::move(writeData), context);
290+
NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
291+
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
292+
std::shared_ptr<NConveyor::ITask> task =
293+
std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move(writeData), context);
305294
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
306295
}
307296
}
@@ -599,8 +588,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
599588
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
600589
Y_ABORT_UNLESS(writeOperation);
601590
writeOperation->SetBehaviour(behaviour);
602-
NOlap::TWritingContext wContext(
603-
pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
591+
NOlap::TWritingContext wContext(pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
604592
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
605593
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
606594
writeOperation->Start(*this, arrowData, source, wContext);

ydb/core/tx/columnshard/counters/columnshard.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ enum class EWriteFailReason {
1717
NoTable /* "no_table" */,
1818
IncorrectSchema /* "incorrect_schema" */,
1919
Overload /* "overload" */,
20-
OverlimitReadRawMemory /* "overlimit_read_raw_memory" */,
21-
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
20+
CompactionCriteria /* "compaction_criteria" */
2221
};
2322

2423
class TWriteCounters: public TCommonCountersOwner {

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,38 +41,9 @@ std::shared_ptr<NArrow::TColumnFilter> TGeneralCompactColumnEngineChanges::Build
4141
}
4242
}
4343
}
44-
NArrow::TColumnFilter filterCorrection = NArrow::TColumnFilter::BuildDenyFilter();
45-
auto pkSchema = resultSchema->GetIndexInfo().GetReplaceKey();
46-
NArrow::NMerger::TRWSortableBatchPosition pos(batch, 0, pkSchema->field_names(), {}, false);
47-
ui32 posCurrent = 0;
48-
auto excludedIntervalsInfo = GranuleMeta->GetPortionsIndex().GetIntervalFeatures(pInfo, portionsInUsage);
49-
for (auto&& i : excludedIntervalsInfo.GetExcludedIntervals()) {
50-
NArrow::NMerger::TSortableBatchPosition startForFound(i.GetStart().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false);
51-
NArrow::NMerger::TSortableBatchPosition finishForFound(i.GetFinish().ToBatch(pkSchema), 0, pkSchema->field_names(), {}, false);
52-
auto foundStart =
53-
NArrow::NMerger::TSortableBatchPosition::FindPosition(pos, pos.GetPosition(), batch->num_rows() - 1, startForFound, true);
54-
AFL_VERIFY(foundStart);
55-
AFL_VERIFY(!foundStart->IsLess())("pos", pos.DebugJson())("start", startForFound.DebugJson())("found", foundStart->DebugString());
56-
auto foundFinish =
57-
NArrow::NMerger::TSortableBatchPosition::FindPosition(pos, pos.GetPosition(), batch->num_rows() - 1, finishForFound, false);
58-
AFL_VERIFY(foundFinish);
59-
AFL_VERIFY(foundFinish->GetPosition() >= foundStart->GetPosition());
60-
if (foundFinish->GetPosition() > foundStart->GetPosition()) {
61-
AFL_VERIFY(!foundFinish->IsGreater())("pos", pos.DebugJson())("finish", finishForFound.DebugJson())(
62-
"found", foundFinish->DebugString());
63-
}
64-
filterCorrection.Add(foundStart->GetPosition() - posCurrent, false);
65-
if (foundFinish->IsGreater()) {
66-
filterCorrection.Add(foundFinish->GetPosition() - foundStart->GetPosition(), true);
67-
posCurrent = foundFinish->GetPosition();
68-
} else {
69-
filterCorrection.Add(foundFinish->GetPosition() - foundStart->GetPosition() + 1, true);
70-
posCurrent = foundFinish->GetPosition() + 1;
71-
}
44+
if (GranuleMeta->GetPortionsIndex().HasOlderIntervals(pInfo, portionsInUsage)) {
45+
filterDeleted = NArrow::TColumnFilter::BuildAllowFilter();
7246
}
73-
AFL_VERIFY(filterCorrection.Size() <= batch->num_rows());
74-
filterCorrection.Add(false, batch->num_rows() - filterCorrection.Size());
75-
filterDeleted = filterDeleted.Or(filterCorrection);
7647
}
7748
if (filter) {
7849
*filter = filter->And(filterDeleted);

ydb/core/tx/columnshard/engines/storage/granule/granule.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ class TGranuleMeta: TNonCopyable {
165165
return ActualizationIndex->CollectMetadataRequests(Portions);
166166
}
167167

168+
const NStorageOptimizer::IOptimizerPlanner& GetOptimizerPlanner() const {
169+
return *OptimizerPlanner;
170+
}
171+
168172
std::shared_ptr<ITxReader> BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector, const TVersionedIndex& vIndex);
169173
bool TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedIndex);
170174
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& GetDataAccessorsManager() const {

ydb/core/tx/columnshard/engines/storage/granule/portions_index.cpp

Lines changed: 14 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -3,114 +3,26 @@
33

44
namespace NKikimr::NOlap::NGranule::NPortionsIndex {
55

6-
TPortionsIndex::TPortionIntervals TPortionsIndex::GetIntervalFeatures(const TPortionInfo& inputPortion, const THashSet<ui64>& skipPortions) const {
7-
auto itFrom = Points.find(inputPortion.IndexKeyStart());
8-
AFL_VERIFY(itFrom != Points.end());
9-
auto itTo = Points.find(inputPortion.IndexKeyEnd());
10-
AFL_VERIFY(itTo != Points.end());
11-
TPortionIntervals portionExcludeIntervals;
12-
while (true) {
13-
std::optional<NArrow::TReplaceKey> nextKey;
14-
for (auto&& [p, _] : itFrom->second.GetPortionIds()) {
15-
if (skipPortions.contains(p)) {
16-
continue;
17-
}
18-
const auto& portionCross = Owner.GetPortionVerified(p);
19-
if (!portionCross.CrossSSWith(inputPortion)) {
20-
continue;
21-
}
22-
if (!nextKey || *nextKey < portionCross.IndexKeyEnd()) {
23-
nextKey = portionCross.IndexKeyEnd();
24-
}
6+
bool TPortionsIndex::HasOlderIntervals(const TPortionInfo& inputPortion, const THashSet<ui64>& skipPortions) const {
7+
for (auto&& [_, p] : Portions) {
8+
if (p->GetPortionId() == inputPortion.GetPortionId()) {
9+
continue;
2510
}
26-
if (nextKey) {
27-
nextKey = std::min(inputPortion.IndexKeyEnd(), *nextKey);
28-
portionExcludeIntervals.Add(itFrom->first, *nextKey);
29-
auto itFromNext = Points.find(*nextKey);
30-
AFL_VERIFY(itFromNext != Points.end());
31-
if (itFromNext == itTo) {
32-
break;
33-
}
34-
if (itFromNext == itFrom) {
35-
++itFrom;
36-
} else {
37-
itFrom = itFromNext;
38-
}
39-
AFL_VERIFY(itFrom != Points.end());
40-
} else {
41-
if (itFrom == itTo) {
42-
break;
43-
}
44-
++itFrom;
45-
AFL_VERIFY(itFrom != Points.end());
11+
if (inputPortion.IndexKeyEnd() < p->IndexKeyStart()) {
12+
continue;
4613
}
47-
48-
}
49-
return portionExcludeIntervals;
50-
}
51-
52-
void TPortionsIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& p) {
53-
auto itFrom = Points.find(p->IndexKeyStart());
54-
AFL_VERIFY(itFrom != Points.end());
55-
auto itTo = Points.find(p->IndexKeyEnd());
56-
AFL_VERIFY(itTo != Points.end());
57-
{
58-
const TPortionInfoStat stat(p);
59-
auto it = itFrom;
60-
while (true) {
61-
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
62-
it->second.RemoveContained(stat);
63-
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
64-
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
65-
if (it == itTo) {
66-
break;
67-
}
68-
AFL_VERIFY(++it != Points.end());
69-
}
70-
}
71-
if (itFrom != itTo) {
72-
itFrom->second.RemoveStart(p);
73-
if (itFrom->second.IsEmpty()) {
74-
RemoveFromMemoryUsageControl(itFrom->second.GetIntervalStats());
75-
Points.erase(itFrom);
14+
if (p->IndexKeyEnd() < inputPortion.IndexKeyStart()) {
15+
continue;
7616
}
77-
itTo->second.RemoveFinish(p);
78-
if (itTo->second.IsEmpty()) {
79-
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
80-
Points.erase(itTo);
17+
if (skipPortions.contains(p->GetPortionId())) {
18+
continue;
8119
}
82-
} else {
83-
itTo->second.RemoveStart(p);
84-
itTo->second.RemoveFinish(p);
85-
if (itTo->second.IsEmpty()) {
86-
RemoveFromMemoryUsageControl(itTo->second.GetIntervalStats());
87-
Points.erase(itTo);
88-
}
89-
}
90-
RawMemoryUsage.FlushCounters();
91-
BlobMemoryUsage.FlushCounters();
92-
}
93-
94-
void TPortionsIndex::AddPortion(const std::shared_ptr<TPortionInfo>& p) {
95-
auto itFrom = InsertPoint(p->IndexKeyStart());
96-
itFrom->second.AddStart(p);
97-
auto itTo = InsertPoint(p->IndexKeyEnd());
98-
itTo->second.AddFinish(p);
99-
100-
auto it = itFrom;
101-
const TPortionInfoStat stat(p);
102-
while (true) {
103-
RemoveFromMemoryUsageControl(it->second.GetIntervalStats());
104-
it->second.AddContained(stat);
105-
RawMemoryUsage.Add(it->second.GetIntervalStats().GetMinRawBytes());
106-
BlobMemoryUsage.Add(it->second.GetIntervalStats().GetBlobBytes());
107-
if (it == itTo) {
108-
break;
20+
if (inputPortion.RecordSnapshotMax() < p->RecordSnapshotMin()) {
21+
continue;
10922
}
110-
AFL_VERIFY(++it != Points.end());
23+
return true;
11124
}
112-
RawMemoryUsage.FlushCounters();
113-
BlobMemoryUsage.FlushCounters();
25+
return false;
11426
}
11527

11628
}

0 commit comments

Comments
 (0)