Skip to content

test commit #20685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions ydb/core/formats/arrow/arrow_batch_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,26 @@ arrow::Status AppendCell(arrow::StringBuilder& builder, const TCell& cell) {
return builder.Append(cell.Data(), cell.Size());
}

arrow::Status AppendCell(arrow::Decimal128Builder& builder, const TCell& cell) {
// arrow::Status AppendCell(arrow::Decimal128Builder& builder, const TCell& cell) {
// if (cell.IsNull()) {
// return builder.AppendNull();
// }

// /// @warning There's no conversion for special YQL Decimal valies here,
// /// so we could convert them to Arrow and back but cannot calculate anything on them.
// /// We need separate Arrow.Decimal, YQL.Decimal, CH.Decimal and YDB.Decimal in future.
// return builder.Append(cell.Data());
// }

arrow::Status AppendCell(arrow::FixedSizeBinaryBuilder& builder, const TCell& cell) {
std::cout << "[DEBUG] AppendCell<FixedSizeBinaryBuilder>: is_null=" << cell.IsNull() << " size=" << cell.Size() << std::endl;
if (cell.IsNull()) {
return builder.AppendNull();
}

/// @warning There's no conversion for special YQL Decimal valies here,
/// so we could convert them to Arrow and back but cannot calculate anything on them.
/// We need separate Arrow.Decimal, YQL.Decimal, CH.Decimal and YDB.Decimal in future.
return builder.Append(cell.Data());
if (cell.Size() != 16) {
return arrow::Status::Invalid("Decimal cell must be 16 bytes, got ", cell.Size());
}
return builder.Append(reinterpret_cast<const uint8_t*>(cell.Data()));
}

template <typename TDataType>
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl(const NScheme::TTypeInfo&
}

template <>
std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl<arrow::Decimal128Type>(const NScheme::TTypeInfo& typeInfo) {
return arrow::decimal(typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale());
std::shared_ptr<arrow::DataType> CreateEmptyArrowImpl<arrow::FixedSizeBinaryType>(const NScheme::TTypeInfo& typeInfo) {
Y_UNUSED(typeInfo);
return std::make_shared<arrow::FixedSizeBinaryType>(16);
}

template <>
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/formats/arrow/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ class TArrowToYdbConverter {
template <typename TArray>
TCell MakeCellFromView(const std::shared_ptr<arrow::Array>& column, i64 row) {
auto array = std::static_pointer_cast<TArray>(column);
if (array->IsNull(row)) {
std::cout << "[DEBUG] MakeCellFromView: row=" << row << " is_null=1 size=0" << std::endl;
return TCell();
}
auto data = array->GetView(row);
std::cout << "[DEBUG] MakeCellFromView: row=" << row << " is_null=0 size=" << data.size() << std::endl;
return TCell(data.data(), data.size());
}

Expand All @@ -57,6 +62,18 @@ class TArrowToYdbConverter {
return MakeCellFromView<arrow::Decimal128Array>(column, row);
}

template <>
TCell MakeCell<arrow::FixedSizeBinaryArray>(const std::shared_ptr<arrow::Array>& column, i64 row) {
auto array = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(column);
if (array->IsNull(row)) {
std::cout << "[DEBUG] MakeCell<FixedSizeBinaryArray>: row=" << row << " is_null=1 size=0" << std::endl;
return TCell();
}
auto data = array->GetView(row);
std::cout << "[DEBUG] MakeCell<FixedSizeBinaryArray>: row=" << row << " is_null=0 size=" << data.size() << std::endl;
return TCell(data.data(), data.size());
}

public:
static bool NeedDataConversion(const NScheme::TTypeInfo& colType);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/switch/switch_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ template <typename TFunc>
case NScheme::NTypeIds::Interval:
return callback(TTypeWrapper<arrow::DurationType>());
case NScheme::NTypeIds::Decimal:
return callback(TTypeWrapper<arrow::Decimal128Type>());
return callback(TTypeWrapper<arrow::FixedSizeBinaryType>());

case NScheme::NTypeIds::Datetime64:
case NScheme::NTypeIds::Timestamp64:
Expand Down
27 changes: 24 additions & 3 deletions ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <util/string/vector.h>
#include <util/generic/size_literals.h>
#include <util/string/cast.h>

namespace NKikimr {
namespace NGRpcService {
Expand All @@ -28,7 +29,7 @@ using namespace Ydb;
namespace {

// TODO: no mapping for DATE, DATETIME, TZ_*, YSON, JSON, UUID, JSON_DOCUMENT, DYNUMBER
bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType) {
bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType, const arrow::Field* field = nullptr) {
switch (type.id()) {
case arrow::Type::BOOL:
toType.set_type_id(Ydb::Type::BOOL);
Expand Down Expand Up @@ -82,9 +83,29 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType)
decimalType->set_scale(arrowDecimal->scale());
return true;
}
case arrow::Type::FIXED_SIZE_BINARY: {
auto& fsb = static_cast<const arrow::FixedSizeBinaryType&>(type);
if (fsb.byte_width() == 16) {
Ydb::DecimalType* decimalType = toType.mutable_decimal_type();
ui32 precision = 22, scale = 9;
if (field && field->metadata()) {
auto precisionMeta = field->metadata()->Get("precision").ValueOr("");
auto scaleMeta = field->metadata()->Get("scale").ValueOr("");
if (!precisionMeta.empty()) {
precision = FromString<ui32>(precisionMeta);
}
if (!scaleMeta.empty()) {
scale = FromString<ui32>(scaleMeta);
}
}
decimalType->set_precision(precision);
decimalType->set_scale(scale);
return true;
}
break;
}
case arrow::Type::NA:
case arrow::Type::HALF_FLOAT:
case arrow::Type::FIXED_SIZE_BINARY:
case arrow::Type::DATE32:
case arrow::Type::DATE64:
case arrow::Type::TIME32:
Expand Down Expand Up @@ -410,7 +431,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
auto& type = field->type();

Ydb::Type ydbType;
if (!ConvertArrowToYdbPrimitive(*type, ydbType)) {
if (!ConvertArrowToYdbPrimitive(*type, ydbType, field.get())) {
return TConclusionStatus::Fail("Cannot convert arrow type to ydb one: " + type->ToString());
}
out.emplace_back(name, std::move(ydbType));
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
return node;
}

Y_UNUSED(typesCtx);

auto rowType = node.Ref().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TStructExprType>();

TVector<TCoArgument> args;
Expand All @@ -41,18 +43,14 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
} else if (auto maybeRead = node.Maybe<TKqpReadTableRanges>()) {
wideRead = ctx.RenameNode(*node.Raw(), TKqpWideReadTableRanges::CallableName());
} else if (auto maybeRead = node.Maybe<TKqpReadOlapTableRanges>()) {
if (typesCtx.IsBlockEngineEnabled()) {
wideRead = Build<TCoToFlow>(ctx, node.Pos())
.Input<TCoWideFromBlocks>()
.Input<TCoFromFlow>()
.Input(ctx.RenameNode(*node.Raw(), TKqpBlockReadOlapTableRanges::CallableName()))
.Build()
wideRead = Build<TCoToFlow>(ctx, node.Pos())
.Input<TCoWideFromBlocks>()
.Input<TCoFromFlow>()
.Input(ctx.RenameNode(*node.Raw(), TKqpBlockReadOlapTableRanges::CallableName()))
.Build()
.Done()
.Ptr();
} else {
wideRead = ctx.RenameNode(*node.Raw(), TKqpWideReadOlapTableRanges::CallableName());
}
.Build()
.Done()
.Ptr();
} else {
YQL_ENSURE(false, "Unknown read table operation: " << node.Ptr()->Content());
}
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "columnshard.h"

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>
Expand Down Expand Up @@ -390,8 +391,16 @@ namespace NKqp {
return arrow::field(name, arrow::int64(), nullable);
case NScheme::NTypeIds::JsonDocument:
return arrow::field(name, arrow::binary(), nullable);
case NScheme::NTypeIds::Decimal:
return arrow::field(name, arrow::decimal(typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale()), nullable);
case NScheme::NTypeIds::Decimal: {
auto meta = arrow::KeyValueMetadata::Make(
std::vector<std::string>{"precision", "scale"},
std::vector<std::string>{
ToString(typeInfo.GetDecimalType().GetPrecision()),
ToString(typeInfo.GetDecimalType().GetScale())
}
);
return arrow::field(name, arrow::fixed_size_binary(16), nullable, meta);
}
case NScheme::NTypeIds::Pg:
switch (NPg::PgTypeIdFromTypeDesc(typeInfo.GetPgTypeDesc())) {
case INT2OID:
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/olap/decimal_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ Y_UNIT_TEST_SUITE(KqpDecimalColumnShard) {
tester35.PrepareTable1();

auto check = [](const TDecimalTestCase& tester) {
tester.CheckQuery("SELECT min(dec) FROM `/Root/Table1`", "[[[\"3.14\"]]]");
tester.CheckQuery("SELECT max(dec) FROM `/Root/Table1`", "[[[\"12.46\"]]]");
tester.CheckQuery("SELECT sum(dec) FROM `/Root/Table1`", "[[[\"32.252\"]]]");
tester.CheckQuery("SELECT min(dec) FROM `/Root/Table1`", "[[[\"3.14\"]]]", EQueryMode::EXECUTE_QUERY);
tester.CheckQuery("SELECT max(dec) FROM `/Root/Table1`", "[[[\"12.46\"]]]", EQueryMode::EXECUTE_QUERY);
tester.CheckQuery("SELECT sum(dec) FROM `/Root/Table1`", "[[[\"32.252\"]]]", EQueryMode::EXECUTE_QUERY);
};

check(tester22);
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,13 @@ class TTableUpdatesBuilder {
}
}

if constexpr (std::is_same<TData, NYdb::TDecimalValue>::value) {
if constexpr (arrow::is_decimal128_type<T>::value) {
Y_ABORT_UNLESS(typedBuilder.Append(arrow::Decimal128(data.Hi_, data.Low_)).ok());
if constexpr (std::is_same<TData, NYdb::Dev::TDecimalValue>::value) {
if constexpr (std::is_same<T, arrow::FixedSizeBinaryType>::value) {
char bytes[16] = {0};
// Little-endian: Low_ (unsigned) в первые 8 байт, Hi_ (signed) в последние 8 байт
std::memcpy(bytes, &data.Low_, 8);
std::memcpy(bytes + 8, &data.Hi_, 8);
Y_ABORT_UNLESS(typedBuilder.Append(bytes).ok());
return true;
}
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,9 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

for (const auto& [colName, colType] : YdbSchema) {
if (HasInternalConversion.contains(colName)) {
continue;
if (colType.GetTypeId() != NScheme::NTypeIds::Decimal) {
continue;
}
}
if (NArrow::TArrowToYdbConverter::NeedDataConversion(colType)) {
ColumnsToConvert[colName] = colType;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/formats/arrow/switch/switch_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ TResult SwitchTypeImpl(arrow::Type::type typeId, TFunc&& f) {
case arrow::Type::INTERVAL_MONTHS:
return f(TTypeWrapper<arrow::MonthIntervalType>());
case arrow::Type::DECIMAL:
return f(TTypeWrapper<arrow::Decimal128Type>());
return f(TTypeWrapper<arrow::FixedSizeBinaryType>());
case arrow::Type::DURATION:
return f(TTypeWrapper<arrow::DurationType>());
case arrow::Type::LARGE_STRING:
Expand Down
16 changes: 15 additions & 1 deletion ydb/tests/functional/tpc/medium/test_tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,23 @@
class TestTpchS1(tpch.TestTpch1, FunctionalTestBase):
iterations: int = 1

@classmethod
def addition_init_params(cls) -> list[str]:
if cls.float_mode:
return ['--float-mode', cls.float_mode]
return []

@classmethod
def setup_class(cls) -> None:
cls.setup_cluster()
cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'init', '--store=column'])
cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'init', '--store=row'] + cls.addition_init_params())
cls.run_cli(['workload', 'tpch', '-p', 'olap_yatests/tpch/s1', 'import', 'generator', '--scale=1'])
tpch.TestTpch1.setup_class()


class TestTpchS1Decimal_22_9(TestTpchS1):
float_mode = 'decimal_ydb'


class TestTpchS1DecimalNative(TestTpchS1):
float_mode = 'decimal'
12 changes: 9 additions & 3 deletions ydb/tests/olap/lib/ydb_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ def __init__(self,
scale: Optional[int],
query_prefix: Optional[str],
external_path: str,
threads: int):
threads: int,
float_mode: str,
):
self.result = YdbCliHelper.WorkloadRunResult()
self.iterations = iterations
self.check_canonical = check_canonical
Expand All @@ -172,6 +174,7 @@ def __init__(self,
self.__plan_path = f'{self.__prefix}.plan'
self.__query_output_path = f'{self.__prefix}.result'
self.json_path = f'{self.__prefix}.stats.json'
self.float_mode = float_mode

def get_plan_path(self, query_name: str, plan_name: Any) -> str:
return f'{self.__plan_path}.{query_name}.{plan_name}'
Expand Down Expand Up @@ -204,6 +207,8 @@ def __get_cmd(self) -> list[str]:
cmd += ['--scale', str(self.scale)]
if self.threads > 0:
cmd += ['--threads', str(self.threads)]
if self.float_mode:
cmd += ['--float-mode', self.float_mode]
return cmd

def run(self) -> YdbCliHelper.WorkloadRunResult:
Expand Down Expand Up @@ -343,7 +348,7 @@ def __process(self) -> YdbCliHelper.WorkloadRunResult:
@staticmethod
def workload_run(workload_type: WorkloadType, path: str, query_names: list[str], iterations: int = 5,
timeout: float = 100., check_canonical: CheckCanonicalPolicy = CheckCanonicalPolicy.NO, query_syntax: str = '',
scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0) -> dict[str, YdbCliHelper.WorkloadRunResult]:
scale: Optional[int] = None, query_prefix=None, external_path='', threads: int = 0, float_mode: str = '') -> dict[str, YdbCliHelper.WorkloadRunResult]:
runner = YdbCliHelper.WorkloadRunner(
workload_type,
path,
Expand All @@ -355,7 +360,8 @@ def workload_run(workload_type: WorkloadType, path: str, query_names: list[str],
scale,
query_prefix=query_prefix,
external_path=external_path,
threads=threads
threads=threads,
float_mode=float_mode,
)
extended_query_names = query_names + ["Sum", "Avg", "GAvg"]
if runner.run():
Expand Down
5 changes: 4 additions & 1 deletion ydb/tests/olap/load/lib/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, iterations: Optional[int] = None, timeout: Optional[float] =
scale: Optional[int] = None
query_prefix: str = get_external_param('query-prefix', '')
verify_data: bool = True
float_mode: str = ''
__nodes_state: Optional[dict[str, YdbCluster.Node]] = None

@classmethod
Expand Down Expand Up @@ -465,6 +466,7 @@ def run_workload_test(self, path: str, query_num: Optional[int] = None, query_na
scale=self.scale,
query_prefix=qparams.query_prefix,
external_path=self.get_external_path(),
float_mode=self.float_mode,
)[query_name]
self.process_query_result(result, query_name, True)

Expand Down Expand Up @@ -827,7 +829,8 @@ def do_setup_class(cls):
scale=cls.scale,
query_prefix=qparams.query_prefix,
external_path=cls.get_external_path(),
threads=cls.threads
threads=cls.threads,
float_mode=cls.float_mode,
)

def test(self, query_name):
Expand Down
Loading