Skip to content

Commit ac47d76

Browse files
accessors fetching control (#12859)
1 parent c6318e6 commit ac47d76

File tree

21 files changed

+254
-95
lines changed

21 files changed

+254
-95
lines changed

ydb/core/tx/columnshard/columnshard__statistics.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class TColumnPortionsAccumulator {
133133
const std::shared_ptr<TResultAccumulator> Result;
134134
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135135
const std::set<ui32> ColumnTagsRequested;
136+
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
137+
return Default<std::shared_ptr<const TAtomicCounter>>();
138+
}
136139

137140
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override {
138141
THashMap<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
619619
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
620620
private:
621621
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
622+
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
623+
return Default<std::shared_ptr<const TAtomicCounter>>();
624+
}
622625

623626
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
624627
AFL_VERIFY(ResourcesGuard);
@@ -1422,13 +1425,13 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14221425

14231426
public:
14241427
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
1425-
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
1428+
std::vector<NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
14261429
: TBase(self)
14271430
, FetchCallback(fetchCallback)
14281431
, Consumer(consumer)
14291432
{
14301433
for (auto&& i : portions) {
1431-
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
1434+
PortionsByPath[i->GetPathId()].emplace_back(i);
14321435
}
14331436
}
14341437

ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,15 @@
55

66
namespace NKikimr::NOlap::NDataAccessorControl {
77

8-
THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
8+
void IGranuleDataAccessor::AskData(
99
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
1010
AFL_VERIFY(portions.size());
11-
return DoAskData(portions, callback, consumer);
11+
DoAskData(portions, callback, consumer);
12+
}
13+
14+
TDataCategorized IGranuleDataAccessor::AnalyzeData(
15+
const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
16+
return DoAnalyzeData(portions, consumer);
1217
}
1318

1419
void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {

ydb/core/tx/columnshard/data_accessor/abstract/collector.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,27 @@ class TActorAccessorsCallback: public IAccessorCallback {
2020
}
2121
};
2222

23+
class TDataCategorized {
24+
private:
25+
YDB_READONLY_DEF(std::vector<TPortionInfo::TConstPtr>, PortionsToAsk);
26+
YDB_READONLY_DEF(std::vector<TPortionDataAccessor>, CachedAccessors);
27+
28+
public:
29+
void AddToAsk(const TPortionInfo::TConstPtr& p) {
30+
PortionsToAsk.emplace_back(p);
31+
}
32+
void AddFromCache(const TPortionDataAccessor& accessor) {
33+
CachedAccessors.emplace_back(accessor);
34+
}
35+
};
36+
2337
class IGranuleDataAccessor {
2438
private:
2539
const ui64 PathId;
2640

27-
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
41+
virtual void DoAskData(
2842
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
43+
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) = 0;
2944
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;
3045

3146
public:
@@ -39,8 +54,9 @@ class IGranuleDataAccessor {
3954
: PathId(pathId) {
4055
}
4156

42-
THashMap<ui64, TPortionDataAccessor> AskData(
57+
void AskData(
4358
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer);
59+
TDataCategorized AnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer);
4460
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
4561
return DoModifyPortions(add, remove);
4662
}

ydb/core/tx/columnshard/data_accessor/events.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,12 @@ class TEvUnregisterController
7777

7878
class TEvAskTabletDataAccessors: public NActors::TEventLocal<TEvAskTabletDataAccessors, NColumnShard::TEvPrivate::EEv::EvAskTabletDataAccessors> {
7979
private:
80-
using TPortions = THashMap<ui64, TPortionInfo::TConstPtr>;
81-
YDB_ACCESSOR_DEF(TPortions, Portions);
80+
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TConstPtr>, Portions);
8281
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IAccessorCallback>, Callback);
8382
YDB_READONLY_DEF(TString, Consumer);
8483

8584
public:
86-
explicit TEvAskTabletDataAccessors(const THashMap<ui64, TPortionInfo::TConstPtr>& portions,
85+
explicit TEvAskTabletDataAccessors(const std::vector<TPortionInfo::TConstPtr>& portions,
8786
const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback, const TString& consumer)
8887
: Portions(portions)
8988
, Callback(callback)

ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@
22

33
namespace NKikimr::NOlap::NDataAccessorControl::NInMem {
44

5-
THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
5+
void TCollector::DoAskData(
66
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/, const TString& /*consumer*/) {
7-
THashMap<ui64, TPortionDataAccessor> accessors;
7+
AFL_VERIFY(portions.empty());
8+
}
9+
10+
TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& /*consumer*/) {
11+
TDataCategorized result;
812
for (auto&& i : portions) {
913
auto it = Accessors.find(i->GetPortionId());
1014
AFL_VERIFY(it != Accessors.end());
11-
accessors.emplace(i->GetPortionId(), it->second);
15+
result.AddFromCache(it->second);
1216
}
13-
return accessors;
17+
return result;
1418
}
1519

1620
void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
@@ -22,4 +26,4 @@ void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
2226
}
2327
}
2428

25-
}
29+
} // namespace NKikimr::NOlap::NDataAccessorControl::NInMem

