From eafc2d990d239fa225a6e62932ae485e81fba67d Mon Sep 17 00:00:00 2001 From: 3gdroid Date: Tue, 25 Mar 2025 18:28:58 -0400 Subject: [PATCH 1/2] add UnifySchemas() --- arrow/util/schemas/unify.go | 490 +++++++++++++++++++++++++++++++ arrow/util/schemas/unify_test.go | 446 ++++++++++++++++++++++++++++ 2 files changed, 936 insertions(+) create mode 100644 arrow/util/schemas/unify.go create mode 100644 arrow/util/schemas/unify_test.go diff --git a/arrow/util/schemas/unify.go b/arrow/util/schemas/unify.go new file mode 100644 index 00000000..a44fc3ee --- /dev/null +++ b/arrow/util/schemas/unify.go @@ -0,0 +1,490 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schemas + +import ( + "errors" + "fmt" + "slices" + + "maps" + + "github.com/apache/arrow-go/v18/arrow" +) + +var ( + ErrPathNotFound = errors.New("path not found") + ErrFieldTypeChanged = errors.New("type changed") +) + +type schemaUnifier struct { + old *treeNode + new *treeNode + typeConversion bool +} + +type treeNode struct { + root *treeNode + parent *treeNode + name string + path []string + field arrow.Field + children []*treeNode + childmap map[string]*treeNode + metadatas arrow.Metadata + index, depth int32 + err error +} + +// UnifySchemas unifies multiple schemas into a single schema. If promotePermissive is true, the unification process will promote integer types to larger integer types, integer types to floating-point types, STRING to LARGE_STRING, LIST to LARGE_LIST and LIST_VIEW to LARGE_LIST_VIEW. If promotePermissive is false, the unification process will not allow type conversion and will return an error if a type conflict is found. +func UnifySchemas(promotePermissive bool, schemas ...*arrow.Schema) (*arrow.Schema, error) { + if len(schemas) < 2 { + return nil, fmt.Errorf("not enough schemas to unify") + } + var u schemaUnifier + u.typeConversion = promotePermissive + u.old = newTreeFromSchema(schemas[0]) + for _, s := range schemas[1:] { + u.new = newTreeFromSchema(s) + u.unify() + } + if err := collectErrors(u.old); err != nil { + return nil, err + } + return u.old.schema() +} + +func newTreeRoot() *treeNode { + f := new(treeNode) + f.index = -1 + f.root = f + f.name = "root" + f.childmap = make(map[string]*treeNode) + f.children = make([]*treeNode, 0) + return f +} + +func (f *treeNode) assignChild(child *treeNode) { + f.children = append(f.children, child) + f.childmap[child.name] = child +} + +func (f *treeNode) newChild(childName string) *treeNode { + var child treeNode = treeNode{ + root: f.root, + parent: f, + name: childName, + index: int32(len(f.children)), + depth: f.depth + 1, + } + child.path = child.namePath() + child.childmap = make(map[string]*treeNode) + return &child +} + +func (f *treeNode) mapChildren() { + for i, c := range f.children { + f.childmap[c.name] = f.children[i] + } +} + +// getPath returns a field found at a defined path, otherwise returns ErrPathNotFound. +func (f *treeNode) getPath(path []string) (*treeNode, error) { + if len(path) == 0 { // degenerate input + return nil, fmt.Errorf("getPath needs at least one key") + } + if node, ok := f.childmap[path[0]]; !ok { + return nil, ErrPathNotFound + } else if len(path) == 1 { // we've reached the final key + return node, nil + } else { // 1+ more keys + return node.getPath(path[1:]) + } +} + +// namePath returns a slice of keys making up the path to the field +func (f *treeNode) namePath() []string { + if len(f.path) == 0 { + var path []string + cur := f + for i := f.depth - 1; i >= 0; i-- { + path = append([]string{cur.name}, path...) + cur = cur.parent + } + return path + } + return f.path +} + +// namePath returns the path to the field in json dot notation +func (f *treeNode) dotPath() string { + var path string = "$" + for i, p := range f.path { + path = path + p + if i+1 != len(f.path) { + path = path + "." + } + } + return path +} + +// graft grafts a new field into the schema tree as a child of f +func (f *treeNode) graft(n *treeNode) { + fIsNullable := f.field.Nullable + graft := f.newChild(n.name) + graft.field = n.field + graft.children = append(graft.children, n.children...) + graft.mapChildren() + f.assignChild(graft) + + if !(f.root == f) && f.field.Type.ID() == arrow.STRUCT { + gf := f.field.Type.(*arrow.StructType) + var nf []arrow.Field + nf = append(nf, gf.Fields()...) + nf = append(nf, graft.field) + f.field = arrow.Field{Name: f.name, Type: arrow.StructOf(nf...), Nullable: fIsNullable} + if !(f.parent.name == "root") && (f.parent != nil) && f.parent.field.Type.ID() == arrow.LIST { + f.parent.field = arrow.Field{Name: f.parent.name, Type: arrow.ListOfField(f.field)} + } + } +} + +func collectErrors(f *treeNode) error { + var err error + if f.err != nil { + err = errors.Join(err, f.err) + } + for _, field := range f.children { + err = errors.Join(err, collectErrors(field)) + } + return err +} + +func newTreeFromSchema(a *arrow.Schema) *treeNode { + f := newTreeRoot() + f.metadatas = a.Metadata() + treeFromSchema(f, a) + return f +} + +func treeFromSchema(f *treeNode, a *arrow.Schema) { + for _, field := range a.Fields() { + child := f.newChild(field.Name) + child.field = field + child.metadatas = field.Metadata + + switch field.Type.ID() { + case arrow.STRUCT: + structType := field.Type.(*arrow.StructType) + for _, subField := range structType.Fields() { + subChild := child.newChild(subField.Name) + subChild.field = subField + child.assignChild(subChild) + treeFromSchema(subChild, arrow.NewSchema([]arrow.Field{subField}, nil)) + } + case arrow.LIST: + listType := field.Type.(*arrow.ListType) + elemField := arrow.Field{Name: "element", Type: listType.Elem()} + elemChild := child.newChild("element") + elemChild.field = elemField + child.assignChild(elemChild) + treeFromSchema(elemChild, arrow.NewSchema([]arrow.Field{elemField}, nil)) + case arrow.MAP: + mapType := field.Type.(*arrow.MapType) + keyField := arrow.Field{Name: "key", Type: mapType.KeyType()} + valueField := arrow.Field{Name: "value", Type: mapType.ItemType()} + keyChild := child.newChild("key") + valueChild := child.newChild("value") + keyChild.field = keyField + valueChild.field = valueField + child.assignChild(keyChild) + child.assignChild(valueChild) + treeFromSchema(keyChild, arrow.NewSchema([]arrow.Field{keyField}, nil)) + treeFromSchema(valueChild, arrow.NewSchema([]arrow.Field{valueField}, nil)) + case arrow.RUN_END_ENCODED: + runEndEncodedType := field.Type.(*arrow.RunEndEncodedType) + runEndField := arrow.Field{Name: "run_ends", Type: runEndEncodedType.RunEnds()} + valuesField := arrow.Field{Name: "values", Type: runEndEncodedType.Encoded()} + runEndChild := child.newChild("run_ends") + valuesChild := child.newChild("values") + runEndChild.field = runEndField + valuesChild.field = valuesField + child.assignChild(runEndChild) + child.assignChild(valuesChild) + treeFromSchema(runEndChild, arrow.NewSchema([]arrow.Field{runEndField}, nil)) + treeFromSchema(valuesChild, arrow.NewSchema([]arrow.Field{valuesField}, nil)) + case arrow.DICTIONARY: + dictType := field.Type.(*arrow.DictionaryType) + indexField := arrow.Field{Name: "index", Type: dictType.IndexType} + valueField := arrow.Field{Name: "dictionary", Type: dictType.ValueType} + indexChild := child.newChild("index") + valueChild := child.newChild("dictionary") + indexChild.field = indexField + valueChild.field = valueField + child.assignChild(indexChild) + child.assignChild(valueChild) + treeFromSchema(indexChild, arrow.NewSchema([]arrow.Field{indexField}, nil)) + treeFromSchema(valueChild, arrow.NewSchema([]arrow.Field{valueField}, nil)) + default: + // Scalar types do not need further processing + } + + f.assignChild(child) + } +} + +func (f *treeNode) schema() (*arrow.Schema, error) { + var s *arrow.Schema + defer func(s *arrow.Schema) (*arrow.Schema, error) { + if pErr := recover(); pErr != nil { + return nil, fmt.Errorf("schema problem: %v", pErr) + } + return s, nil + }(s) + var fields []arrow.Field + for _, c := range f.children { + fields = append(fields, c.field) + } + s = arrow.NewSchema(fields, &f.metadatas) + return s, nil +} + +func (o *treeNode) upgradeType(n *treeNode, t arrow.DataType) { + // Update the current node's field and type + o.field = arrow.Field{Name: o.name, Type: t, Nullable: mergeBool(o.field.Nullable, n.field.Nullable)} + + // Update the parent node if it exists and is not the root + if o.parent != o.root { + switch o.parent.field.Type.ID() { + case arrow.LIST: + // Update the parent LIST type to reflect the upgraded child type + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.ListOfField(o.field)} + case arrow.LARGE_LIST: + // Update the parent LARGE_LIST type to reflect the upgraded child type + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.LargeListOfField(o.field)} + case arrow.LIST_VIEW: + // Update the parent LIST_VIEW type to reflect the upgraded child type + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.ListViewOfField(o.field)} + case arrow.LARGE_LIST_VIEW: + // Update the parent LARGE_LIST_VIEW type to reflect the upgraded child type + o.parent.field = arrow.Field{Name: o.parent.name, Type: arrow.LargeListViewOfField(o.field)} + case arrow.STRUCT: + // Update the parent STRUCT type to include the upgraded child field + var fields []arrow.Field + for _, c := range o.parent.children { + fields = append(fields, c.field) + } + o.parent.field = arrow.Field{ + Name: o.parent.name, + Type: arrow.StructOf(fields...), + Nullable: mergeBool(o.parent.field.Nullable, n.parent.field.Nullable), + } + + case arrow.MAP: + // Update the parent MAP type to reflect the upgraded key or value type + var keyType, valueType arrow.DataType + for _, c := range o.parent.children { + if c.name == "key" { + keyType = c.field.Type + } else if c.name == "value" { + valueType = c.field.Type + } + } + o.parent.field = arrow.Field{ + Name: o.parent.name, + Type: arrow.MapOf(keyType, valueType), + Nullable: mergeBool(o.parent.field.Nullable, n.parent.field.Nullable), + } + + case arrow.DICTIONARY: + // Update the parent DICTIONARY type to reflect the upgraded index or value type + var indexType, valueType arrow.DataType + for _, c := range o.parent.children { + if c.name == "index" { + indexType = c.field.Type + } else if c.name == "dictionary" { + valueType = c.field.Type + } + } + o.parent.field = arrow.Field{ + Name: o.parent.name, + Type: &arrow.DictionaryType{IndexType: indexType, ValueType: valueType}, + Nullable: mergeBool(o.parent.field.Nullable, n.parent.field.Nullable), + } + + case arrow.RUN_END_ENCODED: + // Update the parent RUN_END_ENCODED type to reflect the upgraded run_ends or values type + var runEndsType, valuesType arrow.DataType + for _, c := range o.parent.children { + if c.name == "run_ends" { + runEndsType = c.field.Type + } else if c.name == "values" { + valuesType = c.field.Type + } + } + o.parent.field = arrow.Field{ + Name: o.parent.name, + Type: arrow.RunEndEncodedOf(runEndsType, valuesType), + Nullable: mergeBool(o.parent.field.Nullable, n.parent.field.Nullable), + } + } + } +} + +func (u *schemaUnifier) unify() { + if u.new.metadatas.Len() > 0 { + u.old.mergeMetadata(u.new.metadatas) + } + for _, field := range u.new.children { + u.merge(field, nil) + } +} + +// merge merges a new or changed field into the unified schema. +func (u *schemaUnifier) merge(n *treeNode, mergeAt []string) { + var nPath, nParentPath []string + if len(mergeAt) > 0 { + nPath = slices.Concat(mergeAt, n.path) + nParentPath = slices.Concat(mergeAt, n.parent.path) + } else { + nPath = n.path + nParentPath = n.parent.path + } + // if field in new schema is not found in old schema, graft it in. + if kin, err := u.old.getPath(nPath); err == ErrPathNotFound { + // root graft + if n.root == n.parent { + u.old.root.graft(n) + } else { + // branch graft + b, _ := u.old.getPath(nParentPath) + b.graft(n) + } + } else { + // if field in new schema is found in old schema, check for type changes. + if n.field.HasMetadata() { + kin.mergeMetadata(n.field.Metadata) + } + // no type conversion + if !u.typeConversion && (!kin.field.Equal(n.field) && kin.field.Type.ID() != n.field.Type.ID()) { + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + } + // promote nullability + if u.typeConversion && (!kin.field.Equal(n.field) && kin.field.Type.ID() == n.field.Type.ID() && kin.field.Nullable != n.field.Nullable) { + kin.upgradeType(n, n.field.Type) + } + // type convertion + if u.typeConversion && (!kin.field.Equal(n.field) && kin.field.Type.ID() != n.field.Type.ID()) { + // handle integers and floats + switch kt := kin.field.Type.(type) { + case arrow.FixedWidthDataType: + switch nt := n.field.Type.(type) { + case arrow.FixedWidthDataType: + // unsigned integers can be upgraded to types with double the bit width + if arrow.IsUnsignedInteger(kt.ID()) { + if kt.BitWidth()*2 == nt.BitWidth() { + kin.upgradeType(n, n.field.Type) + } else { + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + } + } + // integers can be upgraded to integer types of a higher bit width + if arrow.IsSignedInteger(kt.ID()) && arrow.IsSignedInteger(nt.ID()) { + if kt.BitWidth() < nt.BitWidth() { + kin.upgradeType(n, n.field.Type) + } else { + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + } + } + // ints can be upgraded to floating point types of a higher bit width + if arrow.IsSignedInteger(kt.ID()) && arrow.IsFloating(nt.ID()) { + if kt.BitWidth() < nt.BitWidth() { + kin.upgradeType(n, n.field.Type) + } else { + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + } + } + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + } + default: + switch kin.field.Type.ID() { + // NULL upgrades to any type - no type can be downgraded to NULL + case arrow.NULL: + kin.upgradeType(n, n.field.Type) + // STRING can be upgraded to LARGE_STRING + case arrow.STRING: + switch n.field.Type.ID() { + case arrow.LARGE_STRING: + kin.upgradeType(n, n.field.Type) + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + return + } + // BINARY can be upgraded to LARGE_BINARY + case arrow.BINARY: + switch n.field.Type.ID() { + case arrow.LARGE_BINARY: + kin.upgradeType(n, n.field.Type) + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + return + } + // LIST can be upgraded to LARGE_LIST + case arrow.LIST: + switch n.field.Type.ID() { + case arrow.LARGE_LIST: + kin.upgradeType(n, n.field.Type) + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + return + } + // LIST_VIEW can be upgraded to LARGE_LIST_VIEW + case arrow.LIST_VIEW: + switch n.field.Type.ID() { + case arrow.LARGE_LIST_VIEW: + kin.upgradeType(n, n.field.Type) + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + return + } + // + default: + kin.err = errors.Join(kin.err, fmt.Errorf("%s %v to %v", n.dotPath(), ErrFieldTypeChanged.Error(), n.field.Type.Name())) + return + } + } + + } + + } + for _, v := range n.childmap { + u.merge(v, mergeAt) + } +} + +func (f *treeNode) mergeMetadata(m1 arrow.Metadata) { + merged := make(map[string]string) + + maps.Copy(merged, m1.ToMap()) + maps.Copy(merged, f.metadatas.ToMap()) + f.field.Metadata = arrow.MetadataFrom(merged) +} + +func mergeBool(a, b bool) bool { + return a || b +} diff --git a/arrow/util/schemas/unify_test.go b/arrow/util/schemas/unify_test.go new file mode 100644 index 00000000..9d45f7d4 --- /dev/null +++ b/arrow/util/schemas/unify_test.go @@ -0,0 +1,446 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schemas + +import ( + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/stretchr/testify/assert" +) + +func TestUnifySchemas(t *testing.T) { + schema1Metadata := arrow.MetadataFrom(map[string]string{"source": "schema1"}) + schema1 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + }, + &schema1Metadata, + ) + + t.Run("UnifySchemas with default promotion (no type conversion)", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Float64}, // Conflicting type + }, + nil, + ) + + _, err := UnifySchemas(false, schema1, schema2) + assert.Error(t, err) // Type conversion is not allowed + }) + + t.Run("UnifySchemas with permissive promotion (int32 to float64)", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Float64}, // Upgrade int32 to float64 + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Float64}, // Promoted to float64 + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with permissive promotion (int16 to int32)", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int32)}, // Upgrade int16 to int32 + )}, + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int32), Nullable: true}, // Promoted to int32 + ), Nullable: true}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with unsupported type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.BinaryTypes.Binary}, // Unsupported upgrade + }, + nil, + ) + + _, err := UnifySchemas(true, schema1, schema2) + assert.Error(t, err) // Unsupported type upgrade should return an error + }) + + t.Run("UnifySchemas with identical schemas", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + }, + &schema1Metadata, + ) + + unifiedSchema, err := UnifySchemas(false, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + assert.Equal(t, schema1.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with additional field in second schema", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "f5", Type: arrow.FixedWidthTypes.Boolean}, // New field + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "f5", Type: arrow.FixedWidthTypes.Boolean}, // Added field + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with conflicting nullable property", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, // Nullable conflict + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, // Resolved as nullable + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with LIST to LARGE_LIST upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f4", Type: arrow.LargeListOf(arrow.PrimitiveTypes.Int16)}, // Upgrade LIST to LARGE_LIST + )}, + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.LargeListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, // Promoted to LARGE_LIST + ), Nullable: true}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with metadata merge", func(t *testing.T) { + schema2Metadata := arrow.MetadataFrom(map[string]string{"source": "schema2"}) + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + }, + &schema2Metadata, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedMetadata := arrow.MetadataFrom(map[string]string{"source": "schema1"}) + assert.Equal(t, expectedMetadata, unifiedSchema.Metadata()) + }) + + t.Run("UnifySchemas with STRING to LARGE_STRING upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f5", Type: arrow.BinaryTypes.LargeString}, // Upgrade STRING to LARGE_STRING + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "f5", Type: arrow.BinaryTypes.LargeString}, // Added field + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with integer promotion to incompatible float type", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Float32}, // Upgrade int32 to float32 + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.Error(t, err) // Float32 cannot safely store all int32 values + assert.Nil(t, unifiedSchema) + }) + t.Run("UnifySchemas with MAP type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "mapField", Type: arrow.MapOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64)}, // New MAP field + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "mapField", Type: arrow.MapOf(arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64)}, // Added MAP field + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with DICTIONARY type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "dictField", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.String, + }}, // New DICTIONARY field + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "dictField", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.String, + }}, // Added DICTIONARY field + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with RUN_END_ENCODING type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "runEndField", Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + arrow.BinaryTypes.String, + )}, // New RUN_END_ENCODING field + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "runEndField", Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + arrow.BinaryTypes.String, + )}, // Added RUN_END_ENCODING field + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + t.Run("UnifySchemas with RUN_END_ENCODING type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "runEndField", Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int64, // Upgrade run_ends from Int32 to Int64 + arrow.PrimitiveTypes.Float64, // Upgrade values from Int32 to Float64 + )}, + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "runEndField", Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int64, // Upgraded run_ends + arrow.PrimitiveTypes.Float64, // Upgraded values + )}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with DICTIONARY type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "dictField", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int64, // Upgrade index from Int32 to Int64 + ValueType: arrow.BinaryTypes.LargeString, // Upgrade value from String to LargeString + }}, + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "dictField", Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int64, // Upgraded index + ValueType: arrow.BinaryTypes.LargeString, // Upgraded value + }}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) + + t.Run("UnifySchemas with MAP type upgrade", func(t *testing.T) { + schema2 := arrow.NewSchema( + []arrow.Field{ + {Name: "mapField", Type: arrow.MapOf( + arrow.PrimitiveTypes.Int64, // Upgrade key from Int32 to Int64 + arrow.PrimitiveTypes.Float64, // Upgrade value from Int32 to Float64 + )}, + }, + nil, + ) + + unifiedSchema, err := UnifySchemas(true, schema1, schema2) + assert.NoError(t, err) + assert.NotNil(t, unifiedSchema) + + expectedFields := []arrow.Field{ + {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2", Type: arrow.PrimitiveTypes.Float64}, + {Name: "structField", Type: arrow.StructOf( + arrow.Field{Name: "f3", Type: arrow.PrimitiveTypes.Int64}, + arrow.Field{Name: "f4", Type: arrow.ListOf(arrow.PrimitiveTypes.Int16), Nullable: true}, + ), Nullable: true}, + {Name: "mapField", Type: arrow.MapOf( + arrow.PrimitiveTypes.Int64, // Upgraded key + arrow.PrimitiveTypes.Float64, // Upgraded value + )}, + } + expectedSchema := arrow.NewSchema(expectedFields, &schema1Metadata) + assert.Equal(t, expectedSchema.Fields(), unifiedSchema.Fields()) + }) +} From ed2e9ed39668345627163582e7001ed82a8f8386 Mon Sep 17 00:00:00 2001 From: 3gdroid Date: Wed, 26 Mar 2025 08:53:45 -0400 Subject: [PATCH 2/2] fix typo in comment --- arrow/util/schemas/unify.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/util/schemas/unify.go b/arrow/util/schemas/unify.go index a44fc3ee..64b79291 100644 --- a/arrow/util/schemas/unify.go +++ b/arrow/util/schemas/unify.go @@ -130,7 +130,7 @@ func (f *treeNode) namePath() []string { return f.path } -// namePath returns the path to the field in json dot notation +// dotPath returns the path to the field in json dot notation func (f *treeNode) dotPath() string { var path string = "$" for i, p := range f.path {