Skip to content

Commit 49a8f16

Browse files
speed up merging with correct pointers operation (#7230)
1 parent 7120c84 commit 49a8f16

File tree

6 files changed

+30
-23
lines changed

6 files changed

+30
-23
lines changed

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
namespace NKikimr::NArrow::NMerger {
66

7-
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
7+
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy) {
88
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
99
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
1010
Y_ABORT_UNLESS(++ControlPoints == 1);
1111

12-
SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
12+
SortHeap.Push(TBatchIterator(point.BuildRWPosition(false, deepCopy)));
1313
}
1414

1515
void TMergePartialStream::RemoveControlPoint() {
@@ -65,7 +65,7 @@ bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, cons
6565
}
6666

6767
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
68-
PutControlPoint(readTo);
68+
PutControlPoint(readTo, false);
6969
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
7070
}
7171

@@ -191,6 +191,9 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
191191
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
192192
for (auto&& i : positions) {
193193
TRecordBatchBuilder indexesBuilder(resultFields);
194+
if (SortHeap.Empty() || i.GetPosition().Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::less) {
195+
continue;
196+
}
194197
DrainCurrentTo(indexesBuilder, i.GetPosition(), i.IsIncludedToLeftInterval());
195198
result.emplace_back(indexesBuilder.Finalize());
196199
if (result.back()->num_rows() == 0) {

ydb/core/formats/arrow/reader/merger.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class TMergePartialStream {
3737
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
3838

3939
void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
40+
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
41+
std::optional<TCursor>* lastResultPosition = nullptr);
42+
4043
public:
4144
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
4245
: SortSchema(sortSchema)
@@ -49,6 +52,7 @@ class TMergePartialStream {
4952
Y_ABORT_UNLESS(!DataSchema || DataSchema->num_fields());
5053
}
5154

55+
void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy);
5256
void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include);
5357

5458
void SetPossibleSameVersion(const bool value) {
@@ -67,8 +71,6 @@ class TMergePartialStream {
6771
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
6872
}
6973

70-
void PutControlPoint(const TSortableBatchPosition& point);
71-
7274
void RemoveControlPoint();
7375

7476
bool ControlPointEnriched() const {
@@ -92,7 +94,6 @@ class TMergePartialStream {
9294

9395
void DrainAll(TRecordBatchBuilder& builder);
9496
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
95-
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
9697
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
9798
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
9899
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);

ydb/core/formats/arrow/reader/position.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,10 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
8080
return FindPosition(position, posStart, posFinish, forFound, greater);
8181
}
8282

83-
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition() const {
84-
return TRWSortableBatchPosition(
85-
Position, RecordsCount, ReverseSort, Sorting->BuildCopy(Position), Data ? Data->BuildCopy(Position) : nullptr);
83+
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
84+
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort,
85+
deepCopy ? Sorting->BuildCopy(Position) : Sorting,
86+
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
8687
}
8788

8889
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(

ydb/core/formats/arrow/reader/position.h

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class TSortableScanData {
9898
}
9999

100100
std::shared_ptr<TSortableScanData> BuildCopy(const ui64 position) const {
101-
return std::make_shared<TSortableScanData>(position, RecordsCount, Columns, Fields);
101+
return std::make_shared<TSortableScanData>(*this);
102102
}
103103

104104
TCursor BuildCursor(const ui64 position) const {
@@ -209,6 +209,17 @@ class TSortableBatchPosition {
209209
bool ReverseSort = false;
210210
std::shared_ptr<TSortableScanData> Sorting;
211211
std::shared_ptr<TSortableScanData> Data;
212+
213+
TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting,
214+
const std::shared_ptr<TSortableScanData>& data)
215+
: Position(position)
216+
, RecordsCount(recordsCount)
217+
, ReverseSort(reverseSort)
218+
, Sorting(sorting)
219+
, Data(data) {
220+
AFL_VERIFY(IsAvailablePosition(Position));
221+
}
222+
212223
public:
213224
TSortableBatchPosition() = default;
214225

@@ -220,7 +231,7 @@ class TSortableBatchPosition {
220231
return RecordsCount;
221232
}
222233

223-
std::shared_ptr<TSortableScanData> GetSorting() const {
234+
const std::shared_ptr<TSortableScanData>& GetSorting() const {
224235
return Sorting;
225236
}
226237

@@ -239,15 +250,6 @@ class TSortableBatchPosition {
239250
return Sorting->GetFields();
240251
}
241252

242-
TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr<TSortableScanData>& sorting, const std::shared_ptr<TSortableScanData>& data)
243-
: Position(position)
244-
, RecordsCount(recordsCount)
245-
, ReverseSort(reverseSort)
246-
, Sorting(sorting)
247-
, Data(data) {
248-
AFL_VERIFY(IsAvailablePosition(Position));
249-
}
250-
251253
TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete;
252254
TSortableBatchPosition(TRWSortableBatchPosition& source) = delete;
253255
TSortableBatchPosition(TRWSortableBatchPosition&& source) = delete;
@@ -256,7 +258,7 @@ class TSortableBatchPosition {
256258
TSortableBatchPosition operator= (TRWSortableBatchPosition& source) = delete;
257259
TSortableBatchPosition operator= (TRWSortableBatchPosition&& source) = delete;
258260

259-
TRWSortableBatchPosition BuildRWPosition() const;
261+
TRWSortableBatchPosition BuildRWPosition(const bool needData, const bool deepCopy) const;
260262

261263
std::shared_ptr<arrow::Table> SliceData(const ui64 offset, const ui64 count) const {
262264
AFL_VERIFY(Data);

ydb/core/formats/arrow/reader/result_builder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ std::shared_ptr<arrow::RecordBatch> TRecordBatchBuilder::Finalize() {
6464
for (auto&& i : Builders) {
6565
columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish()));
6666
}
67-
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), columns);
67+
auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), std::move(columns));
6868
#ifndef NDEBUG
6969
NArrow::TStatusValidator::Validate(result->ValidateFull());
7070
#endif

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
103103
return TConclusionStatus::Success();
104104
}
105105
}
106-
Merger->PutControlPoint(MergingContext->GetFinish());
106+
Merger->PutControlPoint(MergingContext->GetFinish(), false);
107107
Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
108108
const ui32 originalSourcesCount = Sources.size();
109109
Sources.clear();

0 commit comments

Comments
 (0)