Skip to content

Support for UUID, issue 13047 #21052

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/formats/arrow/arrow_batch_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ arrow::Status AppendCell(arrow::Decimal128Builder& builder, const TCell& cell) {
return builder.Append(cell.Data());
}

// Appends one fixed-width binary cell to a FixedSizeBinaryBuilder.
//
// A null cell is stored as a null slot. A non-null cell must be exactly
// builder.byte_width() bytes long: Append(const char*) receives only a pointer
// and has no way to see the real length of the cell.
// A length mismatch is reported through the returned arrow::Status rather than
// by aborting the process, so the caller decides how to react to bad data.
arrow::Status AppendCell(arrow::FixedSizeBinaryBuilder& builder, const TCell& cell) {
    if (cell.IsNull()) {
        return builder.AppendNull();
    }
    const auto expectedSize = static_cast<ui32>(builder.byte_width());
    if (cell.Size() != expectedSize) {
        return arrow::Status::Invalid(
            "Fixed-size binary cell has ", cell.Size(), " bytes, expected ", expectedSize);
    }
    return builder.Append(cell.Data());
}

template <typename TDataType>
arrow::Status AppendCell(arrow::RecordBatchBuilder& builder, const TCell& cell, ui32 colNum) {
using TBuilderType = typename arrow::TypeTraits<TDataType>::BuilderType;
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl<arrow::DurationType>(const
return arrow::duration(arrow::TimeUnit::TimeUnit::MICRO);
}

// Arrow type factory for YQL types that are carried as fixed-size binary.
// Only Uuid is mapped (to a 16-byte fixed_size_binary); any other type id
// yields an empty pointer.
template <>
std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl<arrow::FixedSizeBinaryType>(const NScheme::TTypeInfo& typeInfo) {
    const bool isUuid = typeInfo.GetTypeId() == NScheme::NTypeIds::Uuid;
    if (!isUuid) {
        return nullptr;
    }
    return arrow::fixed_size_binary(16);
}

arrow::Result<std::shared_ptr<arrow::DataType>> GetArrowType(NScheme::TTypeInfo typeInfo) {
std::shared_ptr<arrow::DataType> result;
bool success = SwitchYqlTypeToArrowType(typeInfo, [&]<typename TType>(TTypeWrapper<TType> typeHolder) {
Y_UNUSED(typeHolder);
result = CreateEmptyArrowImpl<TType>(typeInfo);
return true;
});
if (success) {
if (success && result) {
return result;
}

Expand All @@ -79,6 +87,8 @@ arrow::Result<std::shared_ptr<arrow::DataType>> GetCSVArrowType(NScheme::TTypeIn
case NScheme::NTypeIds::Date:
case NScheme::NTypeIds::Date32:
return std::make_shared<arrow::TimestampType>(arrow::TimeUnit::SECOND);
case NScheme::NTypeIds::Uuid:
return std::make_shared<arrow::StringType>();
default:
return GetArrowType(typeId);
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/formats/arrow/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class TArrowToYdbConverter {
return MakeCellFromView<arrow::Decimal128Array>(column, row);
}

// Fixed-size binary columns are converted by delegating to MakeCellFromView,
// the same way the Decimal128Array specialization does.
template <>
TCell MakeCell<arrow::FixedSizeBinaryArray>(const std::shared_ptr<arrow::Array>& column, i64 row) {
    using TArrayType = arrow::FixedSizeBinaryArray;
    return MakeCellFromView<TArrayType>(column, row);
}

public:
static bool NeedDataConversion(const NScheme::TTypeInfo& colType);

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/formats/arrow/switch/switch_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ template <typename TFunc>
case NScheme::NTypeIds::Interval64:
return callback(TTypeWrapper<arrow::Int64Type>());

case NScheme::NTypeIds::Uuid:
return callback(TTypeWrapper<arrow::FixedSizeBinaryType>());

case NScheme::NTypeIds::PairUi64Ui64:
case NScheme::NTypeIds::ActorId:
case NScheme::NTypeIds::StepOrderId:
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType)
decimalType->set_scale(arrowDecimal->scale());
return true;
}
case arrow::Type::FIXED_SIZE_BINARY: {
toType.set_type_id(Ydb::Type::UUID);
return true;
}
case arrow::Type::NA:
case arrow::Type::HALF_FLOAT:
case arrow::Type::FIXED_SIZE_BINARY:
case arrow::Type::DATE32:
case arrow::Type::DATE64:
case arrow::Type::TIME32:
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/kqp/runtime/kqp_scan_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <yql/essentials/parser/pg_wrapper/interface/pack.h>
#include <yql/essentials/parser/pg_wrapper/interface/type_desc.h>
#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/types/uuid/uuid.h>
#include <yql/essentials/utils/yql_panic.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/cast.h>
Expand Down Expand Up @@ -286,6 +287,23 @@ class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TStringRef> {
}
};

template <>
class TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TUuid> {
public:
using TArrayType = arrow::FixedSizeBinaryArray;
static void Validate(const arrow::FixedSizeBinaryArray& array) {
YQL_ENSURE(array.byte_width() == NUuid::UUID_LEN);
}

static NYql::NUdf::TUnboxedValue ExtractValue(const arrow::FixedSizeBinaryArray& array, const ui32 rowIndex) {
auto data = array.GetView(rowIndex);
return MakeString(NUdf::TStringRef(data.data(), data.size()));
}
static TFixedWidthStatAccumulator BuildStatAccumulator(const NScheme::TTypeInfo& typeInfo) {
return TFixedWidthStatAccumulator(typeInfo);
}
};

}

