Skip to content

Commit 2df3e61

Browse files
authored
keep scan counters guard until result delivery (#19298)
1 parent cc9ffec commit 2df3e61

File tree

13 files changed, with 46 additions and 60 deletions (+46 / -60 lines changed).

ydb/core/tx/columnshard/columnshard_private_events.h

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
#include <ydb/core/formats/arrow/special_keys.h>
88
#include <ydb/core/protos/counters_columnshard.pb.h>
9+
#include <ydb/core/tx/columnshard/counters/scan.h>
910
#include <ydb/core/tx/columnshard/engines/column_engine.h>
1011
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
1112
#include <ydb/core/tx/columnshard/engines/writer/write_controller.h>
@@ -118,14 +119,17 @@ struct TEvPrivate {
118119
class TEvTaskProcessedResult: public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult> {
119120
private:
120121
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> Result;
122+
TCounterGuard ScanCounter;
121123

122124
public:
123125
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> ExtractResult() {
124126
return std::move(Result);
125127
}
126128

127-
TEvTaskProcessedResult(const TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>& result)
128-
: Result(result) {
129+
TEvTaskProcessedResult(
130+
const TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>& result, TCounterGuard&& scanCounters)
131+
: Result(result)
132+
, ScanCounter(std::move(scanCounters)) {
129133
}
130134
};
131135

ydb/core/tx/columnshard/engines/reader/abstract/read_context.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -88,8 +88,8 @@ class TReadContext {
8888

8989
void AbortWithError(const TString& errorMessage) {
9090
if (AbortionFlag->Inc() == 1) {
91-
NActors::TActivationContext::Send(
92-
ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage)));
91+
NActors::TActivationContext::Send(ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
92+
TConclusionStatus::Fail(errorMessage), Counters.GetResultsForReplyGuard()));
9393
}
9494
}
9595

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -84,7 +84,6 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu
8484
} else {
8585
ACFL_DEBUG("event", "TEvTaskProcessedResult");
8686
auto t = static_pointer_cast<IApplyAction>(result.GetResult());
87-
Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(result.GetResult()));
8887
if (!ScanIterator->Finished()) {
8988
ScanIterator->Apply(t);
9089
}

ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,16 +7,17 @@ namespace NKikimr::NOlap::NReader {
77
void IDataTasksProcessor::ITask::DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) {
88
auto result = DoExecuteImpl();
99
if (result.IsFail()) {
10-
NActors::TActivationContext::AsActorContext().Send(OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(result));
11-
} else {
1210
NActors::TActivationContext::AsActorContext().Send(
13-
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast<IDataTasksProcessor::ITask>(taskPtr)));
11+
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(result, std::move(Guard)));
12+
} else {
13+
NActors::TActivationContext::AsActorContext().Send(OwnerId,
14+
new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast<IDataTasksProcessor::ITask>(taskPtr), std::move(Guard)));
1415
}
1516
}
1617

1718
void IDataTasksProcessor::ITask::DoOnCannotExecute(const TString& reason) {
1819
NActors::TActivationContext::AsActorContext().Send(
19-
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(TConclusionStatus::Fail(reason)));
20+
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(TConclusionStatus::Fail(reason), std::move(Guard)));
2021
}
2122

2223
}

ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <ydb/core/tx/columnshard/counters/scan.h>
34
#include <ydb/core/tx/conveyor/usage/abstract.h>
45

56
#include <ydb/library/accessor/accessor.h>
@@ -17,6 +18,7 @@ class IApplyAction {
1718
bool Apply(IDataReader& indexedDataRead) const {
1819
return DoApply(indexedDataRead);
1920
}
21+
virtual ~IApplyAction() = default;
2022
};
2123

