Skip to content

Commit bbdc254

Browse files
fix simple reading with accessors fetching (#12000)
1 parent cb1675a commit bbdc254

File tree

9 files changed

+67
-43
lines changed

9 files changed

+67
-43
lines changed

ydb/core/tx/columnshard/engines/changes/with_appended.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
7777
break;
7878
}
7979
}
80-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
80+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
8181
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());
8282

8383
for (auto& [_, portionInfo] : PortionsToRemove) {

ydb/core/tx/columnshard/engines/reader/common/result.h

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ namespace NKikimr::NOlap::NReader {
1212
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
1313
class TPartialReadResult: public TNonCopyable {
1414
private:
15-
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>, ResourcesGuard);
15+
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
1616
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TGroupGuard>, GroupGuard);
1717
NArrow::TShardedRecordBatch ResultBatch;
1818

@@ -54,11 +54,11 @@ class TPartialReadResult: public TNonCopyable {
5454
return ScanCursor;
5555
}
5656

57-
explicit TPartialReadResult(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& resourcesGuard,
58-
std::shared_ptr<NGroupedMemoryManager::TGroupGuard>&& gGuard, const NArrow::TShardedRecordBatch& batch,
57+
explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
58+
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
5959
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
60-
: ResourcesGuard(std::move(resourcesGuard))
61-
, GroupGuard(std::move(gGuard))
60+
: ResourceGuards(resourceGuards)
61+
, GroupGuard(gGuard)
6262
, ResultBatch(batch)
6363
, ScanCursor(scanCursor)
6464
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
@@ -68,7 +68,7 @@ class TPartialReadResult: public TNonCopyable {
6868

6969
explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
7070
const std::optional<ui32> notFinishedIntervalIdx)
71-
: TPartialReadResult(nullptr, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
71+
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
7272
}
7373
};
7474

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,8 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
2727
} else {
2828
gGuard = itInterval->second->GetGroupGuard();
2929
}
30-
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(std::move(allocationGuard), std::move(gGuard), *newBatch,
30+
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
31+
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
3132
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
3233
} else {
3334
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 19 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -96,27 +96,27 @@ class TColumnsAccumulator {
9696
const bool sequential) {
9797
auto actualColumns = columns - AssemblerReadyColumns;
9898
AssemblerReadyColumns = AssemblerReadyColumns + columns;
99-
if (!actualColumns.IsEmpty()) {
100-
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
101-
if (sequential) {
102-
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
103-
if (notSequentialColumnIds.size()) {
104-
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
105-
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
106-
script.AddStep<TAssemblerStep>(cross, purposeId);
107-
*actualSet = *actualSet - *cross;
108-
}
109-
if (!actualSet->IsEmpty()) {
110-
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
111-
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
112-
}
113-
} else {
114-
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
115-
script.AddStep<TAssemblerStep>(actualSet, purposeId);
99+
if (actualColumns.IsEmpty()) {
100+
return false;
101+
}
102+
auto actualSet = std::make_shared<TColumnsSet>(actualColumns.GetColumnIds(), FullSchema);
103+
if (sequential) {
104+
const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet);
105+
if (notSequentialColumnIds.size()) {
106+
script.Allocation(notSequentialColumnIds, stage, EMemType::Raw);
107+
std::shared_ptr<TColumnsSet> cross = actualSet->BuildSamePtr(notSequentialColumnIds);
108+
script.AddStep<TAssemblerStep>(cross, purposeId);
109+
*actualSet = *actualSet - *cross;
116110
}
117-
return true;
111+
if (!actualSet->IsEmpty()) {
112+
script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential);
113+
script.AddStep<TOptionalAssemblerStep>(actualSet, purposeId);
114+
}
115+
} else {
116+
script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw);
117+
script.AddStep<TAssemblerStep>(actualSet, purposeId);
118118
}
119-
return false;
119+
return true;
120120
}
121121
};
122122

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,13 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
1616
while (FetchingSources.size()) {
1717
auto frontSource = *FetchingSources.begin();
1818
if (!frontSource->HasStageResult()) {
19+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result")("source_id", frontSource->GetSourceId())(
20+
"source_idx", frontSource->GetSourceIdx());
1921
break;
2022
}
2123
if (!frontSource->GetStageResult().HasResultChunk()) {
24+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_no_result_chunk")("source_id", frontSource->GetSourceId())(
25+
"source_idx", frontSource->GetSourceIdx());
2226
break;
2327
}
2428
auto table = frontSource->MutableStageResult().ExtractResultChunk();
@@ -28,24 +32,32 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
2832
sourceIdxToContinue = frontSource->GetSourceIdx();
2933
}
3034
if (table && table->num_rows()) {
35+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "has_result")("source_id", frontSource->GetSourceId())(
36+
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
3137
auto cursor =
3238
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
33-
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(nullptr, nullptr, table, cursor, sourceIdxToContinue));
39+
reader.OnIntervalResult(
40+
std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue));
3441
} else if (sourceIdxToContinue) {
42+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
43+
"source_idx", frontSource->GetSourceIdx());
3544
ContinueSource(*sourceIdxToContinue);
3645
break;
3746
}
3847
if (!isFinished) {
3948
break;
4049
}
50+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "source_finished")("source_id", frontSource->GetSourceId())(
51+
"source_idx", frontSource->GetSourceIdx());
4152
AFL_VERIFY(FetchingSourcesByIdx.erase(frontSource->GetSourceIdx()));
4253
if (Context->GetCommonContext()->GetReadMetadata()->Limit) {
43-
FinishedSources.emplace(*FetchingSources.begin());
54+
frontSource->ClearResult();
55+
FinishedSources.emplace(frontSource);
4456
}
4557
FetchingSources.erase(FetchingSources.begin());
4658
while (FetchingSources.size() && FinishedSources.size()) {
47-
auto finishedSource = *FinishedSources.begin();
4859
auto fetchingSource = *FetchingSources.begin();
60+
auto finishedSource = *FinishedSources.begin();
4961
if (finishedSource->GetFinish() < fetchingSource->GetStart()) {
5062
FetchedCount += finishedSource->GetRecordsCount();
5163
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
2525
AFL_VERIFY(FetchingPlan);
2626
AFL_VERIFY(!Context->IsAborted());
2727
ProcessingStarted = true;
28+
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
29+
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
2830
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", SourceIdx);
2931
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
3032
TFetchingScriptCursor cursor(FetchingPlan, 0);
@@ -237,9 +239,7 @@ TPortionDataSource::TPortionDataSource(
237239
portion->RecordSnapshotMin(TSnapshot::Zero()), portion->RecordSnapshotMax(TSnapshot::Zero()), portion->GetRecordsCount(),
238240
portion->GetShardingVersionOptional(), portion->GetMeta().GetDeletionsCount())
239241
, Portion(portion)
240-
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion))
241-
, SourceGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
242-
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId())) {
242+
, Schema(GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*portion)) {
243243
}
244244

