diff --git a/ydb/core/formats/arrow/arrow_batch_builder.cpp b/ydb/core/formats/arrow/arrow_batch_builder.cpp index 7db800ffd816..5d87a89651ab 100644 --- a/ydb/core/formats/arrow/arrow_batch_builder.cpp +++ b/ydb/core/formats/arrow/arrow_batch_builder.cpp @@ -35,15 +35,26 @@ arrow::Status AppendCell(arrow::StringBuilder& builder, const TCell& cell) { return builder.Append(cell.Data(), cell.Size()); } -arrow::Status AppendCell(arrow::Decimal128Builder& builder, const TCell& cell) { +// arrow::Status AppendCell(arrow::Decimal128Builder& builder, const TCell& cell) { +// if (cell.IsNull()) { +// return builder.AppendNull(); +// } + +// /// @warning There's no conversion for special YQL Decimal valies here, +// /// so we could convert them to Arrow and back but cannot calculate anything on them. +// /// We need separate Arrow.Decimal, YQL.Decimal, CH.Decimal and YDB.Decimal in future. +// return builder.Append(cell.Data()); +// } + +arrow::Status AppendCell(arrow::FixedSizeBinaryBuilder& builder, const TCell& cell) { + std::cout << "[DEBUG] AppendCell: is_null=" << cell.IsNull() << " size=" << cell.Size() << std::endl; if (cell.IsNull()) { return builder.AppendNull(); } - - /// @warning There's no conversion for special YQL Decimal valies here, - /// so we could convert them to Arrow and back but cannot calculate anything on them. - /// We need separate Arrow.Decimal, YQL.Decimal, CH.Decimal and YDB.Decimal in future. - return builder.Append(cell.Data()); + if (cell.Size() != 16) { + return arrow::Status::Invalid("Decimal cell must be 16 bytes, got ", cell.Size()); + } + return builder.Append(reinterpret_cast(cell.Data())); } template diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 0cc76775251b..f42e0ea873ef 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -37,8 +37,9 @@ std::shared_ptr CreateEmptyArrowImpl(const NScheme::TTypeInfo& } template <> -std::shared_ptr CreateEmptyArrowImpl(const NScheme::TTypeInfo& typeInfo) { - return arrow::decimal(typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale()); +std::shared_ptr CreateEmptyArrowImpl(const NScheme::TTypeInfo& typeInfo) { + Y_UNUSED(typeInfo); + return std::make_shared(16); } template <> diff --git a/ydb/core/formats/arrow/converter.h b/ydb/core/formats/arrow/converter.h index 8288d50cb178..e74cddb372bc 100644 --- a/ydb/core/formats/arrow/converter.h +++ b/ydb/core/formats/arrow/converter.h @@ -33,7 +33,12 @@ class TArrowToYdbConverter { template TCell MakeCellFromView(const std::shared_ptr& column, i64 row) { auto array = std::static_pointer_cast(column); + if (array->IsNull(row)) { + std::cout << "[DEBUG] MakeCellFromView: row=" << row << " is_null=1 size=0" << std::endl; + return TCell(); + } auto data = array->GetView(row); + std::cout << "[DEBUG] MakeCellFromView: row=" << row << " is_null=0 size=" << data.size() << std::endl; return TCell(data.data(), data.size()); } @@ -57,6 +62,18 @@ class TArrowToYdbConverter { return MakeCellFromView(column, row); } + template <> + TCell MakeCell(const std::shared_ptr& column, i64 row) { + auto array = std::static_pointer_cast(column); + if (array->IsNull(row)) { + std::cout << "[DEBUG] MakeCell: row=" << row << " is_null=1 size=0" << std::endl; + return TCell(); + } + auto data = array->GetView(row); + std::cout << "[DEBUG] MakeCell: row=" << row << " is_null=0 size=" << data.size() << std::endl; + return TCell(data.data(), data.size()); + } + public: static bool NeedDataConversion(const NScheme::TTypeInfo& colType); diff --git a/ydb/core/formats/arrow/switch/switch_type.h b/ydb/core/formats/arrow/switch/switch_type.h index b5183df39d9a..fd8b167ef5e4 100644 --- a/ydb/core/formats/arrow/switch/switch_type.h +++ b/ydb/core/formats/arrow/switch/switch_type.h @@ -64,7 +64,7 @@ template case NScheme::NTypeIds::Interval: return callback(TTypeWrapper()); case NScheme::NTypeIds::Decimal: - return callback(TTypeWrapper()); + return callback(TTypeWrapper()); case NScheme::NTypeIds::Datetime64: case NScheme::NTypeIds::Timestamp64: diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index e544a56f94db..fbbc5e7348cf 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -18,6 +18,7 @@ #include #include +#include namespace NKikimr { namespace NGRpcService { @@ -28,7 +29,7 @@ using namespace Ydb; namespace { // TODO: no mapping for DATE, DATETIME, TZ_*, YSON, JSON, UUID, JSON_DOCUMENT, DYNUMBER -bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) { +bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType, const arrow::Field* field = nullptr) { switch (type.id()) { case arrow::Type::BOOL: toType.set_type_id(Ydb::Type::BOOL); @@ -82,9 +83,29 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) decimalType->set_scale(arrowDecimal->scale()); return true; } + case arrow::Type::FIXED_SIZE_BINARY: { + auto& fsb = static_cast(type); + if (fsb.byte_width() == 16) { + Ydb::DecimalType* decimalType = toType.mutable_decimal_type(); + ui32 precision = 22, scale = 9; + if (field && field->metadata()) { + auto precisionMeta = field->metadata()->Get("precision").ValueOr(""); + auto scaleMeta = field->metadata()->Get("scale").ValueOr(""); + if (!precisionMeta.empty()) { + precision = FromString(precisionMeta); + } + if (!scaleMeta.empty()) { + scale = FromString(scaleMeta); + } + } + decimalType->set_precision(precision); + decimalType->set_scale(scale); + return true; + } + break; + } case arrow::Type::NA: case arrow::Type::HALF_FLOAT: - case arrow::Type::FIXED_SIZE_BINARY: case arrow::Type::DATE32: case arrow::Type::DATE64: case arrow::Type::TIME32: @@ -410,7 +431,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBasetype(); Ydb::Type ydbType; - if (!ConvertArrowToYdbPrimitive(*type, ydbType)) { + if (!ConvertArrowToYdbPrimitive(*type, ydbType, field.get())) { return TConclusionStatus::Fail("Cannot convert arrow type to ydb one: " + type->ToString()); } out.emplace_back(name, std::move(ydbType)); diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp index 16f2f4e14393..47932188168f 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp @@ -17,6 +17,8 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA return node; } + Y_UNUSED(typesCtx); + auto rowType = node.Ref().GetTypeAnn()->Cast()->GetItemType()->Cast(); TVector args; @@ -41,18 +43,14 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA } else if (auto maybeRead = node.Maybe()) { wideRead = ctx.RenameNode(*node.Raw(), TKqpWideReadTableRanges::CallableName()); } else if (auto maybeRead = node.Maybe()) { - if (typesCtx.IsBlockEngineEnabled()) { - wideRead = Build(ctx, node.Pos()) - .Input() - .Input() - .Input(ctx.RenameNode(*node.Raw(), TKqpBlockReadOlapTableRanges::CallableName())) - .Build() + wideRead = Build(ctx, node.Pos()) + .Input() + .Input() + .Input(ctx.RenameNode(*node.Raw(), TKqpBlockReadOlapTableRanges::CallableName())) .Build() - .Done() - .Ptr(); - } else { - wideRead = ctx.RenameNode(*node.Raw(), TKqpWideReadOlapTableRanges::CallableName()); - } + .Build() + .Done() + .Ptr(); } else { YQL_ENSURE(false, "Unknown read table operation: " << node.Ptr()->Content()); } diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index f44ce45889b5..ab40dd9eb1bc 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -1,6 +1,7 @@ #include "columnshard.h" #include +#include #include #include #include @@ -390,8 +391,16 @@ namespace NKqp { return arrow::field(name, arrow::int64(), nullable); case NScheme::NTypeIds::JsonDocument: return arrow::field(name, arrow::binary(), nullable); - case NScheme::NTypeIds::Decimal: - return arrow::field(name, arrow::decimal(typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale()), nullable); + case NScheme::NTypeIds::Decimal: { + auto meta = arrow::KeyValueMetadata::Make( + std::vector{"precision", "scale"}, + std::vector{ + ToString(typeInfo.GetDecimalType().GetPrecision()), + ToString(typeInfo.GetDecimalType().GetScale()) + } + ); + return arrow::field(name, arrow::fixed_size_binary(16), nullable, meta); + } case NScheme::NTypeIds::Pg: switch (NPg::PgTypeIdFromTypeDesc(typeInfo.GetPgTypeDesc())) { case INT2OID: diff --git a/ydb/core/kqp/ut/olap/decimal_ut.cpp b/ydb/core/kqp/ut/olap/decimal_ut.cpp index 99cb4795ffd2..3e0fb61da7cd 100644 --- a/ydb/core/kqp/ut/olap/decimal_ut.cpp +++ b/ydb/core/kqp/ut/olap/decimal_ut.cpp @@ -244,9 +244,9 @@ Y_UNIT_TEST_SUITE(KqpDecimalColumnShard) { tester35.PrepareTable1(); auto check = [](const TDecimalTestCase& tester) { - tester.CheckQuery("SELECT min(dec) FROM `/Root/Table1`", "[[[\"3.14\"]]]"); - tester.CheckQuery("SELECT max(dec) FROM `/Root/Table1`", "[[[\"12.46\"]]]"); - tester.CheckQuery("SELECT sum(dec) FROM `/Root/Table1`", "[[[\"32.252\"]]]"); + tester.CheckQuery("SELECT min(dec) FROM `/Root/Table1`", "[[[\"3.14\"]]]", EQueryMode::EXECUTE_QUERY); + tester.CheckQuery("SELECT max(dec) FROM `/Root/Table1`", "[[[\"12.46\"]]]", EQueryMode::EXECUTE_QUERY); + tester.CheckQuery("SELECT sum(dec) FROM `/Root/Table1`", "[[[\"32.252\"]]]", EQueryMode::EXECUTE_QUERY); }; check(tester22); diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h index 38f878b400ed..5a35cd98e4c3 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h @@ -534,9 +534,13 @@ class TTableUpdatesBuilder { } } - if constexpr (std::is_same::value) { - if constexpr (arrow::is_decimal128_type::value) { - Y_ABORT_UNLESS(typedBuilder.Append(arrow::Decimal128(data.Hi_, data.Low_)).ok()); + if constexpr (std::is_same::value) { + if constexpr (std::is_same::value) { + char bytes[16] = {0}; + // Little-endian: Low_ (unsigned) в первые 8 байт, Hi_ (signed) в последние 8 байт + std::memcpy(bytes, &data.Low_, 8); + std::memcpy(bytes + 8, &data.Hi_, 8); + Y_ABORT_UNLESS(typedBuilder.Append(bytes).ok()); return true; } } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 4f419a63e434..7a8ec5d742de 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -519,7 +519,9 @@ class TUploadRowsBase : public TActorBootstrapped()); case arrow::Type::DECIMAL: - return f(TTypeWrapper()); + return f(TTypeWrapper()); case arrow::Type::DURATION: return f(TTypeWrapper()); case arrow::Type::LARGE_STRING: diff --git a/ydb/tests/functional/tpc/medium/test_tpch.py b/ydb/tests/functional/tpc/medium/test_tpch.py index 4dd5f0bf0e9b..1c9ab637973d 100644 --- a/ydb/tests/functional/tpc/medium/test_tpch.py +++ b/ydb/tests/functional/tpc/medium/test_tpch.py @@ -5,9 +5,23 @@ class TestTpchS1(tpch.TestTpch1, FunctionalTestBase): iterations: int = 1 + @classmethod + def addition_init_params(cls) -> list[str]: + if cls.float_mode: + return ['--float-mode', cls.float_mode] + return [] + @classmethod def setup_class(cls) -> None: cls.setup_cluster() - cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'init', '--store=column']) + cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'init', '--store=row'] + cls.addition_init_params()) cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'import', 'generator', '--scale=1']) tpch.TestTpch1.setup_class() + + +class TestTpchS1Decimal_22_9(TestTpchS1): + float_mode = 'decimal_ydb' + + +class TestTpchS1DecimalNative(TestTpchS1): + float_mode = 'decimal' \ No newline at end of file diff --git a/ydb/tests/olap/lib/ydb_cli.py b/ydb/tests/olap/lib/ydb_cli.py index 353d25f07419..ab842c02fb4d 100644 --- a/ydb/tests/olap/lib/ydb_cli.py +++ b/ydb/tests/olap/lib/ydb_cli.py @@ -152,7 +152,9 @@ def __init__(self, scale: Optional[int], query_prefix: Optional[str], external_path: str, - threads: int): + threads: int, + float_mode: str, + ): self.result = YdbCliHelper.WorkloadRunResult() self.iterations = iterations self.check_canonical = check_canonical @@ -172,6 +174,7 @@ def __init__(self, self.__plan_path = f'{self.__prefix}.plan' self.__query_output_path = f'{self.__prefix}.result' self.json_path = f'{self.__prefix}.stats.json' + self.float_mode = float_mode def get_plan_path(self, query_name: str, plan_name: Any) -> str: return f'{self.__plan_path}.{query_name}.{plan_name}' @@ -204,6 +207,8 @@ def __get_cmd(self) -> list[str]: cmd += ['--scale', str(self.scale)] if self.threads > 0: cmd += ['--threads', str(self.threads)] + if self.float_mode: + cmd += ['--float-mode', self.float_mode] return cmd def run(self) -> YdbCliHelper.WorkloadRunResult: @@ -343,7 +348,7 @@ def __process(self) -> YdbCliHelper.WorkloadRunResult: @staticmethod def workload_run(workload_type: WorkloadType, path: str, query_names: list[str], iterations: int = 5, timeout: float = 100., check_canonical: CheckCanonicalPolicy = CheckCanonicalPolicy.NO, query_syntax: str = '', - scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0) -> dict[str, YdbCliHelper.WorkloadRunResult]: + scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0, float_mode: str = '') -> dict[str, YdbCliHelper.WorkloadRunResult]: runner = YdbCliHelper.WorkloadRunner( workload_type, path, @@ -355,7 +360,8 @@ def workload_run(workload_type: WorkloadType, path: str, query_names: list[str], scale, query_prefix=query_prefix, external_path=external_path, - threads=threads + threads=threads, + float_mode=float_mode, ) extended_query_names = query_names + ["Sum", "Avg", "GAvg"] if runner.run(): diff --git a/ydb/tests/olap/load/lib/conftest.py b/ydb/tests/olap/load/lib/conftest.py index 21f838cdbd91..b3dc07caf80b 100644 --- a/ydb/tests/olap/load/lib/conftest.py +++ b/ydb/tests/olap/load/lib/conftest.py @@ -38,6 +38,7 @@ def __init__(self, iterations: Optional[int] = None, timeout: Optional[float] = scale: Optional[int] = None query_prefix: str = get_external_param('query-prefix', '') verify_data: bool = True + float_mode: str = '' __nodes_state: Optional[dict[str, YdbCluster.Node]] = None @classmethod @@ -465,6 +466,7 @@ def run_workload_test(self, path: str, query_num: Optional[int] = None, query_na scale=self.scale, query_prefix=qparams.query_prefix, external_path=self.get_external_path(), + float_mode=self.float_mode, )[query_name] self.process_query_result(result, query_name, True) @@ -827,7 +829,8 @@ def do_setup_class(cls): scale=cls.scale, query_prefix=qparams.query_prefix, external_path=cls.get_external_path(), - threads=cls.threads + threads=cls.threads, + float_mode=cls.float_mode, ) def test(self, query_name):