diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index 3826ee95..b840e638 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -71,6 +71,9 @@ function(resolve_arrow_dependency) set(ARROW_FILESYSTEM ON CACHE BOOL "" FORCE) + set(ARROW_JSON + ON + CACHE BOOL "" FORCE) set(ARROW_PARQUET ON CACHE BOOL "" FORCE) @@ -95,8 +98,8 @@ function(resolve_arrow_dependency) fetchcontent_declare(VendoredArrow ${FC_DECLARE_COMMON_OPTIONS} - GIT_REPOSITORY https://github.com/wgtmac/arrow.git - GIT_TAG 7d50c4ac803ad983734de5f418b7cd18f25b0dc9 + GIT_REPOSITORY https://github.com/apache/arrow.git + GIT_TAG 5f0aeb5de53fb25b59a52661a80071faef99a4a4 #URL ${ARROW_SOURCE_URL} #URL_HASH "SHA256=${ICEBERG_ARROW_BUILD_SHA256_CHECKSUM}" SOURCE_SUBDIR diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index 48ac7e67..90c480b9 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -17,16 +17,383 @@ * under the License. */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_error_transform_internal.h" #include "iceberg/avro/avro_data_util_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/schema_util.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" namespace iceberg::avro { +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. +Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder); + +/// \brief Append Avro record data to Arrow struct builder. +Status AppendStructToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const std::span& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + const auto& avro_record = avro_datum.value<::avro::GenericRecord>(); + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + for (size_t i = 0; i < projections.size(); ++i) { + const auto& field_projection = projections[i]; + const auto& expected_field = struct_type.fields()[i]; + auto* field_builder = struct_builder->field_builder(static_cast(i)); + + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = + std::get(field_projection.from); + if (avro_field_index >= avro_record.fieldCount()) { + return InvalidArgument("Avro field index {} out of bound {}", avro_field_index, + avro_record.fieldCount()); + } + + const auto& avro_field_node = avro_node->leafAt(avro_field_index); + const auto& avro_field_datum = avro_record.fieldAt(avro_field_index); + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder(avro_field_node, avro_field_datum, + field_projection, expected_field, + field_builder)); + } else if (field_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else { + return NotImplemented("Unsupported field projection kind: {}", + ToString(field_projection.kind)); + } + } + return {}; +} + +/// \brief Append Avro array data to Arrow list builder. +Status AppendListToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, const ListType& list_type, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + const auto& avro_array = avro_datum.value<::avro::GenericArray>(); + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append( + /*is_valid=*/true, /*length=*/static_cast(avro_array.value().size()))); + + const auto& element_projection = projection.children[0]; + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + for (const auto& element : avro_array.value()) { + ICEBERG_RETURN_UNEXPECTED(AppendFieldToBuilder( + element_node, element, element_projection, element_field, value_builder)); + } + return {}; +} + +/// \brief Append Avro map data to Arrow map builder. +Status AppendMapToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder) { + // TODO(gangwu): support both regular map and array-based map. + return NotImplemented("AppendMapToBuilder is not implemented"); +} + +/// \brief Append nested Avro data to Arrow array builder based on type. +Status AppendNestedValueToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const std::span& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(projected_type); + return AppendStructToBuilder(avro_node, avro_datum, projections, struct_type, + array_builder); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast(projected_type); + return AppendListToBuilder(avro_node, avro_datum, projections[0], list_type, + array_builder); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast(projected_type); + return AppendMapToBuilder(avro_node, avro_datum, projections[0], projections[1], + map_type, array_builder); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status AppendPrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + } else if (avro_node->type() == ::avro::AVRO_INT) { + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(static_cast(avro_datum.value()))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kDouble: { + auto* builder = internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(static_cast(avro_datum.value()))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + const auto& bytes = avro_datum.value>(); + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(bytes.data(), static_cast(bytes.size()))); + return {}; + } + + case TypeId::kFixed: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for fixed field, got: {}", + ToString(avro_node)); + } + const auto& fixed = avro_datum.value<::avro::GenericFixed>(); + const auto& fixed_type = internal::checked_cast(projected_type); + + if (static_cast(fixed.value().size()) != fixed_type.length()) { + return InvalidArgument("Expected Avro fixed[{}], got: {}", fixed_type.length(), + ToString(avro_node)); + } + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + const auto& value = fixed.value(); + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(reinterpret_cast(value.data()))); + return {}; + } + + case TypeId::kUuid: { + if (avro_node->type() != ::avro::AVRO_FIXED || + avro_node->logicalType().type() != ::avro::LogicalType::UUID) { + return InvalidArgument("Expected Avro fixed for uuid field, got: {}", + ToString(avro_node)); + } + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + const auto& fixed = avro_datum.value<::avro::GenericFixed>(); + if (fixed.value().size() != 16) { + return InvalidArgument("Expected UUID fixed length 16, got: {}", + fixed.value().size()); + } + const auto& value = fixed.value(); + ICEBERG_ARROW_RETURN_NOT_OK( + builder->Append(reinterpret_cast(value.data()))); + return {}; + } + + case TypeId::kDecimal: { + if (avro_node->type() != ::avro::AVRO_FIXED || + avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) { + return InvalidArgument( + "Expected Avro fixed with decimal logical type for decimal field, got: {}", + ToString(avro_node)); + } + + const auto& fixed = avro_datum.value<::avro::GenericFixed>(); + const auto& value = fixed.value(); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto decimal, ::arrow::Decimal128::FromBigEndian(value.data(), value.size())); + auto* builder = internal::checked_cast<::arrow::Decimal128Builder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal)); + return {}; + } + + case TypeId::kDate: { + if (avro_node->type() != ::avro::AVRO_INT || + avro_node->logicalType().type() != ::avro::LogicalType::DATE) { + return InvalidArgument( + "Expected Avro int with DATE logical type for date field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Date32Builder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kTime: { + if (avro_node->type() != ::avro::AVRO_LONG || + avro_node->logicalType().type() != ::avro::LogicalType::TIME_MICROS) { + return InvalidArgument( + "Expected Avro long with TIME_MICROS for time field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Time64Builder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + case TypeId::kTimestamp: + case TypeId::kTimestampTz: { + if (avro_node->type() != ::avro::AVRO_LONG || + avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_MICROS) { + return InvalidArgument( + "Expected Avro long with TIMESTAMP_MICROS for timestamp field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value())); + return {}; + } + + default: + return InvalidArgument("Unsupported primitive type {} to append avro node {}", + projected_field.type()->ToString(), ToString(avro_node)); + } +} + +/// \brief Dispatch to appropriate handlers based on the projection kind. +Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, + const ::avro::GenericDatum& avro_datum, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder) { + if (avro_node->type() == ::avro::AVRO_UNION) { + const auto& union_datum = avro_datum.value<::avro::GenericUnion>(); + size_t branch = union_datum.currentBranch(); + if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } else { + return AppendFieldToBuilder(avro_node->leafAt(branch), union_datum.datum(), + projection, projected_field, array_builder); + } + } + + const auto& projected_type = *projected_field.type(); + if (projected_type.is_primitive()) { + return AppendPrimitiveValueToBuilder(avro_node, avro_datum, projected_field, + array_builder); + } else { + const auto& nested_type = internal::checked_cast(projected_type); + return AppendNestedValueToBuilder(avro_node, avro_datum, projection.children, + nested_type, array_builder); + } +} + +} // namespace + Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node, const ::avro::GenericDatum& avro_datum, const SchemaProjection& projection, - const Schema& arrow_schema, + const Schema& projected_schema, ::arrow::ArrayBuilder* array_builder) { - return NotImplemented("AppendDatumToBuilder is not yet implemented"); + return AppendNestedValueToBuilder(avro_node, avro_datum, projection.fields, + projected_schema, array_builder); } } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_data_util_internal.h b/src/iceberg/avro/avro_data_util_internal.h index 4b96483e..ad493688 100644 --- a/src/iceberg/avro/avro_data_util_internal.h +++ b/src/iceberg/avro/avro_data_util_internal.h @@ -26,10 +26,21 @@ namespace iceberg::avro { +/// \brief Append an Avro datum to an Arrow array builder. +/// +/// This function handles schema evolution by using the provided projection to map +/// fields from the Avro data to the expected Arrow schema. +/// +/// \param avro_node The Avro schema node (must be a record at root level) +/// \param avro_datum The Avro data to append +/// \param projection Schema projection from `projected_schema` to `avro_node` +/// \param projected_schema The projected schema +/// \param array_builder The Arrow array builder to append to (must be a struct builder) +/// \return Status indicating success or failure Status AppendDatumToBuilder(const ::avro::NodePtr& avro_node, const ::avro::GenericDatum& avro_datum, const SchemaProjection& projection, - const Schema& arrow_schema, + const Schema& projected_schema, ::arrow::ArrayBuilder* array_builder); } // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 229c62b4..f63de37c 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -73,6 +73,22 @@ ::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) { } // namespace +std::string ToString(const ::avro::NodePtr& node) { + std::stringstream ss; + ss << *node; + return ss.str(); +} + +std::string ToString(const ::avro::LogicalType& logical_type) { + std::stringstream ss; + logical_type.printJson(ss); + return ss.str(); +} + +std::string ToString(const ::avro::LogicalType::Type& logical_type) { + return ToString(::avro::LogicalType(logical_type)); +} + Status ToAvroNodeVisitor::Visit(const BooleanType& type, ::avro::NodePtr* node) { *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BOOL); return {}; @@ -383,22 +399,6 @@ Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return Visit(schema.r namespace { -std::string ToString(const ::avro::NodePtr& node) { - std::stringstream ss; - ss << *node; - return ss.str(); -} - -std::string ToString(const ::avro::LogicalType& logical_type) { - std::stringstream ss; - logical_type.printJson(ss); - return ss.str(); -} - -std::string ToString(const ::avro::LogicalType::Type& logical_type) { - return ToString(::avro::LogicalType(logical_type)); -} - bool HasLogicalType(const ::avro::NodePtr& node, ::avro::LogicalType::Type expected_type) { return node->logicalType().type() == expected_type; @@ -501,7 +501,7 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type, case TypeId::kTimestamp: if (avro_node->type() == ::avro::AVRO_LONG && HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) && - GetAdjustToUtc(avro_node).value_or("false") == "true") { + GetAdjustToUtc(avro_node).value_or("false") == "false") { return {}; } break; diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index 50ff9b23..05de8009 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -135,4 +135,8 @@ class HasIdVisitor { Result Project(const Schema& expected_schema, const ::avro::NodePtr& avro_node, bool prune_source); +std::string ToString(const ::avro::NodePtr& node); +std::string ToString(const ::avro::LogicalType& logical_type); +std::string ToString(const ::avro::LogicalType::Type& logical_type); + } // namespace iceberg::avro diff --git a/src/iceberg/schema_util_internal.h b/src/iceberg/schema_util_internal.h index 33aad93a..e0faf4f6 100644 --- a/src/iceberg/schema_util_internal.h +++ b/src/iceberg/schema_util_internal.h @@ -26,7 +26,7 @@ namespace iceberg { // Fix `from` field of `FieldProjection` to use pruned field index. -void PruneFieldProjection(FieldProjection& field_projection) { +inline void PruneFieldProjection(FieldProjection& field_projection) { std::map local_index_to_pruned_index; for (const auto& child_projection : field_projection.children) { if (child_projection.kind == FieldProjection::Kind::kProjected) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 863bb09e..53569c5c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -71,7 +71,8 @@ add_test(NAME util_test COMMAND util_test) if(ICEBERG_BUILD_BUNDLE) add_executable(avro_test) - target_sources(avro_test PRIVATE avro_test.cc avro_schema_test.cc avro_stream_test.cc) + target_sources(avro_test PRIVATE avro_data_test.cc avro_test.cc avro_schema_test.cc + avro_stream_test.cc) target_link_libraries(avro_test PRIVATE iceberg_bundle_static GTest::gtest_main GTest::gmock) add_test(NAME avro_test COMMAND avro_test) diff --git a/test/avro_data_test.cc b/test/avro_data_test.cc new file mode 100644 index 00000000..1cd6459c --- /dev/null +++ b/test/avro_data_test.cc @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "matchers.h" + +namespace iceberg::avro { + +/// \brief Test case structure for parameterized primitive type tests +struct AppendDatumParam { + std::string name; + std::shared_ptr projected_type; + std::shared_ptr source_type; + std::function value_setter; + std::string expected_json; +}; + +/// \brief Helper function to create test data for a primitive type +std::vector<::avro::GenericDatum> CreateTestData( + const ::avro::NodePtr& avro_node, + const std::function& value_setter, int count = 3) { + std::vector<::avro::GenericDatum> avro_data; + for (int i = 0; i < count; ++i) { + ::avro::GenericDatum avro_datum(avro_node); + value_setter(avro_datum, i); + avro_data.push_back(avro_datum); + } + return avro_data; +} + +/// \brief Utility function to verify AppendDatumToBuilder behavior +void VerifyAppendDatumToBuilder(const Schema& projected_schema, + const ::avro::NodePtr& avro_node, + const std::vector<::avro::GenericDatum>& avro_data, + std::string_view expected_array_json) { + // Create 1 to 1 projection + auto projection_result = Project(projected_schema, avro_node, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + auto projection = std::move(projection_result.value()); + + // Create arrow schema and array builder + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(projected_schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); + auto builder = ::arrow::MakeBuilder(arrow_struct_type).ValueOrDie(); + + // Call AppendDatumToBuilder repeatedly to append the datum + for (const auto& avro_datum : avro_data) { + ASSERT_THAT(AppendDatumToBuilder(avro_node, avro_datum, projection, projected_schema, + builder.get()), + IsOk()); + } + + // Verify the result + auto array = builder->Finish().ValueOrDie(); + auto expected_array = + ::arrow::json::ArrayFromJSONString(arrow_struct_type, expected_array_json) + .ValueOrDie(); + ASSERT_TRUE(array->Equals(*expected_array)); +} + +/// \brief Test class for primitive types using parameterized tests +class AppendDatumToBuilderTest : public ::testing::TestWithParam {}; + +TEST_P(AppendDatumToBuilderTest, PrimitiveType) { + const auto& test_case = GetParam(); + + Schema projected_schema({SchemaField::MakeRequired( + /*field_id=*/1, /*name=*/"a", test_case.projected_type)}); + Schema source_schema({SchemaField::MakeRequired( + /*field_id=*/1, /*name=*/"a", test_case.source_type)}); + + ::avro::NodePtr avro_node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(source_schema, &avro_node), IsOk()); + + auto avro_data = CreateTestData(avro_node, test_case.value_setter); + ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(projected_schema, avro_node, + avro_data, test_case.expected_json)); +} + +// Define test cases for all primitive types +const std::vector kPrimitiveTestCases = { + { + .name = "Boolean", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + (i % 2 == 0); + }, + .expected_json = R"([{"a": true}, {"a": false}, {"a": true}])", + }, + { + .name = "Int", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = i * 100; + }, + .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])", + }, + { + .name = "Long", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + i * 1000000LL; + }, + .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", + }, + { + .name = "Float", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = i * 3.14f; + }, + .expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])", + }, + { + .name = "Double", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + i * 1.234567890; + }, + .expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])", + }, + { + .name = "String", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + "test_string_" + std::to_string(i); + }, + .expected_json = + R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])", + }, + { + .name = "Binary", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>() + .fieldAt(0) + .value>() = {static_cast('a' + i), + static_cast('b' + i), + static_cast('c' + i)}; + }, + .expected_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])", + }, + { + .name = "Fixed", + .projected_type = std::make_shared(4), + .source_type = std::make_shared(4), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>() + .fieldAt(0) + .value<::avro::GenericFixed>() + .value() = { + static_cast('a' + i), static_cast('b' + i), + static_cast('c' + i), static_cast('d' + i)}; + }, + .expected_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])", + }, + /// FIXME: NotImplemented: MakeBuilder: cannot construct builder for type + /// extension + // { + // .name = "UUID", + // .projected_type = std::make_shared(), + // .source_type = std::make_shared(), + // .value_setter = + // [](::avro::GenericDatum& datum, int i) { + // datum.value<::avro::GenericRecord>() + // .fieldAt(0) + // .value<::avro::GenericFixed>() + // .value() = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', + // 'i', 'j', 'k', 'l', 'm', 'n', 'o', + // static_cast(i)}; + // }, + // .expected_json = R"([{"a": "abcdefghijklmnop"}, {"a": "bcdefghijklmnopq"}, + // {"a": "cdefghijklmnopqr"}])", + // }, + { + .name = "Decimal", + .projected_type = std::make_shared(10, 2), + .source_type = std::make_shared(10, 2), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + int32_t decimal_value = i * 1000 + i; + std::vector& fixed = datum.value<::avro::GenericRecord>() + .fieldAt(0) + .value<::avro::GenericFixed>() + .value(); + // The byte array must contain the two's-complement representation of + // the unscaled integer value in big-endian byte order. + for (uint8_t& rvalue : std::ranges::reverse_view(fixed)) { + rvalue = static_cast(decimal_value & 0xFF); + decimal_value >>= 8; + } + }, + .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])", + }, + { + .name = "Date", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + // Date as days since epoch (1970-01-01) + // 0 = 1970-01-01, 1 = 1970-01-02, etc. + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + 18000 + i; // ~2019-04-11 + i days + }, + .expected_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])", + }, + { + .name = "Time", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + // Time as microseconds since midnight + // 12:30:45.123456 + i seconds = 45045123456 + i*1000000 microseconds + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + 45045123456LL + i * 1000000LL; + }, + .expected_json = + R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])", + }, + { + .name = "Timestamp", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + i * 1000000LL; + }, + .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])", + }, + { + .name = "TimestampTz", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = + 1672531200000000LL + i * 1000000LL; + }, + .expected_json = + R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])", + }, + { + .name = "IntToLongPromotion", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = i * 100; + }, + .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])", + }, + { + .name = "FloatToDoublePromotion", + .projected_type = std::make_shared(), + .source_type = std::make_shared(), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + datum.value<::avro::GenericRecord>().fieldAt(0).value() = i * 1.0f; + }, + .expected_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])", + }, + { + .name = "DecimalPrecisionPromotion", + .projected_type = std::make_shared(10, 2), + .source_type = std::make_shared(6, 2), + .value_setter = + [](::avro::GenericDatum& datum, int i) { + int32_t decimal_value = i * 1000 + i; + std::vector& fixed = datum.value<::avro::GenericRecord>() + .fieldAt(0) + .value<::avro::GenericFixed>() + .value(); + for (uint8_t& rvalue : std::ranges::reverse_view(fixed)) { + rvalue = static_cast(decimal_value & 0xFF); + decimal_value >>= 8; + } + }, + .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])", + }, +}; + +INSTANTIATE_TEST_SUITE_P(AllPrimitiveTypes, AppendDatumToBuilderTest, + ::testing::ValuesIn(kPrimitiveTestCases), + [](const ::testing::TestParamInfo& info) { + return info.param.name; + }); + +TEST(AppendDatumToBuilderTest, TwoFieldsRecord) { + Schema iceberg_schema({ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "name", std::make_shared()), + }); + ::avro::NodePtr avro_node; + ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); + + std::vector<::avro::GenericDatum> avro_data; + ::avro::GenericDatum avro_datum(avro_node); + auto& record = avro_datum.value<::avro::GenericRecord>(); + record.fieldAt(0).value() = 42; + record.fieldAt(1).value() = "test"; + avro_data.push_back(avro_datum); + + ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_node, avro_data, + R"([{"id": 42, "name": "test"}])")); +} + +} // namespace iceberg::avro