Skip to content

Commit c6318e6

Browse files
blobs fetcher unification (#12858)
1 parent 1e8be6b commit c6318e6

File tree

10 files changed

+105
-96
lines changed

10 files changed

+105
-96
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#include "constructor.h"
2-
#include <ydb/core/tx/conveyor/usage/service.h>
2+
33
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
#include <ydb/core/tx/conveyor/usage/service.h>
45

5-
namespace NKikimr::NOlap::NReader::NSimple {
6+
namespace NKikimr::NOlap::NReader::NCommon {
67

78
void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
89
Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData()));
@@ -12,22 +13,23 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSu
1213
}
1314

1415
// Blob-read failure handler.
// Logs the failed range, the scan actor id, the status message/code and the storage id under
// TX_COLUMNSHARD_SCAN, then sends a TEvTaskProcessedResult carrying
// TConclusionStatus::Fail("cannot read blob range ...") to the scan actor. Always returns false.
// NOTE(review): this span is a rendered diff hunk. Lines under an "NN-" marker are the removed
// formatting and lines under an "NN+" marker are the replacement; both spell the same statements.
bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
15-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
16-
("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);
17-
NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
18-
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
16+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())(
17+
"scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus())(
18+
"storage_id", storageId);
19+
NActors::TActorContext::AsActorContext().Send(
20+
Context->GetCommonContext()->GetScanActorId(), std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
21+
TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
1922
return false;
2023
}
2124

2225
// Out-of-line constructor.
// Forwards readActions, taskCustomer and externalTaskId to the TBase constructor and stores
// sourcePtr, step and context in the Source, Step and Context members.
// Guard is initialised through the Context member rather than the `context` parameter, so it
// relies on Context being initialised before Guard.
// NOTE(review): rendered diff hunk. The "24-"/"25-" lines are the removed parameter wrapping and
// the "27+" line is its replacement; the parameter list itself is the same.
TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
2326
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step,
24-
const std::shared_ptr<NCommon::TSpecialReadContext>& context,
25-
const TString& taskCustomer, const TString& externalTaskId)
27+
const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
2628
: TBase(readActions, taskCustomer, externalTaskId)
2729
, Source(sourcePtr)
2830
, Step(step)
2931
, Context(context)
3032
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
3133
}
3234

33-
}
35+
} // namespace NKikimr::NOlap::NReader::NCommon
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,37 @@
11
#pragma once
2-
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
3-
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
4-
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
5-
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
6-
#include <ydb/core/tx/columnshard/blob.h>
2+
#include "fetching.h"
73
#include "source.h"
84

9-
namespace NKikimr::NOlap::NReader::NPlain {
5+
#include <ydb/core/tx/columnshard/blob.h>
6+
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
7+
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
8+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
9+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
10+
11+
namespace NKikimr::NOlap::NReader::NCommon {
1012

1113
// Blob-reading task built on NBlobOperations::NRead::ITask.
// Holds the data source, the fetching-script cursor, the read context and (added in this hunk)
// a counter guard. Overrides the DoOnDataReady / DoOnError hooks of the base task.
// NOTE(review): rendered diff hunk. "NN-" markers precede removed lines (including the old inline
// constructor body) and "NN+" markers precede added lines.
class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter<TBlobsFetcherTask> {
1214
private:
1315
using TBase = NBlobOperations::NRead::ITask;
14-
const std::shared_ptr<NCommon::IDataSource> Source;
16+
const std::shared_ptr<IDataSource> Source;
1517
TFetchingScriptCursor Step;
16-
const std::shared_ptr<NCommon::TSpecialReadContext> Context;
18+
const std::shared_ptr<TSpecialReadContext> Context;
19+
// Declared after Context: the out-of-line constructor initialises Guard through Context.
const NColumnShard::TCounterGuard Guard;
1720

1821
virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
1922
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
23+
2024
public:
21-
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<NCommon::IDataSource>& sourcePtr,
25+
// Convenience overload: accepts a shared_ptr to any TSource, converts it to
// shared_ptr<IDataSource> with std::static_pointer_cast and delegates to the non-template
// constructor declared below.
template <class TSource>
26+
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<TSource>& sourcePtr,
2227
const TFetchingScriptCursor& step, const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer,
2328
const TString& externalTaskId)
24-
: TBase(readActions, taskCustomer, externalTaskId)
25-
, Source(sourcePtr)
26-
, Step(step)
27-
, Context(context)
28-
{
29-
29+
: TBlobsFetcherTask(readActions, std::static_pointer_cast<IDataSource>(sourcePtr), step, context, taskCustomer, externalTaskId) {
3030
}
31+
32+
// Primary constructor; only declared here.
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
33+
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step,
34+
const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId);
3135
};
3236

33-
}
37+
} // namespace NKikimr::NOlap::NReader::NCommon

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "columns_set.h"
33

