1
1
#pragma once
2
- #include " position.h"
3
- #include " heap.h"
4
- #include " result_builder.h"
5
2
#include " batch_iterator.h"
3
+ #include " heap.h"
4
+ #include " position.h"
6
5
7
6
#include < ydb/core/formats/arrow/arrow_filter.h>
8
7
9
8
namespace NKikimr ::NArrow::NMerger {
10
9
10
+ template <typename T>
11
+ concept MergeResultBuilder = requires (const T& constT, T& mutT, const std::shared_ptr<arrow::Schema>& schema, const TBatchIterator& cursor) {
12
+ { constT.IsBufferExhausted () } -> std::same_as<bool >;
13
+ { constT.ValidateDataSchema (schema) } -> std::same_as<void >;
14
+ { mutT.AddRecord (cursor) } -> std::same_as<void >;
15
+ { mutT.SkipRecord (cursor) } -> std::same_as<void >;
16
+ };
17
+
11
18
class TMergePartialStream {
12
19
private:
13
20
#ifndef NDEBUG
@@ -19,6 +26,7 @@ class TMergePartialStream {
19
26
std::shared_ptr<arrow::Schema> DataSchema;
20
27
const bool Reverse;
21
28
const std::vector<std::string> VersionColumnNames;
29
+ std::optional<TCursor> MaxVersion;
22
30
ui32 ControlPoints = 0 ;
23
31
24
32
TSortingHeap<TBatchIterator> SortHeap;
@@ -34,19 +42,92 @@ class TMergePartialStream {
34
42
return result;
35
43
}
36
44
37
- void DrainCurrentPosition (TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);
45
+ template <MergeResultBuilder TBuilder>
46
+ [[nodiscard]] bool DrainCurrentPosition (TBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
47
+ Y_ABORT_UNLESS (SortHeap.Size ());
48
+ Y_ABORT_UNLESS (!SortHeap.Current ().IsControlPoint ());
49
+ CheckSequenceInDebug (SortHeap.Current ().GetKeyColumns ());
50
+
51
+ const ui64 startPosition = SortHeap.Current ().GetKeyColumns ().GetPosition ();
52
+ const TSortableScanData* startSorting = SortHeap.Current ().GetKeyColumns ().GetSorting ().get ();
53
+ const TSortableScanData* startVersion = SortHeap.Current ().GetVersionColumns ().GetSorting ().get ();
54
+
55
+ if (MaxVersion) {
56
+ bool skippedPk = false ;
57
+ while (SortHeap.Size () && SortHeap.Current ().GetVersionColumns ().Compare (*MaxVersion) == std::partial_ordering::greater && !skippedPk) {
58
+ if (builder) {
59
+ builder->SkipRecord (SortHeap.Current ());
60
+ }
61
+ SortHeap.Next ();
62
+ if (SortHeap.Empty () ||
63
+ SortHeap.Current ().GetKeyColumns ().Compare (*startSorting, startPosition) != std::partial_ordering::equivalent) {
64
+ skippedPk = true ;
65
+ }
66
+ }
67
+ if (skippedPk) {
68
+ SortHeap.CleanFinished ();
69
+ return false ;
70
+ }
71
+ }
72
+
73
+ bool foundResult = false ;
74
+ if (!SortHeap.Current ().IsDeleted ()) {
75
+ foundResult = true ;
76
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_add", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
77
+ if (builder) {
78
+ builder->AddRecord (SortHeap.Current ());
79
+ }
80
+ if (resultScanData && resultPosition) {
81
+ *resultScanData = SortHeap.Current ().GetKeyColumns ().GetSorting ();
82
+ *resultPosition = SortHeap.Current ().GetKeyColumns ().GetPosition ();
83
+ }
84
+ } else {
85
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
86
+ if (builder) {
87
+ builder->SkipRecord (SortHeap.Current ());
88
+ }
89
+ }
90
+ SortHeap.Next ();
91
+
92
+ while (
93
+ SortHeap.Size () && (SortHeap.Current ().GetKeyColumns ().Compare (*startSorting, startPosition) == std::partial_ordering::equivalent)) {
94
+ if (builder) {
95
+ builder->SkipRecord (SortHeap.Current ());
96
+ }
97
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("key_skip1", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
98
+ auto & anotherIterator = SortHeap.Current ();
99
+ if (PossibleSameVersionFlag) {
100
+ AFL_VERIFY (anotherIterator.GetVersionColumns ().Compare (*startVersion, startPosition) != std::partial_ordering::greater)
101
+ (" r" , startVersion->BuildCursor (startPosition).DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())(
102
+ " key" , startSorting->BuildCursor (startPosition).DebugJson ());
103
+ } else {
104
+ AFL_VERIFY (anotherIterator.GetVersionColumns ().Compare (*startVersion, startPosition) == std::partial_ordering::less)
105
+ (" r" , startVersion->BuildCursor (startPosition).DebugJson ())(" a" , anotherIterator.GetVersionColumns ().DebugJson ())(
106
+ " key" , startSorting->BuildCursor (startPosition).DebugJson ());
107
+ }
108
+ SortHeap.Next ();
109
+ }
110
+ SortHeap.CleanFinished ();
111
+ return foundResult;
112
+ }
38
113
39
114
void CheckSequenceInDebug (const TRWSortableBatchPosition& nextKeyColumnsPosition);
40
- bool DrainCurrentTo (TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
41
- std::optional<TCursor>* lastResultPosition = nullptr );
115
+
116
+ template <MergeResultBuilder TBuilder>
117
+ bool DrainCurrentTo (TBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish,
118
+ std::optional<TCursor>* lastResultPosition = nullptr ) {
119
+ PutControlPoint (readTo, false );
120
+ return DrainToControlPoint (builder, includeFinish, lastResultPosition);
121
+ }
42
122
43
123
public:
44
- TMergePartialStream (std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
124
+ TMergePartialStream (std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse,
125
+ const std::vector<std::string>& versionColumnNames, const std::optional<TCursor>& maxVersion)
45
126
: SortSchema(sortSchema)
46
127
, DataSchema(dataSchema)
47
128
, Reverse(reverse)
48
129
, VersionColumnNames(versionColumnNames)
49
- {
130
+ , MaxVersion(maxVersion) {
50
131
Y_ABORT_UNLESS (SortSchema);
51
132
Y_ABORT_UNLESS (SortSchema->num_fields ());
52
133
Y_ABORT_UNLESS (!DataSchema || DataSchema->num_fields ());
@@ -78,25 +159,67 @@ class TMergePartialStream {
78
159
}
79
160
80
161
template <class TDataContainer >
81
- void AddSource (const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
162
+ void AddSource (const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter,
163
+ const std::optional<ui64> sourceIdExt = std::nullopt) {
164
+ const ui64 sourceId = sourceIdExt.value_or (SortHeap.Size ());
82
165
if (!batch || !batch->num_rows ()) {
83
166
return ;
84
167
}
85
- // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
168
+ // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
86
169
const bool isDenyFilter = filter && filter->IsTotalDenyFilter ();
87
170
auto filterImpl = (!filter || filter->IsTotalAllowFilter ()) ? nullptr : filter;
88
- SortHeap.Push (TBatchIterator (batch, filterImpl, SortSchema->field_names (), (!isDenyFilter && DataSchema) ? DataSchema->field_names () : std::vector<std::string>(), Reverse, VersionColumnNames));
171
+ static const arrow::Schema emptySchema = arrow::Schema (arrow::FieldVector ());
172
+ TBatchIterator iterator (
173
+ batch, filterImpl, *SortSchema, (!isDenyFilter && DataSchema) ? *DataSchema : emptySchema, Reverse, VersionColumnNames, sourceId);
174
+ if (MaxVersion) {
175
+ MaxVersion->ValidateSchema (*iterator.GetVersionColumns ().GetSorting ());
176
+ }
177
+ SortHeap.Push (std::move (iterator));
89
178
}
90
179
91
180
bool IsEmpty () const {
92
181
return !SortHeap.Size ();
93
182
}
94
183
95
- void DrainAll (TRecordBatchBuilder& builder);
96
- std::shared_ptr<arrow::Table> SingleSourceDrain (const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr );
97
- bool DrainToControlPoint (TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr );
98
- std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts (const TIntervalPositions& positions,
99
- const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
184
+ template <MergeResultBuilder TBuilder>
185
+ void DrainAll (TBuilder& builder) {
186
+ builder.ValidateDataSchema (DataSchema);
187
+ while (SortHeap.Size ()) {
188
+ Y_UNUSED (DrainCurrentPosition (&builder, nullptr , nullptr ));
189
+ }
190
+ }
191
+ std::shared_ptr<arrow::Table> SingleSourceDrain (
192
+ const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr );
193
+ std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts (
194
+ const TIntervalPositions& positions, const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
195
+
196
+ template <MergeResultBuilder TBuilder>
197
+ bool DrainToControlPoint (TBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr ) {
198
+ AFL_VERIFY (ControlPoints == 1 );
199
+ builder.ValidateDataSchema (DataSchema);
200
+ bool cpReachedFlag = false ;
201
+ std::shared_ptr<TSortableScanData> resultScanData;
202
+ ui64 resultPosition;
203
+ while (SortHeap.Size () && !cpReachedFlag && !builder.IsBufferExhausted ()) {
204
+ if (SortHeap.Current ().IsControlPoint ()) {
205
+ auto keyColumns = SortHeap.Current ().GetKeyColumns ().BuildSortingCursor ();
206
+ RemoveControlPoint ();
207
+ cpReachedFlag = true ;
208
+ if (SortHeap.Empty () || !includeFinish ||
209
+ SortHeap.Current ().GetKeyColumns ().Compare (keyColumns) == std::partial_ordering::greater) {
210
+ if (lastResultPosition && resultScanData) {
211
+ *lastResultPosition = resultScanData->BuildCursor (resultPosition);
212
+ }
213
+ return true ;
214
+ }
215
+ }
216
+ Y_UNUSED (DrainCurrentPosition (&builder, &resultScanData, &resultPosition));
217
+ }
218
+ if (lastResultPosition && resultScanData) {
219
+ *lastResultPosition = resultScanData->BuildCursor (resultPosition);
220
+ }
221
+ return cpReachedFlag;
222
+ }
100
223
};
101
224
102
- }
225
+ } // namespace NKikimr::NArrow::NMerger
0 commit comments