Skip to content

Commit d8ee31d

Browse files
Correct schemas adaptation (#9425)
1 parent e6a5701 commit d8ee31d

File tree

21 files changed

+321
-133
lines changed

21 files changed

+321
-133
lines changed

ydb/core/formats/arrow/arrow_helpers.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,18 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
196196
}
197197
}
198198

199-
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
200-
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
199+
std::shared_ptr<arrow::RecordBatch> SortBatch(
200+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::shared_ptr<arrow::Array>>& sortingKey, const bool andUnique) {
201+
auto sortPermutation = MakeSortPermutation(sortingKey, andUnique);
202+
if (sortPermutation) {
203+
return Reorder(batch, sortPermutation, andUnique);
204+
} else {
205+
return batch;
206+
}
207+
}
208+
209+
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey,
210+
const bool andUnique) {
201211
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
202212
if (sortPermutation) {
203213
return Reorder(batch, sortPermutation, andUnique);

ydb/core/formats/arrow/arrow_helpers.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@ TString SerializeBatchNoCompression(const std::shared_ptr<arrow::RecordBatch>& b
2626
std::shared_ptr<arrow::RecordBatch> DeserializeBatch(const TString& blob,
2727
const std::shared_ptr<arrow::Schema>& schema);
2828

29-
std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
30-
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
29+
std::shared_ptr<arrow::RecordBatch> SortBatch(
30+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
31+
std::shared_ptr<arrow::RecordBatch> SortBatch(
32+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::shared_ptr<arrow::Array>>& sortingKey, const bool andUnique);
3133
bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch,
3234
const std::shared_ptr<arrow::Schema>& sortingKey,
3335
bool desc = false);

ydb/core/formats/arrow/permutations.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,25 @@
1515

1616
namespace NKikimr::NArrow {
1717

18-
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
19-
auto keyBatch = TColumnOperator().VerifyIfAbsent().Adapt(batch, sortingKey).DetachResult();
20-
auto keyColumns = std::make_shared<TArrayVec>(keyBatch->columns());
18+
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::vector<std::shared_ptr<arrow::Array>>& keyColumns, const bool andUnique) {
19+
std::optional<i64> count;
20+
for (auto&& i : keyColumns) {
21+
AFL_VERIFY(i);
22+
if (!count) {
23+
count = i->length();
24+
} else {
25+
AFL_VERIFY(*count == i->length());
26+
}
27+
}
28+
AFL_VERIFY(count);
2129
std::vector<TRawReplaceKey> points;
22-
points.reserve(keyBatch->num_rows());
23-
24-
for (int i = 0; i < keyBatch->num_rows(); ++i) {
25-
points.push_back(TRawReplaceKey(keyColumns.get(), i));
30+
points.reserve(*count);
31+
for (int i = 0; i < *count; ++i) {
32+
points.push_back(TRawReplaceKey(&keyColumns, i));
2633
}
2734

2835
bool haveNulls = false;
29-
for (auto& column : *keyColumns) {
36+
for (auto& column : keyColumns) {
3037
if (HasNulls(column)) {
3138
haveNulls = true;
3239
break;
@@ -36,11 +43,9 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
3643
if (haveNulls) {
3744
std::sort(points.begin(), points.end());
3845
} else {
39-
std::sort(points.begin(), points.end(),
40-
[](const TRawReplaceKey& a, const TRawReplaceKey& b) {
41-
return a.CompareNotNull(b) == std::partial_ordering::less;
42-
}
43-
);
46+
std::sort(points.begin(), points.end(), [](const TRawReplaceKey& a, const TRawReplaceKey& b) {
47+
return a.CompareNotNull(b) == std::partial_ordering::less;
48+
});
4449
}
4550

4651
arrow::UInt64Builder builder;
@@ -78,6 +83,12 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
7883
return out;
7984
}
8085

86+
/// Builds a sort permutation for `batch` using the columns named by `sortingKey`.
///
/// The key columns are obtained by adapting `batch` to `sortingKey` with the
/// verify-if-absent policy, then the work is delegated to the overload that
/// accepts the key columns directly.
///
/// @param batch      Source batch.
/// @param sortingKey Schema describing the key columns.
/// @param andUnique  Forwarded unchanged to the column-based overload.
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
    const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
    TColumnOperator keyExtractor;
    keyExtractor.VerifyIfAbsent();
    const auto keyBatch = keyExtractor.Adapt(batch, sortingKey).DetachResult();
    const auto& keyColumns = keyBatch->columns();
    return MakeSortPermutation(keyColumns, andUnique);
}
91+
8192
namespace {
8293

8394
template <class TDataContainer>

ydb/core/formats/arrow/permutations.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class THashConstructor {
1616

1717
};
1818

19-
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
19+
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(
20+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique);
2021

21-
}
22+
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::vector<std::shared_ptr<arrow::Array>>& columns, const bool andUnique);
23+
24+
} // namespace NKikimr::NArrow

ydb/core/formats/arrow/process_columns.cpp

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,22 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
9999
}
100100

101101
template <class TDataContainer, class TStringContainer>
102-
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy,
102+
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EAbsentFieldPolicy& policy,
103103
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringContainer>& columnNames) {
104104
AFL_VERIFY(incoming);
105105
AFL_VERIFY(columnNames.size());
106106
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
107107
switch (policy) {
108-
case TColumnOperator::EExtractProblemsPolicy::Verify:
108+
case TColumnOperator::EAbsentFieldPolicy::Verify:
109109
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
110110
"required", TColumnNameAccessor<TStringContainer>::DebugString(columnNames));
111111
break;
112-
case TColumnOperator::EExtractProblemsPolicy::Null:
112+
case TColumnOperator::EAbsentFieldPolicy::Error:
113113
if ((ui32)result->num_columns() != columnNames.size()) {
114114
return nullptr;
115115
}
116116
break;
117-
case TColumnOperator::EExtractProblemsPolicy::Skip:
117+
case TColumnOperator::EAbsentFieldPolicy::Skip:
118118
break;
119119
}
120120
return result;
@@ -211,8 +211,8 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
211211
}
212212
namespace {
213213
template <class TDataContainer, class TSchemaImpl>
214-
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
215-
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema) {
214+
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(const std::shared_ptr<TDataContainer>& srcBatch,
215+
const std::shared_ptr<TSchemaImpl>& dstSchema, const TColumnOperator::ECheckFieldTypesPolicy checkFieldTypesPolicy) {
216216
AFL_VERIFY(srcBatch);
217217
AFL_VERIFY(dstSchema);
218218
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
@@ -228,10 +228,20 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
228228
++itDst;
229229
} else {
230230
fieldIdx.emplace(itDst - dstSchema->fields().begin());
231-
if (!(*itDst)->Equals(*itSrc)) {
232-
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
233-
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
234-
return TConclusionStatus::Fail("incompatible column types");
231+
// React only when type checking is enabled AND the destination field really differs
// from the incoming one. (The check must be negated: equal fields are the valid case.)
if (checkFieldTypesPolicy != TColumnOperator::ECheckFieldTypesPolicy::Ignore && !(*itDst)->Equals(*itSrc)) {
    switch (checkFieldTypesPolicy) {
        case TColumnOperator::ECheckFieldTypesPolicy::Error: {
            // Recoverable path: log the mismatch and report a failed conclusion to the caller.
            AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
                "column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
            return TConclusionStatus::Fail("incompatible column types");
        }
        case TColumnOperator::ECheckFieldTypesPolicy::Verify: {
            // Strict path: a mismatch is treated as a verification failure.
            AFL_VERIFY(false)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
                "column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
            // Explicit break so this case never falls through into the Ignore label below.
            break;
        }
        case TColumnOperator::ECheckFieldTypesPolicy::Ignore:
            // Excluded by the enclosing condition; reaching this label means a logic error.
            AFL_VERIFY(false);
            break;
    }
}
236246

237247
++itDst;
@@ -249,7 +259,82 @@ TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
249259

250260
TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
251261
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
252-
return BuildSequentialSubsetImpl(incoming, dstSchema);
262+
return BuildSequentialSubsetImpl(incoming, dstSchema, DifferentColumnTypesPolicy);
263+
}
264+
namespace {
265+
template <class TDataContainer>
266+
TConclusion<std::shared_ptr<TDataContainer>> AdaptIncomingToDestinationExtImpl(const std::shared_ptr<TDataContainer>& incoming,
267+
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
268+
const std::function<i32(const std::string&)>& nameResolver,
269+
const TColumnOperator::ECheckFieldTypesPolicy differentColumnTypesPolicy,
270+
const TColumnOperator::EAbsentFieldPolicy absentColumnPolicy) {
271+
struct TFieldData {
272+
ui32 Index;
273+
std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn> Column;
274+
bool operator<(const TFieldData& item) const {
275+
return Index < item.Index;
276+
}
277+
};
278+
AFL_VERIFY(incoming);
279+
AFL_VERIFY(dstSchema);
280+
std::vector<TFieldData> resultColumns;
281+
resultColumns.reserve(incoming->num_columns());
282+
ui32 idx = 0;
283+
for (auto& srcField : incoming->schema()->fields()) {
284+
const int dstIndex = nameResolver(srcField->name());
285+
if (dstIndex > -1) {
286+
const auto& dstField = dstSchema->GetFieldByIndexVerified(dstIndex);
287+
switch (differentColumnTypesPolicy) {
288+
case TColumnOperator::ECheckFieldTypesPolicy::Verify:
289+
AFL_VERIFY(dstField->Equals(srcField))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
290+
"dst_column", dstField->ToString(true))("src_column", srcField->ToString(true));
291+
break;
292+
case TColumnOperator::ECheckFieldTypesPolicy::Error:
293+
if (!dstField->Equals(srcField)) {
294+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
295+
"dst_column", dstField->ToString(true))("src_column", srcField->ToString(true));
296+
return TConclusionStatus::Fail("incompatible column types for '" + dstField->name() + "'");
297+
}
298+
break;
299+
case TColumnOperator::ECheckFieldTypesPolicy::Ignore:
300+
break;
301+
}
302+
auto resultCheck = checker(idx, dstIndex);
303+
if (resultCheck.IsFail()) {
304+
return resultCheck;
305+
}
306+
resultColumns.emplace_back(TFieldData{ .Index = (ui32)dstIndex, .Column = incoming->column(idx) });
307+
} else if (absentColumnPolicy == TColumnOperator::EAbsentFieldPolicy::Skip) {
308+
} else if (absentColumnPolicy == TColumnOperator::EAbsentFieldPolicy::Verify) {
309+
AFL_VERIFY(false)("event", "cannot_use_incoming_batch")("reason", "absent_field")("dst_column", srcField->ToString(true));
310+
} else if (absentColumnPolicy == TColumnOperator::EAbsentFieldPolicy::Error) {
311+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "absent_field")(
312+
"dst_column", srcField->ToString(true));
313+
return TConclusionStatus::Fail("not found column '" + srcField->name() + "'");
314+
} else {
315+
AFL_VERIFY(false);
316+
}
317+
++idx;
318+
}
319+
if (resultColumns.empty()) {
320+
return TConclusionStatus::Fail("not found any column");
321+
}
322+
std::sort(resultColumns.begin(), resultColumns.end());
323+
std::vector<std::shared_ptr<arrow::Field>> fields;
324+
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
325+
columns.reserve(resultColumns.size());
326+
fields.reserve(resultColumns.size());
327+
for (auto&& i : resultColumns) {
328+
fields.emplace_back(dstSchema->field(i.Index));
329+
columns.emplace_back(i.Column);
330+
}
331+
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), incoming->num_rows());
332+
}
333+
} // namespace
334+
/// Record-batch entry point for adapting `incoming` to `dstSchema`.
///
/// Delegates to AdaptIncomingToDestinationExtImpl, supplying the operator's currently
/// configured DifferentColumnTypesPolicy and AbsentColumnPolicy.
TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::AdaptIncomingToDestinationExt(
    const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<TSchemaLite>& dstSchema,
    const std::function<TConclusionStatus(const ui32, const i32)>& checker, const std::function<i32(const std::string&)>& nameResolver) const {
    return AdaptIncomingToDestinationExtImpl<arrow::RecordBatch>(
        incoming, dstSchema, checker, nameResolver, DifferentColumnTypesPolicy, AbsentColumnPolicy);
}
254339

