Skip to content

Commit 094be61

Browse files
fetcher accessors signals (#13038)
1 parent 24c8b75 commit 094be61

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

ydb/core/tx/columnshard/data_accessor/manager.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ void TLocalManager::DrainQueue() {
5757
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
5858
AFL_VERIFY(it != RequestsByPortion.end());
5959
for (auto&& i : it->second) {
60+
Counters.ResultFromCache->Add(1);
6061
if (!i->IsFetched() && !i->IsAborted()) {
6162
i->AddAccessor(accessor);
6263
}
@@ -66,11 +67,14 @@ void TLocalManager::DrainQueue() {
6667
--countToFlight;
6768
}
6869
if (dataAnalyzed.GetPortionsToAsk().size()) {
70+
Counters.ResultAskDirectly->Add(dataAnalyzed.GetPortionsToAsk().size());
6971
it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE");
7072
}
7173
}
7274
}
7375
PortionsAskInFlight += countToFlight;
76+
Counters.FetchingCount->Set(PortionsAskInFlight);
77+
Counters.QueueSize->Set(PortionsAsk.size());
7478
}
7579

7680
void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
@@ -82,8 +86,10 @@ void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& requ
8286
if (itRequest == RequestsByPortion.end()) {
8387
AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector<std::shared_ptr<TDataAccessorsRequest>>({request})).second);
8488
PortionsAsk.emplace_back(i, request->GetAbortionFlag());
89+
Counters.AskNew->Add(1);
8590
} else {
8691
itRequest->second.emplace_back(request);
92+
Counters.AskDuplication->Add(1);
8793
}
8894
}
8995
}

ydb/core/tx/columnshard/data_accessor/manager.h

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,29 @@
88

99
namespace NKikimr::NOlap::NDataAccessorControl {
1010

11+
class TAccessorSignals: public NColumnShard::TCommonCountersOwner {
12+
private:
13+
using TBase = NColumnShard::TCommonCountersOwner;
14+
15+
public:
16+
const NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
17+
const NMonitoring::TDynamicCounters::TCounterPtr FetchingCount;
18+
const NMonitoring::TDynamicCounters::TCounterPtr AskNew;
19+
const NMonitoring::TDynamicCounters::TCounterPtr AskDuplication;
20+
const NMonitoring::TDynamicCounters::TCounterPtr ResultFromCache;
21+
const NMonitoring::TDynamicCounters::TCounterPtr ResultAskDirectly;
22+
23+
TAccessorSignals()
24+
: TBase("AccessorsFetching")
25+
, QueueSize(TBase::GetValue("Queue/Count"))
26+
, FetchingCount(TBase::GetValue("Fetching/Count"))
27+
, AskNew(TBase::GetDeriviative("Ask/Fault/Count"))
28+
, AskDuplication(TBase::GetDeriviative("Ask/Duplication/Count"))
29+
, ResultFromCache(TBase::GetDeriviative("ResultFromCache/Count"))
30+
, ResultAskDirectly(TBase::GetDeriviative("ResultAskDirectly/Count")) {
31+
}
32+
};
33+
1134
class IDataAccessorsManager {
1235
private:
1336
virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) = 0;
@@ -80,10 +103,8 @@ class TActorAccessorsManager: public IDataAccessorsManager {
80103
public:
81104
TActorAccessorsManager(const NActors::TActorId& actorId, const NActors::TActorId& tabletActorId)
82105
: TBase(tabletActorId)
83-
, ActorId(actorId)
84-
, AccessorsCallback(std::make_shared<TActorAccessorsCallback>(ActorId))
85-
{
86-
106+
, ActorId(actorId)
107+
, AccessorsCallback(std::make_shared<TActorAccessorsCallback>(ActorId)) {
87108
AFL_VERIFY(!!tabletActorId);
88109
}
89110
};
@@ -93,6 +114,7 @@ class TLocalManager: public IDataAccessorsManager {
93114
using TBase = IDataAccessorsManager;
94115
THashMap<ui64, std::unique_ptr<IGranuleDataAccessor>> Managers;
95116
THashMap<ui64, std::vector<std::shared_ptr<TDataAccessorsRequest>>> RequestsByPortion;
117+
TAccessorSignals Counters;
96118
const std::shared_ptr<IAccessorCallback> AccessorCallback;
97119

98120
class TPortionToAsk {
@@ -157,8 +179,7 @@ class TLocalManager: public IDataAccessorsManager {
157179

158180
TLocalManager(const std::shared_ptr<IAccessorCallback>& callback)
159181
: TBase(NActors::TActorId())
160-
, AccessorCallback(callback)
161-
{
182+
, AccessorCallback(callback) {
162183
}
163184
};
164185

0 commit comments

Comments
 (0)