Skip to content

Commit 29fff7e

Browse files
authored
Lists are now supported for the S3 reader (#6651)
1 parent adb490f commit 29fff7e

18 files changed

+352
-20
lines changed

ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5708,7 +5708,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
57085708
std::visit([&types](const auto& value) { types.IncNoBlockType(value); }, typeKindOrSlot);
57095709
};
57105710

5711-
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(lambda->Pos()), allInputTypes, ctx, onUnsupportedType);
5711+
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(lambda->Pos()), allInputTypes, ctx, false, onUnsupportedType);
57125712
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
57135713
if (resolveStatus != IArrowResolver::OK) {
57145714
return false;
@@ -5848,7 +5848,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
58485848
allTypes.push_back(node->Child(i)->GetTypeAnn());
58495849
}
58505850

5851-
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(node->Pos()), allTypes, ctx, onUnsupportedType);
5851+
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(node->Pos()), allTypes, ctx, false, onUnsupportedType);
58525852
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
58535853
if (resolveStatus != IArrowResolver::OK) {
58545854
return true;

ydb/library/yql/core/yql_arrow_resolver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class IArrowResolver : public TThrRefBase {
2525

2626
virtual EStatus HasCast(const TPosition& pos, const TTypeAnnotationNode* from, const TTypeAnnotationNode* to, TExprContext& ctx) const = 0;
2727

28-
virtual EStatus AreTypesSupported(const TPosition& pos, const TVector<const TTypeAnnotationNode*>& types, TExprContext& ctx,
28+
virtual EStatus AreTypesSupported(const TPosition& pos, const TVector<const TTypeAnnotationNode*>& types, TExprContext& ctx, bool extraTypes = false,
2929
const TUnsupportedTypeCallback& onUnsupported = {}) const = 0;
3030
};
3131

ydb/library/yql/core/yql_expr_type_annotation.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3153,7 +3153,7 @@ bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type
31533153
std::visit([&types](const auto& value) { types.IncNoBlockType(value); }, typeKindOrSlot);
31543154
};
31553155
}
3156-
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), { &type }, ctx, onUnsupportedType);
3156+
auto resolveStatus = types.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), { &type }, ctx, false, onUnsupportedType);
31573157
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
31583158
return resolveStatus == IArrowResolver::OK;
31593159
}

ydb/library/yql/minikql/arrow/mkql_functions.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace NKikimr::NMiniKQL {
1515
bool ConvertInputArrowType(TType* blockType, arrow::ValueDescr& descr) {
1616
auto asBlockType = AS_TYPE(TBlockType, blockType);
1717
descr.shape = asBlockType->GetShape() == TBlockType::EShape::Scalar ? arrow::ValueDescr::SCALAR : arrow::ValueDescr::ARRAY;
18-
return ConvertArrowType(asBlockType->GetItemType(), descr.type);
18+
return ConvertArrowType(asBlockType->GetItemType(), descr.type, true);
1919
}
2020

2121
class TOutputTypeVisitor : public arrow::TypeVisitor

ydb/library/yql/minikql/computation/mkql_block_impl.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <ydb/library/yql/minikql/arrow/arrow_util.h>
99
#include <ydb/library/yql/minikql/arrow/mkql_bit_utils.h>
1010
#include <ydb/library/yql/public/udf/arrow/args_dechunker.h>
11+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_base.h>
12+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h>
1113

1214
#include <ydb/library/yql/parser/pg_wrapper/interface/arrow.h>
1315

@@ -65,6 +67,29 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
6567
return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
6668
}
6769

70+
if (type->IsList()) {
71+
auto listType = AS_TYPE(TListType, type);
72+
std::shared_ptr<arrow::DataType> itemType;
73+
MKQL_ENSURE(ConvertArrowType(listType->GetItemType(), itemType), "Unsupported type of scalar " << *listType->GetItemType());
74+
75+
std::unique_ptr<arrow::ArrayBuilder> builder;
76+
auto status = arrow::MakeBuilder(&pool, itemType, &builder);
77+
MKQL_ENSURE(status.ok(), "Couldn't create arrow list builder: " << status.ToString());
78+
79+
auto boxed = value.AsBoxed();
80+
auto iterator = NUdf::TBoxedValueAccessor::GetListIterator(*boxed);
81+
NYql::NUdf::TUnboxedValue unboxed;
82+
while (iterator.Next(unboxed)) {
83+
auto status = builder->AppendScalar(*DoConvertScalar(listType->GetItemType(), unboxed, pool).scalar());
84+
MKQL_ENSURE(status.ok(), "Couldn't append scalar to arrow list builder: " << status.ToString());
85+
}
86+
87+
std::shared_ptr<arrow::Array> array;
88+
status = builder->Finish(&array);
89+
MKQL_ENSURE(status.ok(), "Couldn't finish arrow list builder: " << status.ToString());
90+
return arrow::Datum(std::make_shared<arrow::ListScalar>(array));
91+
}
92+
6893
if (type->IsTuple()) {
6994
auto tupleType = AS_TYPE(TTupleType, type);
7095
std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;

ydb/library/yql/minikql/computation/mkql_block_reader.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,15 @@ struct TConverterTraits {
306306
}
307307
}
308308