4+
#include <ydb/core/tx/columnshard/counters/common/owner.h>
45
#include <ydb/core/tx/columnshard/counters/scan.h>
56
#include <ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h>
67

@@ -17,11 +18,58 @@ class IDataSource;
1718
class TSpecialReadContext;
1819
class TFetchingScriptCursor;
1920

20-
class IFetchingStep {
21+
class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner {
22+
private:
23+
using TBase = NColumnShard::TCommonCountersOwner;
24+
NMonitoring::TDynamicCounters::TCounterPtr DurationCounter;
25+
NMonitoring::TDynamicCounters::TCounterPtr BytesCounter;
26+
27+
public:
28+
TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner)
29+
: TBase(std::move(owner))
30+
, DurationCounter(TBase::GetDeriviative("duration_ms"))
31+
, BytesCounter(TBase::GetDeriviative("bytes_ms")) {
32+
}
33+
34+
void AddDuration(const TDuration d) const {
35+
DurationCounter->Add(d.MilliSeconds());
36+
}
37+
38+
void AddBytes(const ui32 v) const {
39+
BytesCounter->Add(v);
40+
}
41+
};
42+
43+
class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner {
44+
private:
45+
using TBase = NColumnShard::TCommonCountersOwner;
46+
TMutex Mutex;
47+
THashMap<TString, TFetchingStepSignals> Collection;
48+
TFetchingStepSignals GetSignalsImpl(const TString& name) {
49+
TGuard<TMutex> g(Mutex);
50+
auto it = Collection.find(name);
51+
if (it == Collection.end()) {
52+
it = Collection.emplace(name, TFetchingStepSignals(CreateSubGroup("step_name", name))).first;
53+
}
54+
return it->second;
55+
}
56+
57+
public:
58+
TFetchingStepsSignalsCollection()
59+
: TBase("scan_steps") {
60+
}
61+
62+
static TFetchingStepSignals GetSignals(const TString& name) {
63+
return Singleton<TFetchingStepsSignalsCollection>()->GetSignalsImpl(name);
64+
}
65+
};
66+
67+
class IFetchingStep: public TNonCopyable {
2168
private:
2269
YDB_READONLY_DEF(TString, Name);
2370
YDB_READONLY(TDuration, SumDuration, TDuration::Zero());
2471
YDB_READONLY(ui64, SumSize, 0);
72+
TFetchingStepSignals Signals;
2573

2674
protected:
2775
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const = 0;
@@ -32,9 +80,11 @@ class IFetchingStep {
3280
public:
3381
// Accumulates `d` into this step's SumDuration and reports the same value to the step's
// monitoring signals (the "83+" line is the call added by this change).
void AddDuration(const TDuration d) {
3482
SumDuration += d;
83+
Signals.AddDuration(d);
3584
}
3685
// Accumulates `size` into this step's SumSize and forwards the same value to the step's
// monitoring signals via Signals.AddBytes (the "87+" line is the call added by this change).
void AddDataSize(const ui64 size) {
3786
SumSize += size;
87+
Signals.AddBytes(size);
3888
}
3989

4090
virtual ~IFetchingStep() = default;
@@ -48,7 +98,8 @@ class IFetchingStep {
4898
}
4999

50100
// Constructs a step with the given name. After this change the constructor also obtains the
// signals for that name from TFetchingStepsSignalsCollection::GetSignals.
// NOTE(review): rendered diff hunk. The "51-" line is the removed initializer list and the
// "101+"/"102+" lines are its replacement.
IFetchingStep(const TString& name)
51-
: Name(name) {
101+
: Name(name)
102+
, Signals(TFetchingStepsSignalsCollection::GetSignals(name)) {
52103
}
53104

54105
TString DebugString() const;
@@ -195,9 +246,7 @@ class TStepAction: public IDataTasksProcessor::ITask {
195246

196247
// Convenience overload: accepts a shared_ptr to any T, converts it to shared_ptr<IDataSource>
// with std::static_pointer_cast and delegates to the non-template constructor.
// NOTE(review): rendered diff hunk. The "198-".."200-" lines are the removed brace layout and the
// "249+" line is its replacement; the delegation expression is unchanged.
template <class T>
197248
TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId)
198-
: TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId)
199-
{
200-
249+
: TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId) {
201250
}
202251
TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId);
203252
};

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
LIBRARY()
22