255340
} // namespace NKikimr::NArrow

ydb/core/formats/arrow/process_columns.h

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/library/conclusion/result.h>
33

44
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
5+
#include <functional>
56

67
namespace NKikimr::NArrow {
78

@@ -10,31 +11,57 @@ class TSchemaLite;
1011

1112
class TColumnOperator {
1213
public:
13-
enum class EExtractProblemsPolicy {
14-
Null,
14+
enum class EAbsentFieldPolicy {
15+
Error,
1516
Verify,
1617
Skip
1718
};
1819

20+
enum class ECheckFieldTypesPolicy {
21+
Ignore,
22+
Error,
23+
Verify
24+
};
25+
1926
private:
20-
EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify;
27+
EAbsentFieldPolicy AbsentColumnPolicy = EAbsentFieldPolicy::Verify;
28+
ECheckFieldTypesPolicy DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Error;
2129

2230
public:
23-
TColumnOperator& NullIfAbsent() {
24-
AbsentColumnPolicy = EExtractProblemsPolicy::Null;
31+
TColumnOperator& VerifyOnDifferentFieldTypes() {
32+
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Verify;
33+
return *this;
34+
};
35+
36+
TColumnOperator& ErrorOnDifferentFieldTypes() {
37+
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Error;
38+
return *this;
39+
};
40+
41+
TColumnOperator& IgnoreOnDifferentFieldTypes() {
42+
DifferentColumnTypesPolicy = ECheckFieldTypesPolicy::Ignore;
43+
return *this;
44+
};
45+
46+
TColumnOperator& ErrorIfAbsent() {
47+
AbsentColumnPolicy = EAbsentFieldPolicy::Error;
2548
return *this;
2649
}
2750

2851
TColumnOperator& VerifyIfAbsent() {
29-
AbsentColumnPolicy = EExtractProblemsPolicy::Verify;
52+
AbsentColumnPolicy = EAbsentFieldPolicy::Verify;
3053
return *this;
3154
}
3255

3356
TColumnOperator& SkipIfAbsent() {
34-
AbsentColumnPolicy = EExtractProblemsPolicy::Skip;
57+
AbsentColumnPolicy = EAbsentFieldPolicy::Skip;
3558
return *this;
3659
}
3760

61+
TConclusion<std::shared_ptr<arrow::RecordBatch>> AdaptIncomingToDestinationExt(const std::shared_ptr<arrow::RecordBatch>& incoming,
62+
const std::shared_ptr<TSchemaLite>& dstSchema, const std::function<TConclusionStatus(const ui32, const i32)>& checker,
63+
const std::function<i32(const std::string&)>& nameResolver) const;
64+
3865
std::shared_ptr<arrow::RecordBatch> Extract(
3966
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
4067
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);

ydb/core/io_formats/arrow/csv_arrow.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
250250
}
251251

252252
if (batch && ResultColumns.size()) {
253-
batch = NArrow::TColumnOperator().NullIfAbsent().Extract(batch, ResultColumns);
253+
batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(batch, ResultColumns);
254254
if (!batch) {
255255
errString = ErrorPrefix() + "not all result columns present";
256256
}

ydb/core/tablet_flat/flat_table_column.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ namespace NTable {
4949
}
5050
}
5151

52+
std::optional<ui32> GetCorrectKeyOrder() const {
53+
if (KeyOrder == Max<TPos>()) {
54+
return std::nullopt;
55+
} else {
56+
return KeyOrder;
57+
}
58+
}
59+
5260
NTable::TTag Id = Max<TTag>();
5361
NScheme::TTypeInfo PType;
5462
TString PTypeMod;

ydb/core/tx/columnshard/engines/scheme/abstract/index_info.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,18 @@ std::optional<ui32> IIndexInfo::GetColumnIdOptional(const std::string& name) con
4646
return {};
4747
}
4848

49-
TString IIndexInfo::GetColumnName(ui32 id, bool required) const {
49+
std::optional<ui32> IIndexInfo::GetColumnIndexOptional(const std::string& name, const ui32 shift) const {
50+
if (name == SPEC_COL_PLAN_STEP) {
51+
return shift + 0;
52+
} else if (name == SPEC_COL_TX_ID) {
53+
return shift + 1;
54+
} else if (name == SPEC_COL_DELETE_FLAG) {
55+
return shift + 2;
56+
}
57+
return {};
58+
}
59+
60+
TString IIndexInfo::GetColumnName(const ui32 id, const bool required) const {
5061
if (ESpecialColumn(id) == ESpecialColumn::PLAN_STEP) {
5162
return SPEC_COL_PLAN_STEP;
5263
} else if (ESpecialColumn(id) == ESpecialColumn::TX_ID) {

ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ class IIndexInfo {
133133
}
134134

135135
std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
136-
TString GetColumnName(ui32 id, bool required) const;
136+
std::optional<ui32> GetColumnIndexOptional(const std::string& name, const ui32 shift) const;
137+
TString GetColumnName(const ui32 id, const bool required) const;
137138
static std::shared_ptr<arrow::Field> GetColumnFieldOptional(const ui32 columnId);
138139
static std::shared_ptr<arrow::Field> GetColumnFieldVerified(const ui32 columnId);
139140

0 commit comments

Comments
 (0)