2224
class IDataTasksProcessor {
@@ -25,6 +27,7 @@ class IDataTasksProcessor {
2527
private:
2628
using TBase = NConveyor::ITask;
2729
const NActors::TActorId OwnerId;
30+
NColumnShard::TCounterGuard Guard;
2831
virtual TConclusionStatus DoExecuteImpl() = 0;
2932

3033
protected:
@@ -35,10 +38,9 @@ class IDataTasksProcessor {
3538
using TPtr = std::shared_ptr<ITask>;
3639
virtual ~ITask() = default;
3740

38-
ITask(const NActors::TActorId& ownerId)
41+
ITask(const NActors::TActorId& ownerId, NColumnShard::TCounterGuard&& scanCounter)
3942
: OwnerId(ownerId)
40-
{
41-
43+
, Guard(std::move(scanCounter)) {
4244
}
4345
};
4446
};

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.cpp

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,12 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
1919
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())(
2020
"scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus())(
2121
"storage_id", storageId);
22-
NActors::TActorContext::AsActorContext().Send(
23-
Context->GetCommonContext()->GetScanActorId(), std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
24-
TConclusionStatus::Fail(TStringBuilder{} << "Error reading blob range for data: " << range.ToString() << ", error: " << status.GetErrorMessage() << ", status: " << NKikimrProto::EReplyStatus_Name(status.GetStatus()))));
22+
NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
23+
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
24+
TConclusionStatus::Fail(TStringBuilder{} << "Error reading blob range for data: " << range.ToString()
25+
<< ", error: " << status.GetErrorMessage()
26+
<< ", status: " << NKikimrProto::EReplyStatus_Name(status.GetStatus())),
27+
std::move(Guard)));
2528
return false;
2629
}
2730

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -74,15 +74,18 @@ class TColumnsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnS
7474
THashMap<ui32, std::shared_ptr<IKernelFetchLogic>> DataFetchers;
7575
TFetchingScriptCursor Cursor;
7676
NBlobOperations::NRead::TCompositeReadBlobs ProvidedBlobs;
77-
const NColumnShard::TCounterGuard Guard;
77+
NColumnShard::TCounterGuard Guard;
7878
virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
7979
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override {
8080
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())(
8181
"scan_actor_id", Source->GetContext()->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())(
8282
"status_code", status.GetStatus())("storage_id", storageId);
8383
NActors::TActorContext::AsActorContext().Send(Source->GetContext()->GetCommonContext()->GetScanActorId(),
8484
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
85-
TConclusionStatus::Fail(TStringBuilder{} << "Error reading blob range for columns: " << range.ToString() << ", error: " << status.GetErrorMessage() << ", status: " << NKikimrProto::EReplyStatus_Name(status.GetStatus()))));
85+
TConclusionStatus::Fail(TStringBuilder{} << "Error reading blob range for columns: " << range.ToString()
86+
<< ", error: " << status.GetErrorMessage()
87+
<< ", status: " << NKikimrProto::EReplyStatus_Name(status.GetStatus())),
88+
std::move(Guard)));
8689
return false;
8790
}
8891

@@ -105,7 +108,7 @@ class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnSha
105108
const std::shared_ptr<IDataSource> Source;
106109
TFetchingScriptCursor Step;
107110
const std::shared_ptr<TSpecialReadContext> Context;
108-
const NColumnShard::TCounterGuard Guard;
111+
NColumnShard::TCounterGuard Guard;
109112

110113
virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
111114
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.cpp

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -74,12 +74,11 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(const std:
7474

7575
void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
7676
auto sourcePtr = Source.lock();
77+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocation_impossible")("error", errorMessage);
7778
if (sourcePtr) {
7879
FOR_DEBUG_LOG(NKikimrServices::COLUMNSHARD_SCAN_EVLOG, sourcePtr->AddEvent("fail_malloc"));
7980
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
8081
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
81-
} else {
82-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "allocation_impossible")("error", errorMessage);
8382
}
8483
}
8584

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -38,10 +38,9 @@ TConclusionStatus TStepAction::DoExecuteImpl() {
3838

3939
TStepAction::TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId,
4040
const bool changeSyncSection)
41-
: TBase(ownerActorId)
41+
: TBase(ownerActorId, source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard())
4242
, Source(source)
43-
, Cursor(std::move(cursor))
44-
, CountersGuard(Source->GetContext()->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) {
43+
, Cursor(std::move(cursor)) {
4544
if (changeSyncSection) {
4645
Source->StartAsyncSection();
4746
} else {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -320,7 +320,6 @@ class TStepAction: public IDataTasksProcessor::ITask {
320320
std::shared_ptr<IDataSource> Source;
321321
TFetchingScriptCursor Cursor;
322322
bool FinishedFlag = false;
323-
const NColumnShard::TCounterGuard CountersGuard;
324323

325324
protected:
326325
virtual bool DoApply(IDataReader& owner) const override;

0 commit comments

Comments
 (0)