33
SRCS(
4-
fetched_data.cpp
54
columns_set.cpp
6-
iterator.cpp
5+
constructor.cpp
76
context.cpp
8-
source.cpp
9-
fetching.cpp
107
fetch_steps.cpp
8+
fetched_data.cpp
9+
fetching.cpp
10+
iterator.cpp
11+
source.cpp
1112
)
1213

1314
PEERDIR(

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/constructor.cpp

Lines changed: 0 additions & 22 deletions
This file was deleted.

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#include "constructor.h"
21
#include "fetched_data.h"
32
#include "interval.h"
43
#include "plain_read_data.h"
@@ -7,6 +6,7 @@
76
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
87
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
98
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
9+
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h>
1010
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
1111
#include <ydb/core/tx/conveyor/usage/service.h>
1212
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
@@ -17,7 +17,7 @@ namespace NKikimr::NOlap::NReader::NPlain {
1717

1818
void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching) {
1919
AFL_VERIFY(fetching);
20-
// AFL_VERIFY(!FetchingPlan);
20+
// AFL_VERIFY(!FetchingPlan);
2121
FetchingPlan = fetching;
2222
}
2323

@@ -122,7 +122,8 @@ bool TPortionDataSource::DoStartFetchingColumns(
122122
return false;
123123
}
124124

125-
auto constructor = std::make_shared<TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
125+
auto constructor =
126+
std::make_shared<NCommon::TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
126127
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
127128
return true;
128129
}
@@ -157,7 +158,8 @@ bool TPortionDataSource::DoStartFetchingIndexes(
157158
return false;
158159
}
159160

160-
auto constructor = std::make_shared<TBlobsFetcherTask>(readingActions, std::static_pointer_cast<NCommon::IDataSource>(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), "");
161+
auto constructor =
162+
std::make_shared<NCommon::TBlobsFetcherTask>(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
161163
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
162164
return true;
163165
}
@@ -276,7 +278,7 @@ bool TCommittedDataSource::DoStartFetchingColumns(
276278
readAction->AddRange(CommittedBlob.GetBlobRange());
277279

278280
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = { readAction };
279-
auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
281+
auto constructor = std::make_shared<NCommon::TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
280282
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
281283
return true;
282284
}
@@ -290,7 +292,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
290292
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
291293
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
292294
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
293-
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
295+
auto rBatch = NArrow::DeserializeBatch(
296+
bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
294297
AFL_VERIFY(rBatch)("schema", schema.ToString());
295298
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
296299
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ LIBRARY()
22

33
SRCS(
44
scanner.cpp
5-
constructor.cpp
65
source.cpp
76
interval.cpp
87
fetched_data.cpp

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/constructor.h

Lines changed: 0 additions & 27 deletions
This file was deleted.

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
#include "constructor.h"
21
#include "fetched_data.h"
32
#include "plain_read_data.h"
43
#include "source.h"
54

65
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
76
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
87
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
8+
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h>
99
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
1010
#include <ydb/core/tx/conveyor/usage/service.h>
1111
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
@@ -29,7 +29,7 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
2929
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
3030
SetMemoryGroupId(SourceGroupGuard->GetGroupId());
3131
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
32-
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
32+
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
3333
TFetchingScriptCursor cursor(FetchingPlan, 0);
3434
auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId());
3535
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
@@ -123,7 +123,8 @@ bool TPortionDataSource::DoStartFetchingColumns(
123123
return false;
124124
}
125125

126-
auto constructor = std::make_shared<TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
126+
auto constructor =
127+
std::make_shared<NCommon::TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
127128
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
128129
return true;
129130
}
@@ -158,8 +159,8 @@ bool TPortionDataSource::DoStartFetchingIndexes(
158159
return false;
159160
}
160161

161-
auto constructor = std::make_shared<TBlobsFetcherTask>(
162-
readingActions, std::static_pointer_cast<NCommon::IDataSource>(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), "");
162+
auto constructor =
163+
std::make_shared<NCommon::TBlobsFetcherTask>(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
163164
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
164165
return true;
165166
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ LIBRARY()
22

33
SRCS(
44
scanner.cpp
5-
constructor.cpp
65
source.cpp
76
fetched_data.cpp
87
plain_read_data.cpp

0 commit comments

Comments
 (0)