Skip to content

Commit f6d081d

Browse files
authored
Fix json (#9034)
1 parent df017db commit f6d081d

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/yql/utils/yql_panic.h>
77
#include <ydb/library/yql/minikql/mkql_type_builder.h>
88
#include <ydb/library/yql/minikql/mkql_type_ops.h>
9+
#include <ydb/library/yql/minikql/mkql_node_cast.h>
910

1011
#include <library/cpp/yson/node/node_io.h>
1112
#include <library/cpp/yson/detail.h>
@@ -254,6 +255,10 @@ class IYsonBlockReaderWithNativeFlag : public IYsonBlockReader {
254255
return NUdf::TBlockItem();
255256
}
256257
YQL_ENSURE(prev == BeginListSymbol);
258+
if (buf.Current() == EndListSymbol) {
259+
buf.Next();
260+
return NUdf::TBlockItem();
261+
}
257262
auto result = GetNotNull(buf);
258263
if (buf.Current() == ListItemSeparatorSymbol) {
259264
buf.Next();
@@ -513,7 +518,7 @@ class TPrimitiveColumnConverter {
513518
case arrow::Type::UINT64: PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break;
514519
case arrow::Type::DOUBLE: PrimitiveConverterImpl_ = GEN_TYPE(Double); break;
515520
case arrow::Type::FLOAT: PrimitiveConverterImpl_ = GEN_TYPE(Float); break;
516-
case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(String); break;
521+
case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; // all strings from yt is in binary format
517522
case arrow::Type::BINARY: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break;
518523
default:
519524
return; // will check in runtime
@@ -615,7 +620,13 @@ class TYtColumnConverter final : public IYtColumnConverter {
615620
: Settings_(std::move(settings))
616621
, DictYsonConverter_(Settings_)
617622
, YsonConverter_(Settings_)
618-
, DictPrimitiveConverter_(Settings_) {}
623+
, DictPrimitiveConverter_(Settings_)
624+
{
625+
auto type = Settings_.Type;
626+
IsJson_ = type->IsData() && AS_TYPE(TDataType, type)->GetDataSlot() == NUdf::EDataSlot::Json
627+
|| (Native && type->IsOptional() && AS_TYPE(TOptionalType, type)->GetItemType()->IsData()
628+
&& AS_TYPE(TDataType, AS_TYPE(TOptionalType, type)->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Json);
629+
}
619630

620631
arrow::Datum Convert(std::shared_ptr<arrow::ArrayData> block) override {
621632
if (arrow::Type::DICTIONARY == block->type->id()) {
@@ -628,6 +639,11 @@ class TYtColumnConverter final : public IYtColumnConverter {
628639
auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
629640
YQL_ENSURE(result.ok());
630641
return *result;
642+
} else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == valType->id())
643+
{
644+
auto result = arrow::compute::Cast(DictPrimitiveConverter_.Convert(block), Settings_.ArrowType);
645+
YQL_ENSURE(result.ok());
646+
return *result;
631647
} else {
632648
return DictYsonConverter_.Convert(block);
633649
}
@@ -640,6 +656,11 @@ class TYtColumnConverter final : public IYtColumnConverter {
640656
auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
641657
YQL_ENSURE(result.ok());
642658
return *result;
659+
} else if (IsJson_ && arrow::Type::STRING == Settings_.ArrowType->id() && arrow::Type::BINARY == blockType->id())
660+
{
661+
auto result = arrow::compute::Cast(arrow::Datum(*block), Settings_.ArrowType);
662+
YQL_ENSURE(result.ok());
663+
return *result;
643664
} else {
644665
YQL_ENSURE(arrow::Type::BINARY == blockType->id());
645666
return YsonConverter_.Convert(block);
@@ -651,6 +672,7 @@ class TYtColumnConverter final : public IYtColumnConverter {
651672
TYtYsonColumnConverter<Native, IsTopOptional, true> DictYsonConverter_;
652673
TYtYsonColumnConverter<Native, IsTopOptional, false> YsonConverter_;
653674
TPrimitiveColumnConverter<true> DictPrimitiveConverter_;
675+
bool IsJson_;
654676
};
655677

656678
TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative)

0 commit comments

Comments
 (0)