309+
// Creates the block item converter used for list-typed items.
// The second argument (the converter for the list's element type) is left
// unnamed and is never used: the result is a TResourceBlockItemConverter,
// instantiated with <true> when isOptional is set and <false> otherwise.
static std::unique_ptr<TResult> MakeList(bool isOptional, std::unique_ptr<IBlockItemConverter>&&) {
310+
if (isOptional) {
311+
return std::make_unique<TResourceBlockItemConverter<true>>();
312+
} else {
313+
return std::make_unique<TResourceBlockItemConverter<false>>();
314+
}
315+
}
316+
309317
static std::unique_ptr<TResult> MakeResource(bool isOptional) {
310-
Y_UNUSED(isOptional);
311318
if (isOptional) {
312319
return std::make_unique<TResourceBlockItemConverter<true>>();
313320
} else {

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,11 @@ struct TSerializerTraits {
630630
return std::make_unique<TStrings<arrow::BinaryType, true>>();
631631
}
632632

633+
// List hook of the serializer traits: no serializer is produced for lists.
// Both arguments are ignored (marked via Y_UNUSED) and the call always
// throws yexception, so no value is ever returned.
static std::unique_ptr<TResult> MakeList(bool isOptional, std::unique_ptr<IBlockSerializer>&& inner) {
634+
Y_UNUSED(isOptional, inner);
635+
ythrow yexception() << "Serializer not implemented for list";
636+
}
637+
633638
static std::unique_ptr<TResult> MakeResource(bool isOptional) {
634639
Y_UNUSED(isOptional);
635640
ythrow yexception() << "Serializer not implemented for block resources";
@@ -666,6 +671,11 @@ struct TDeserializerTraits {
666671
return std::make_unique<TStrings<arrow::BinaryType, true>>();
667672
}
668673

674+
// List hook of the deserializer traits: no deserializer is produced for lists.
// Both arguments are ignored (marked via Y_UNUSED) and the call always
// throws yexception, so no value is ever returned.
static std::unique_ptr<TResult> MakeList(bool isOptional, std::unique_ptr<TBlockDeserializerBase>&& inner) {
675+
Y_UNUSED(isOptional, inner);
676+
ythrow yexception() << "Deserializer not implemented for list";
677+
}
678+
669679
static std::unique_ptr<TResult> MakeResource(bool isOptional) {
670680
Y_UNUSED(isOptional);
671681
ythrow yexception() << "Deserializer not implemented for block resources";

ydb/library/yql/minikql/mkql_type_builder.cpp

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,7 +1517,7 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& ty
15171517
}
15181518
}
15191519

1520-
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, const TArrowConvertFailedCallback& onFail) {
1520+
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, bool extraTypes, const TArrowConvertFailedCallback& onFail) {
15211521
bool isOptional;
15221522
auto unpacked = UnpackOptional(itemType, isOptional);
15231523
if (unpacked->IsOptional() || isOptional && unpacked->IsPg()) {
@@ -1538,7 +1538,7 @@ bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, c
15381538

15391539
// previousType is always Optional
15401540
std::shared_ptr<arrow::DataType> innerArrowType;
1541-
if (!ConvertArrowType(previousType, innerArrowType, onFail)) {
1541+
if (!ConvertArrowType(previousType, innerArrowType, extraTypes, onFail)) {
15421542
return false;
15431543
}
15441544

@@ -1560,7 +1560,7 @@ bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, c
15601560
std::shared_ptr<arrow::DataType> childType;
15611561
const TString memberName(structType->GetMemberName(i));
15621562
auto memberType = structType->GetMemberType(i);
1563-
if (!ConvertArrowType(memberType, childType, onFail)) {
1563+
if (!ConvertArrowType(memberType, childType, extraTypes, onFail)) {
15641564
return false;
15651565
}
15661566
members.emplace_back(std::make_shared<arrow::Field>(memberName, childType, memberType->IsOptional()));
@@ -1570,13 +1570,27 @@ bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, c
15701570
return true;
15711571
}
15721572

1573+
if (extraTypes) {
1574+
if (unpacked->IsList()) {
1575+
auto listType = AS_TYPE(TListType, unpacked);
1576+
std::shared_ptr<arrow::DataType> childType;
1577+
auto itemType = listType->GetItemType();
1578+
if (!ConvertArrowType(itemType, childType, extraTypes)) {
1579+
return false;
1580+
}
1581+
type = std::make_shared<arrow::ListType>(std::make_shared<arrow::Field>("item", childType, itemType->IsOptional()));
1582+
return true;
1583+
}
1584+
}
1585+
1586+
15731587
if (unpacked->IsTuple()) {
15741588
auto tupleType = AS_TYPE(TTupleType, unpacked);
15751589
std::vector<std::shared_ptr<arrow::Field>> fields;
15761590
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
15771591
std::shared_ptr<arrow::DataType> childType;
15781592
auto elementType = tupleType->GetElementType(i);
1579-
if (!ConvertArrowType(elementType, childType, onFail)) {
1593+
if (!ConvertArrowType(elementType, childType, extraTypes, onFail)) {
15801594
return false;
15811595
}
15821596

@@ -2424,6 +2438,10 @@ size_t CalcMaxBlockItemSize(const TType* type) {
24242438
return result;
24252439
}
24262440

2441+
if (type->IsList()) {
2442+
return sizeof(NYql::NUdf::TUnboxedValue);
2443+
}
2444+
24272445
if (type->IsTuple()) {
24282446
auto tupleType = AS_TYPE(TTupleType, type);
24292447
size_t result = 0;
@@ -2526,6 +2544,11 @@ struct TComparatorTraits {
25262544
return std::unique_ptr<TResult>(MakePgItemComparator(desc.TypeId).Release());
25272545
}
25282546

2547+
// List hook of the comparator traits: no item comparator is produced for lists.
// Both arguments are ignored (marked via Y_UNUSED) and the call always
// throws yexception, so no value is ever returned.
// NOTE(review): the message ends with ": " but nothing is appended after it.
static std::unique_ptr<TResult> MakeList(bool isOptional, std::unique_ptr<NYql::NUdf::IBlockItemComparator>&& inner) {
2548+
Y_UNUSED(isOptional, inner);
2549+
ythrow yexception() << "Comparator not implemented for block list: ";
2550+
}
2551+
25292552
static std::unique_ptr<TResult> MakeResource(bool isOptional) {
25302553
Y_UNUSED(isOptional);
25312554
ythrow yexception() << "Comparator not implemented for block resources: ";
@@ -2558,6 +2581,11 @@ struct THasherTraits {
25582581
return std::unique_ptr<TResult>(MakePgItemHasher(desc.TypeId).Release());
25592582
}
25602583

2584+
// List hook of the hasher traits: no item hasher is produced for lists.
// Both arguments are ignored (marked via Y_UNUSED) and the call always
// throws yexception, so no value is ever returned.
static std::unique_ptr<TResult> MakeList(bool isOptional, std::unique_ptr<NYql::NUdf::IBlockItemHasher>&& inner) {
2585+
Y_UNUSED(isOptional, inner);
2586+
ythrow yexception() << "Hasher not implemented for list";
2587+
}
2588+
25612589
static std::unique_ptr<TResult> MakeResource(bool isOptional) {
25622590
Y_UNUSED(isOptional);
25632591
ythrow yexception() << "Hasher not implemented for block resources";

ydb/library/yql/minikql/mkql_type_builder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ inline size_t CalcBlockLen(size_t maxBlockItemSize) {
3131
}
3232

3333
using TArrowConvertFailedCallback = std::function<void(TType*)>;
34-
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, const TArrowConvertFailedCallback& = {});
34+
bool ConvertArrowType(TType* itemType, std::shared_ptr<arrow::DataType>& type, bool extraTypes = false, const TArrowConvertFailedCallback& = {});
3535
bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>& type);
3636

3737
template<NUdf::EDataSlot slot>

ydb/library/yql/providers/common/arrow_resolve/yql_simple_arrow_resolver.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class TSimpleArrowResolver: public IArrowResolver {
5757
}
5858
}
5959

60-
EStatus AreTypesSupported(const TPosition& pos, const TVector<const TTypeAnnotationNode*>& types, TExprContext& ctx,
60+
EStatus AreTypesSupported(const TPosition& pos, const TVector<const TTypeAnnotationNode*>& types, TExprContext& ctx, bool extraTypes,
6161
const TUnsupportedTypeCallback& onUnsupported = {}) const override
6262
{
6363
try {
@@ -84,7 +84,7 @@ class TSimpleArrowResolver: public IArrowResolver {
8484
TNullOutput null;
8585
auto mkqlType = NCommon::BuildType(*type, typeBuilder, null);
8686
std::shared_ptr<arrow::DataType> arrowType;
87-
if (!ConvertArrowType(mkqlType, arrowType, cb)) {
87+
if (!ConvertArrowType(mkqlType, arrowType, extraTypes, cb)) {
8888
allOk = false;
8989
if (!cb) {
9090
break;

0 commit comments

Comments
 (0)