ydb/core/tx/columnshard/data_accessor/in_mem/collector.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ class TCollector: public IGranuleDataAccessor {
66
private:
77
using TBase = IGranuleDataAccessor;
88
THashMap<ui64, TPortionDataAccessor> Accessors;
9-
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
10-
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
9+
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback,
10+
const TString& consumer) override;
11+
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
1112
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
1213
const std::vector<ui64>& remove) override;
1314

ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,25 @@
33
#include <ydb/core/tx/columnshard/data_accessor/events.h>
44
namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB {
55

6-
THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
6+
void TCollector::DoAskData(
77
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
8-
THashMap<ui64, TPortionDataAccessor> accessors;
9-
THashMap<ui64, TPortionInfo::TConstPtr> portionsToDirectAsk;
8+
if (portions.size()) {
9+
NActors::TActivationContext::Send(
10+
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portions, callback, consumer));
11+
}
12+
}
13+
14+
TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
15+
TDataCategorized result;
1016
for (auto&& p : portions) {
1117
auto it = AccessorsCache.Find(p->GetPortionId());
1218
if (it != AccessorsCache.End() && it.Key() == p->GetPortionId()) {
13-
accessors.emplace(p->GetPortionId(), it.Value());
19+
result.AddFromCache(it.Value());
1420
} else {
15-
portionsToDirectAsk.emplace(p->GetPortionId(), p);
21+
result.AddToAsk(p);
1622
}
1723
}
18-
if (portionsToDirectAsk.size()) {
19-
NActors::TActivationContext::Send(
20-
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback, consumer));
21-
}
22-
return accessors;
24+
return result;
2325
}
2426

2527
void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {

ydb/core/tx/columnshard/data_accessor/local_db/collector.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ class TCollector: public IGranuleDataAccessor {
1717

1818
TLRUCache<ui64, TPortionDataAccessor, TNoopDelete, TMetadataSizeProvider> AccessorsCache;
1919
using TBase = IGranuleDataAccessor;
20-
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
20+
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
2121
const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
22+
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
2223
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) override;
2324

2425
public:

ydb/core/tx/columnshard/data_accessor/manager.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,112 @@
22

33
namespace NKikimr::NOlap::NDataAccessorControl {
44

5+
void TLocalManager::DrainQueue() {
6+
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
7+
std::optional<ui64> lastPathId;
8+
IGranuleDataAccessor* lastDataAccessor = nullptr;
9+
ui32 countToFlight = 0;
10+
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
11+
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
12+
if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
13+
PortionsAsk.pop_front();
14+
continue;
15+
}
16+
auto p = PortionsAsk.front().ExtractPortion();
17+
PortionsAsk.pop_front();
18+
if (!lastPathId || *lastPathId != p->GetPathId()) {
19+
lastPathId = p->GetPathId();
20+
auto it = Managers.find(p->GetPathId());
21+
if (it == Managers.end()) {
22+
lastDataAccessor = nullptr;
23+
} else {
24+
lastDataAccessor = it->second.get();
25+
}
26+
}
27+
if (!lastDataAccessor) {
28+
auto it = RequestsByPortion.find(p->GetPortionId());
29+
AFL_VERIFY(it != RequestsByPortion.end());
30+
for (auto&& i : it->second) {
31+
if (!i->IsFetched()) {
32+
i->AddError(p->GetPathId(), "path id absent");
33+
}
34+
}
35+
RequestsByPortion.erase(it);
36+
} else {
37+
portionsToAsk[p->GetPathId()].emplace_back(p);
38+
++countToFlight;
39+
}
40+
}
41+
for (auto&& i : portionsToAsk) {
42+
auto it = Managers.find(i.first);
43+
AFL_VERIFY(it != Managers.end());
44+
auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE");
45+
for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) {
46+
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
47+
AFL_VERIFY(it != RequestsByPortion.end());
48+
for (auto&& i : it->second) {
49+
if (!i->IsFetched()) {
50+
i->AddAccessor(accessor);
51+
}
52+
}
53+
RequestsByPortion.erase(it);
54+
AFL_VERIFY(countToFlight);
55+
--countToFlight;
56+
}
57+
if (dataAnalyzed.GetPortionsToAsk().size()) {
58+
it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE");
59+
}
60+
}
61+
}
62+
PortionsAskInFlight += countToFlight;
63+
}
64+
65+
void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
66+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
67+
for (auto&& pathId : request->GetPathIds()) {
68+
auto portions = request->StartFetching(pathId);
69+
for (auto&& [_, i] : portions) {
70+
auto itRequest = RequestsByPortion.find(i->GetPortionId());
71+
if (itRequest == RequestsByPortion.end()) {
72+
AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector<std::shared_ptr<TDataAccessorsRequest>>({request})).second);
73+
PortionsAsk.emplace_back(i, request->GetAbortionFlag());
74+
} else {
75+
itRequest->second.emplace_back(request);
76+
}
77+
}
78+
}
79+
DrainQueue();
80+
}
81+
82+
void TLocalManager::DoRegisterController(std::unique_ptr<IGranuleDataAccessor>&& controller, const bool update) {
83+
if (update) {
84+
auto it = Managers.find(controller->GetPathId());
85+
if (it != Managers.end()) {
86+
it->second = std::move(controller);
87+
}
88+
} else {
89+
AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second);
90+
}
91+
}
92+
93+
void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
94+
{
95+
auto it = Managers.find(accessor.GetPortionInfo().GetPathId());
96+
AFL_VERIFY(it != Managers.end());
97+
it->second->ModifyPortions({ accessor }, {});
98+
}
99+
{
100+
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
101+
if (it != RequestsByPortion.end()) {
102+
for (auto&& i : it->second) {
103+
i->AddAccessor(accessor);
104+
}
105+
AFL_VERIFY(PortionsAskInFlight);
106+
--PortionsAskInFlight;
107+
}
108+
RequestsByPortion.erase(it);
109+
}
110+
DrainQueue();
111+
}
112+
5113
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)