Skip to content

Commit 553d194

Browse files
not nulls information provide into arrow::array for correct kernels c… (#15060)
1 parent d077bd0 commit 553d194

File tree

6 files changed

+48
-18
lines changed

6 files changed

+48
-18
lines changed

ydb/core/formats/arrow/accessor/common/chunk_data.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,27 @@
55
namespace NKikimr::NArrow::NAccessor {
66

77
TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
8-
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer)
8+
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer,
9+
const std::optional<ui32>& notNullRecordsCount)
910
: RecordsCount(recordsCount)
11+
, NotNullRecordsCount(notNullRecordsCount)
1012
, DefaultValue(defaultValue)
1113
, ColumnType(columnType)
1214
, DefaultSerializer(defaultSerializer) {
1315
AFL_VERIFY(ColumnType);
1416
AFL_VERIFY(RecordsCount);
17+
AFL_VERIFY(!NotNullRecordsCount || *NotNullRecordsCount <= RecordsCount)("records", RecordsCount)("not_null", NotNullRecordsCount);
1518
AFL_VERIFY(!!DefaultSerializer);
1619
}
1720

18-
TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount) const {
21+
TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount, const std::optional<ui32>& notNullRecordsCount) const {
1922
AFL_VERIFY(recordsCount <= RecordsCount)("sub", recordsCount)("global", RecordsCount);
20-
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer);
23+
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer, notNullRecordsCount);
24+
}
25+
26+
ui32 TChunkConstructionData::GetNullRecordsCountVerified() const {
27+
AFL_VERIFY(NotNullRecordsCount);
28+
return RecordsCount - *NotNullRecordsCount;
2129
}
2230

2331
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/common/chunk_data.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,22 @@ namespace NKikimr::NArrow::NAccessor {
1313
class TChunkConstructionData {
1414
private:
1515
YDB_READONLY(ui32, RecordsCount, 0);
16+
YDB_READONLY_DEF(std::optional<ui32>, NotNullRecordsCount);
1617
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue);
1718
YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, ColumnType);
1819
YDB_READONLY_DEF(std::shared_ptr<NSerialization::ISerializer>, DefaultSerializer);
1920

2021
public:
2122
TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
22-
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer);
23+
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer,
24+
const std::optional<ui32>& notNullRecordsCount = std::nullopt);
2325

24-
TChunkConstructionData GetSubset(const ui32 recordsCount) const;
26+
TChunkConstructionData GetSubset(const ui32 recordsCount, const std::optional<ui32>& notNullRecordsCount = std::nullopt) const;
27+
28+
bool HasNullRecordsCount() const {
29+
return !!NotNullRecordsCount;
30+
}
31+
ui32 GetNullRecordsCountVerified() const;
2532
};
2633

2734
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/plain/accessor.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class TTrivialArray: public IChunkedArray {
3232
virtual ui32 DoGetValueRawBytes() const override;
3333

3434
public:
35+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
36+
return std::make_shared<arrow::ChunkedArray>(Array);
37+
}
38+
3539
const std::shared_ptr<arrow::Array>& GetArray() const {
3640
return Array;
3741
}
@@ -105,6 +109,7 @@ class TTrivialChunkedArray: public IChunkedArray {
105109
}
106110
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
107111
virtual std::optional<ui64> DoGetRawSize() const override;
112+
108113
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
109114
auto chunk = GetChunkSlow(index);
110115
return NArrow::TStatusValidator::GetValid(chunk.GetArray()->GetScalar(chunk.GetAddress().GetLocalIndex(index)));
@@ -116,6 +121,10 @@ class TTrivialChunkedArray: public IChunkedArray {
116121
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;
117122

118123
public:
124+
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
125+
return Array;
126+
}
127+
119128
TTrivialChunkedArray(const std::shared_ptr<arrow::ChunkedArray>& data)
120129
: TBase(data->length(), EType::ChunkedArray, data->type())
121130
, Array(data) {

ydb/core/formats/arrow/accessor/plain/constructor.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
1818
auto result = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema);
1919
if (!result.ok()) {
2020
return TConclusionStatus::Fail(result.status().ToString());
21-
} else {
22-
auto rb = TStatusValidator::GetValid(result);
23-
AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString());
24-
return std::make_shared<NArrow::NAccessor::TTrivialArray>(rb->column(0));
2521
}
22+
auto rb = TStatusValidator::GetValid(result);
23+
AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString());
24+
if (externalInfo.HasNullRecordsCount()) {
25+
rb->column(0)->data()->SetNullCount(externalInfo.GetNullRecordsCountVerified());
26+
}
27+
return std::make_shared<NArrow::NAccessor::TTrivialArray>(rb->column(0));
2628
}
2729

2830
TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const {

ydb/core/formats/arrow/save_load/loader.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@ const std::shared_ptr<arrow::Field>& TColumnLoader::GetField() const {
2828
return ResultField;
2929
}
3030

31-
TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const {
32-
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr());
31+
TChunkConstructionData TColumnLoader::BuildAccessorContext(
32+
const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
33+
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr(), notNullCount);
3334
}
3435

35-
TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(const TString& dataStr, const ui32 recordsCount) const {
36-
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount));
36+
TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(
37+
const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
38+
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount));
3739
}
3840

39-
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
40-
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount)).DetachResult();
41+
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
42+
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount)).DetachResult();
4143
}
4244

4345
TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::BuildAccessor(

ydb/core/formats/arrow/save_load/loader.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ class TColumnLoader {
3535

3636
const std::shared_ptr<arrow::Field>& GetField() const;
3737

38-
TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
39-
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
40-
TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(const TString& data, const ui32 expectedRecordsCount) const;
38+
TChunkConstructionData BuildAccessorContext(const ui32 recordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
39+
std::shared_ptr<IChunkedArray> ApplyVerified(
40+
const TString& data, const ui32 expectedRecordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
41+
TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(
42+
const TString& data, const ui32 expectedRecordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
4143
};
4244

4345
} // namespace NKikimr::NArrow::NAccessor

0 commit comments

Comments
 (0)