245245
} // namespace NKikimr::NOlap::NReader::NSimple

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h

Lines changed: 19 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ class IDataSource: public ICursorEntity {
6060
YDB_READONLY(bool, HasDeletions, false);
6161
virtual NJson::TJsonValue DoDebugJson() const = 0;
6262
std::shared_ptr<TFetchingScript> FetchingPlan;
63-
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> ResourceGuards;
63+
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
6464
YDB_READONLY(TPKRangeFilter::EUsageClass, UsageClass, TPKRangeFilter::EUsageClass::PartialUsage);
6565
bool ProcessingStarted = false;
6666
bool IsStartedByCursor = false;
@@ -74,7 +74,7 @@ class IDataSource: public ICursorEntity {
7474
}
7575

7676
std::optional<TFetchingScriptCursor> ScriptCursor;
77-
77+
std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;
7878
protected:
7979
std::optional<bool> IsSourceInMemoryFlag;
8080

@@ -95,11 +95,27 @@ class IDataSource: public ICursorEntity {
9595
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) = 0;
9696

9797
public:
98-
virtual ui64 GetMemoryGroupId() const = 0;
9998
bool GetIsStartedByCursor() const {
10099
return IsStartedByCursor;
101100
}
102101

102+
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& GetGroupGuard() const {
103+
AFL_VERIFY(SourceGroupGuard);
104+
return SourceGroupGuard;
105+
}
106+
107+
ui64 GetMemoryGroupId() const {
108+
AFL_VERIFY(SourceGroupGuard);
109+
return SourceGroupGuard->GetGroupId();
110+
}
111+
112+
virtual void ClearResult() {
113+
StageData.reset();
114+
StageResult.reset();
115+
ResourceGuards.clear();
116+
SourceGroupGuard = nullptr;
117+
}
118+
103119
void SetIsStartedByCursor() {
104120
IsStartedByCursor = true;
105121
}
@@ -323,7 +339,6 @@ class TPortionDataSource: public IDataSource {
323339
using TBase = IDataSource;
324340
const TPortionInfo::TConstPtr Portion;
325341
std::shared_ptr<ISnapshotSchema> Schema;
326-
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard> SourceGroupGuard;
327342

328343
void NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
329344
THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo>& nullBlocks, const std::shared_ptr<NArrow::TColumnFilter>& filter);
@@ -365,10 +380,6 @@ class TPortionDataSource: public IDataSource {
365380
virtual bool DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) override;
366381

367382
public:
368-
virtual ui64 GetMemoryGroupId() const override {
369-
return SourceGroupGuard->GetGroupId();
370-
}
371-
372383
virtual ui64 PredictAccessorsSize() const override {
373384
return Portion->GetApproxChunksCount(GetContext()->GetCommonContext()->GetReadMetadata()->GetResultSchema()->GetColumnsCount()) * sizeof(TColumnRecord);
374385
}

ydb/core/tx/columnshard/engines/storage/granule/storage.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ std::optional<NStorageOptimizer::TOptimizationPriority> TGranulesStorage::GetCom
6363
maxPriorityGranule = granulesSorted.front().GetGranule();
6464
break;
6565
}
66-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
66+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "granule_locked")("path_id", granulesSorted.front().GetGranule()->GetPathId());
6767
std::pop_heap(granulesSorted.begin(), granulesSorted.end());
6868
granulesSorted.pop_back();
6969
}

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask(
4747
result->SetPortionExpectedSize(levelPortions->GetExpectedPortionSize());
4848
}
4949
auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
50-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
50+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
5151
"level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString());
5252
result->SetCheckPoints(std::move(positions));
5353
for (auto&& i : result->GetSwitchedPortions()) {

0 commit comments

Comments
 (0)