template <class TElementAccessor, class TAccessor>
Expand Down Expand Up @@ -430,6 +448,10 @@ TBytesStatistics WriteColumnValuesFromArrowImpl(TAccessor editAccessor,
{
return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::Decimal128Array, NYql::NDecimal::TInt128>>(editAccessor, batch, columnIndex, columnPtr, columnType);
}
case NTypeIds::Uuid:
{
return WriteColumnValuesFromArrowSpecImpl<TElementAccessor<arrow::FixedSizeBinaryArray, NUdf::TUuid>>(editAccessor, batch, columnIndex, columnPtr, columnType);
}
case NTypeIds::PairUi64Ui64:
case NTypeIds::ActorId:
case NTypeIds::StepOrderId:
Expand Down
26 changes: 21 additions & 5 deletions ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <yql/essentials/public/udf/udf_ut_helpers.h>
#include <yql/essentials/minikql/mkql_alloc.h>
#include <yql/essentials/types/uuid/uuid.h>
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr::NMiniKQL {
Expand Down Expand Up @@ -44,6 +45,7 @@ struct TDataRow {
{27, TTypeInfo(NPg::TypeDescFromPgTypeName("pgint8")), ""},
{28, TTypeInfo(NPg::TypeDescFromPgTypeName("pgfloat4")), ""},
{29, TTypeInfo(NPg::TypeDescFromPgTypeName("pgfloat8")), ""},
{30, TTypeInfo(NTypeIds::Uuid), ""},
};
}

Expand Down Expand Up @@ -77,6 +79,7 @@ struct TDataRow {
i64 PgInt8;
float PgFloat4;
double PgFloat8;
NYql::NUuid::TUuid Uuid;

static std::shared_ptr<arrow::Schema> MakeArrowSchema() {
std::vector<std::shared_ptr<arrow::Field>> fields = {
Expand Down Expand Up @@ -110,6 +113,7 @@ struct TDataRow {
arrow::field("pgint8", arrow::int64()),
arrow::field("pgfloat4", arrow::float32()),
arrow::field("pgfloat8", arrow::float64()),
arrow::field("uuid", arrow::fixed_size_binary(16)),
};

return std::make_shared<arrow::Schema>(std::move(fields));
Expand Down Expand Up @@ -195,6 +199,9 @@ std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TData
} else if (colName == "interval64") {
auto result = batchBuilder->GetFieldAs<arrow::Date64Builder>(colIndex++)->Append(row.Interval64);
UNIT_ASSERT(result.ok());
} else if (colName == "uuid") {
auto result = batchBuilder->GetFieldAs<arrow::FixedSizeBinaryBuilder>(colIndex++)->Append(row.Uuid.Data);
UNIT_ASSERT(result.ok());
} else if (colName == "dec") {
auto result = batchBuilder->GetFieldAs<arrow::Decimal128Builder>(colIndex++)->Append(reinterpret_cast<const char*>(&row.Decimal));
UNIT_ASSERT(result.ok());
Expand Down Expand Up @@ -225,13 +232,19 @@ std::shared_ptr<arrow::RecordBatch> VectorToBatch(const std::vector<struct TData
return batch;
}

// Test helper: parses a textual UUID into NYql::NUuid::TUuid.
// Aborts the test binary, naming the rejected literal, when the text cannot be parsed.
NYql::NUuid::TUuid MakeUuid(TStringBuf stringUuid) {
    NYql::NUuid::TUuid uuid;
    // ParseUuidToArray writes through a ui16 pointer, so the storage is reinterpreted.
    auto* const words = reinterpret_cast<ui16*>(uuid.Data);
    // NOTE(review): the trailing `false` is passed through unchanged; confirm its meaning against ParseUuidToArray.
    const bool parsed = NKikimr::NUuid::ParseUuidToArray(stringUuid, words, false);
    // TStringBuf is not guaranteed to be NUL-terminated, hence the explicit length in the format.
    Y_ABORT_UNLESS(parsed, "Cannot parse UUID literal '%.*s'", static_cast<int>(stringUuid.size()), stringUuid.data());
    return uuid;
}

TVector<TDataRow> TestRows() {
TVector<TDataRow> rows = {
{false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1" , "u1" , "{j:1}", "{y:1}", 0, 0, 0, 0, -1, -1, -1, -1, 111, 1111, -21, 210, -2100, 21.3f, 21.6},
{false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2" , "u2" , "{j:2}", "{y:2}", 0, 0, 0, 0, -2, -2, -2, -2, 222, 2222, 22, -220, 2200, -22.3f, 22.6},
{false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3" , "u3" , "{j:3}", "{y:3}", 0, 0, 0, 0, -3, -3, -3, -3, 333, 3333, 23, 230, -2300, 23.3f, -23.6},
{false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4" , "u4" , "{j:4}", "{y:4}", 0, 0, 0, 0, -4, -4, -4, -4, 444, 4444, -24, 240, 2400, -24.3f, 24.6},
{false, -5, -5, -5, -5, 5, 5, 5, 5, 5.0f, 5.0, "long5long5long5long5long5", "utflong5utflong5utflong5", "{j:5}", "{y:5}", 0, 0, 0, 0, -5, -5, -5, -5, 555, 5555, 25, -250, 2500, 25.3f, -25.6},
{false, -1, -1, -1, -1, 1, 1, 1, 1, -1.0f, -1.0, "s1" , "u1" , "{j:1}", "{y:1}", 0, 0, 0, 0, -1, -1, -1, -1, 111, 1111, -21, 210, -2100, 21.3f, 21.6, MakeUuid("00000011-0022-0033-0044-000000000055")},
{false, 2, 2, 2, 2, 2, 2, 2, 2, 2.0f, 2.0, "s2" , "u2" , "{j:2}", "{y:2}", 0, 0, 0, 0, -2, -2, -2, -2, 222, 2222, 22, -220, 2200, -22.3f, 22.6, MakeUuid("00001100-2200-3300-4400-000000005500")},
{false, -3, -3, -3, -3, 3, 3, 3, 3, -3.0f, -3.0, "s3" , "u3" , "{j:3}", "{y:3}", 0, 0, 0, 0, -3, -3, -3, -3, 333, 3333, 23, 230, -2300, 23.3f, -23.6, MakeUuid("00110000-0033-0044-0000-000000550000")},
{false, -4, -4, -4, -4, 4, 4, 4, 4, 4.0f, 4.0, "s4" , "u4" , "{j:4}", "{y:4}", 0, 0, 0, 0, -4, -4, -4, -4, 444, 4444, -24, 240, 2400, -24.3f, 24.6, MakeUuid("11000000-3300-4400-0000-000055000000")},
{false, -5, -5, -5, -5, 5, 5, 5, 5, 5.0f, 5.0, "long5long5long5long5long5", "utflong5utflong5utflong5", "{j:5}", "{y:5}", 0, 0, 0, 0, -5, -5, -5, -5, 555, 5555, 25, -250, 2500, 25.3f, -25.6, MakeUuid("00000033-0044-0000-0000-005500000011")},
};
return rows;
}
Expand All @@ -252,6 +265,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
NUdf::TStringValue str(pattern.size());
std::memcpy(str.Data(), pattern.data(), pattern.size());
NUdf::TUnboxedValue containsLongString(NUdf::TUnboxedValuePod(std::move(str)));
NYql::NUuid::TUuid uuid(MakeUuid("00000000-4400-3300-2200-000000000011"));
NYql::NDecimal::TInt128 decimalVal = 123456789012;
NYql::NDecimal::TInt128 decimal35Val = 987654321012;
TVector<TTestCase> cases = {
Expand Down Expand Up @@ -303,6 +317,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
{NUdf::TUnboxedValuePod::Embedded("{j:0}"), NTypeIds::Json , {16, 8 } },
{NUdf::TUnboxedValuePod::Embedded("{y:0}"), NTypeIds::Yson , {16, 8 } },
{containsLongString , NTypeIds::String, {16 + pattern.size(), pattern.size()}},
{NUdf::TUnboxedValuePod(uuid ), NTypeIds::Uuid, {16 + 16, 16}},
{NUdf::TUnboxedValuePod( ), TTypeInfo(NPg::TypeDescFromPgTypeName("pgint2" )), {16, 8 } },
{NUdf::TUnboxedValuePod( ), TTypeInfo(NPg::TypeDescFromPgTypeName("pgint4" )), {16, 8 } },
{NUdf::TUnboxedValuePod( ), TTypeInfo(NPg::TypeDescFromPgTypeName("pgint8" )), {16, 8 } },
Expand Down Expand Up @@ -374,6 +389,7 @@ Y_UNIT_TEST_SUITE(TKqpScanData) {
UNIT_ASSERT_EQUAL(container[27].Get<i64 >(), row.PgInt8 );
UNIT_ASSERT_EQUAL(container[28].Get<float >(), row.PgFloat4 );
UNIT_ASSERT_EQUAL(container[29].Get<double>(), row.PgFloat8 );
UNIT_ASSERT_EQUAL(container[30].Get<NYql::NUuid::TUuid>(), row.Uuid );
}

UNIT_ASSERT(scanData.IsEmpty());
Expand Down
12 changes: 7 additions & 5 deletions ydb/core/kqp/ut/arrow/kqp_types_arrow_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {
YsonValue Yson,
JsonDocumentValue JsonDocument,
DyNumberValue DyNumber,
UuidValue Uuid,
Int32NotNullValue Int32 NOT NULL,
PRIMARY KEY (Key)
);
Expand All @@ -56,18 +57,18 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {

auto insertResult = session.ExecuteDataQuery(R"(
--!syntax_v1
INSERT INTO `/Root/Tmp` (Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, Int32NotNullValue) VALUES
(42, true, -1, 1, -2, 2, CAST(3.0 AS Float), 4.0, "five", Utf8("six"), Date("2007-07-07"), Datetime("2008-08-08T08:08:08Z"), Timestamp("2009-09-09T09:09:09.09Z"), Interval("P10D"), CAST("11.11" AS Decimal(22, 9)), "[12]", "[13]", JsonDocument("[14]"), DyNumber("15.15"), 123);
INSERT INTO `/Root/Tmp` (Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, UuidValue, Int32NotNullValue) VALUES
(42, true, -1, 1, -2, 2, CAST(3.0 AS Float), 4.0, "five", Utf8("six"), Date("2007-07-07"), Datetime("2008-08-08T08:08:08Z"), Timestamp("2009-09-09T09:09:09.09Z"), Interval("P10D"), CAST("11.11" AS Decimal(22, 9)), "[12]", "[13]", JsonDocument("[14]"), DyNumber("15.15"), Uuid("00000000-0000-0000-0000-000000000011"), 123);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());

auto it = db.StreamExecuteScanQuery("SELECT Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, Int32NotNullValue FROM `/Root/Tmp`").GetValueSync();
auto it = db.StreamExecuteScanQuery("SELECT Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, UuidValue, Int32NotNullValue FROM `/Root/Tmp`").GetValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT_C(streamPart.IsSuccess(), streamPart.GetIssues().ToString());
auto resultSet = streamPart.ExtractResultSet();
auto columns = resultSet.GetColumnsMeta();
UNIT_ASSERT_C(columns.size() == 20, "Wrong columns count");
UNIT_ASSERT_C(columns.size() == 21, "Wrong columns count");
NYdb::TResultSetParser parser(resultSet);
UNIT_ASSERT_C(parser.TryNextRow(), "Row is missing");
UNIT_ASSERT(parser.ColumnParser(0).GetOptionalUint64().value() == 42);
Expand All @@ -90,7 +91,8 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {
UNIT_ASSERT(parser.ColumnParser(16).GetOptionalYson().value() == "[13]");
UNIT_ASSERT(parser.ColumnParser(17).GetOptionalJsonDocument().value() == "[14]");
UNIT_ASSERT(parser.ColumnParser(18).GetOptionalDyNumber().value() == ".1515e2");
UNIT_ASSERT(parser.ColumnParser(19).GetInt32() == 123);
UNIT_ASSERT(parser.ColumnParser(19).GetOptionalUuid().value().ToString() == "00000000-0000-0000-0000-000000000011");
UNIT_ASSERT(parser.ColumnParser(20).GetInt32() == 123);
}

}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/tx/columnshard/engines/scheme/objects_cache.h>
#include <yql/essentials/types/uuid/uuid.h>

extern "C" {
#include <yql/essentials/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
Expand Down Expand Up @@ -392,6 +393,8 @@ namespace NKqp {
return arrow::field(name, arrow::binary(), nullable);
case NScheme::NTypeIds::Decimal:
return arrow::field(name, arrow::decimal(typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale()), nullable);
case NScheme::NTypeIds::Uuid:
return arrow::field(name, arrow::fixed_size_binary(NUuid::UUID_LEN), nullable);
case NScheme::NTypeIds::Pg:
switch (NPg::PgTypeIdFromTypeDesc(typeInfo.GetPgTypeDesc())) {
case INT2OID:
Expand Down
Loading