Skip to content
This repository was archived by the owner on Nov 16, 2023. It is now read-only.

parquet-go can read nested objects, but not parquet cli (Parquet/Avro schema mismatch) #483

@remiphilippe

Description

@remiphilippe

Hello,
I'm seeing behavior I can't explain. I created a Parquet file with parquet-go. I can read it back in Go, and I can print its schema with the Parquet CLI, but `parquet cat` fails with an error.

Note: I've never used the `parquet` command before, so this could be user error. I installed the CLI with `brew install parquet-cli`.
Note 2: If I remove the nested structures, `parquet cat` works fine.

code:

package main

import (
	"bytes"
	"flag"
	"os"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/golang/glog"
	"github.com/segmentio/parquet-go"
)

// ParquetOtherElements is the element type of ParquetStruct.OtherElements.
// Each field maps to a Parquet column through its `parquet` struct tag:
//   - elementId: string column.
//   - type: string column tagged `enum` (presumably the Parquet ENUM logical
//     type; parquet-cli's Avro view of the file reports it as a plain string).
//   - rehomedFrom: string slice tagged `list`. In the parquet-cli schema dump
//     it appears as a nested LIST whose repeated "list" record holds a single
//     "element" string.
//
// Field order matters: it determines the column order of the written schema.
type ParquetOtherElements struct {
	ElementId   string   `parquet:"elementId"`
	Type        string   `parquet:"type,enum"`
	RehomedFrom []string `parquet:"rehomedFrom,list"`
}

// ParquetElement is the element type of the ParquetStruct.Nodes and
// ParquetStruct.Edges lists. Each field maps to a Parquet column through its
// `parquet` struct tag:
//   - elementId, label: string columns.
//   - type, status: string columns tagged `enum`. parquet-cli's Avro view
//     shows them as plain strings.
//   - element: string tagged `json`. parquet-cli's Avro view shows it as
//     "bytes", so the JSON text is stored as a byte array.
//
// NOTE(review): the column name "element" is the same as the name of the
// LIST wrapper's "element" group that encloses this record in the written
// schema. Confirm whether external readers (e.g. parquet-cli / parquet-avro)
// handle that name overlap correctly.
//
// Field order matters: it determines the column order of the written schema.
type ParquetElement struct {
	ElementId string `parquet:"elementId"`
	Type      string `parquet:"type,enum"`
	Label     string `parquet:"label"`
	Status    string `parquet:"status,enum"`
	Element   string `parquet:"element,json"`
}

// ParquetStruct is the top-level row type written to and read from the
// Parquet file. Each field maps to a column or group through its `parquet`
// struct tag:
//   - tenantId, triggerProviderId, triggerEventId, triggerProcessorType:
//     required string columns.
//   - eventTime: int64 tagged `timestamp(nanosecond)`. main fills it with
//     time.Now().UnixNano(); parquet-cli's Avro view reports it as "long".
//   - eventFlagged: required boolean column.
//   - nodes, edges: required LIST groups of ParquetElement records. A nil
//     slice is written as an empty list and reads back as an empty, non-nil
//     slice.
//   - otherElements: LIST of ParquetOtherElements tagged `optional`, so the
//     whole list may be null (a ["null", array] union in the Avro view).
//
// NOTE(review): for files written from this type, parquet-cli's `cat` fails
// with "Parquet/Avro schema mismatch: Avro field 'elementId' not found",
// while parquet-go reads the same bytes back without error. Dropping the
// nested lists reportedly avoids the failure. The cause has not been
// confirmed in this code.
//
// Field order matters: it determines the column order of the written schema.
type ParquetStruct struct {
	TenantId             string                 `parquet:"tenantId"`
	TriggerProviderId    string                 `parquet:"triggerProviderId"`
	TriggerEventId       string                 `parquet:"triggerEventId"`
	TriggerProcessorType string                 `parquet:"triggerProcessorType"`
	EventTime            int64                  `parquet:"eventTime,timestamp(nanosecond)"`
	EventFlagged         bool                   `parquet:"eventFlagged"`
	Nodes                []ParquetElement       `parquet:"nodes,list"`
	Edges                []ParquetElement       `parquet:"edges,list"`
	OtherElements        []ParquetOtherElements `parquet:"otherElements,optional,list"`
}

func main() {
	flag.Set("alsologtostderr", "true")
	flag.Set("v", "3")
	flag.Parse()

	content := new(bytes.Buffer)
	w := parquet.NewGenericWriter[ParquetStruct](content)
	p := []ParquetStruct{
		{
			TenantId:             "tenantIdV",
			TriggerProviderId:    "triggerProviderIdV",
			TriggerEventId:       "triggerEventIdV",
			TriggerProcessorType: "triggerProcessorTypeV",
			EventFlagged:         false,
			EventTime:            time.Now().UnixNano(),
			Nodes:                []ParquetElement{{ElementId: "elementIdV", Type: "typeV", Label: "labelV", Status: "statusV", Element: "{}"}},
		},
	}
	spew.Dump(p)
	if _, err := w.Write(p); err != nil {
		glog.Fatal(err)
	}
	if err := w.Close(); err != nil {
		glog.Fatal(err)
	}

	// write file from content buffer
	f, err := os.Create("myfile.parquet")
	if err != nil {
		glog.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Write(content.Bytes()); err != nil {
		glog.Fatal(err)
	}

	glog.Infoln("reading buffer....")
	file := bytes.NewReader(content.Bytes())
	rows, err := parquet.Read[ParquetStruct](file, file.Size())
	if err != nil {
		glog.Fatal(err)
	}
	spew.Dump(rows)
}

bob  ~/scratch/parquet  go run .                                                                                                                                                                                                                                                                                              
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) <nil>,
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
I0307 16:36:32.428573   62343 main.go:76] reading buffer....
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) {
  },
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
 bob  ~/scratch/parquet  parquet cat myfile.parquet                                                                                                                                                                                                                                                                           
Unknown error
java.lang.RuntimeException: Failed on record 0
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:86)
        at org.apache.parquet.cli.Main.run(Main.java:157)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:187)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'elementId' not found
        at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
        at org.apache.parquet.cli.BaseCommand$1$1.<init>(BaseCommand.java:344)
        at org.apache.parquet.cli.BaseCommand$1.iterator(BaseCommand.java:342)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:73)
        ... 3 more
 1  bob  ~/scratch/parquet  parquet schema myfile.parquet                                                                                                                                                                                                                                                                     
{
  "type" : "record",
  "name" : "ParquetStruct",
  "fields" : [ {
    "name" : "tenantId",
    "type" : "string"
  }, {
    "name" : "triggerProviderId",
    "type" : "string"
  }, {
    "name" : "triggerEventId",
    "type" : "string"
  }, {
    "name" : "triggerProcessorType",
    "type" : "string"
  }, {
    "name" : "eventTime",
    "type" : "long"
  }, {
    "name" : "eventFlagged",
    "type" : "boolean"
  }, {
    "name" : "nodes",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "edges",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list2",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element2",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "otherElements",
    "type" : [ "null", {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list3",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element3",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "rehomedFrom",
              "type" : {
                "type" : "array",
                "items" : {
                  "type" : "record",
                  "name" : "list",
                  "namespace" : "list4",
                  "fields" : [ {
                    "name" : "element",
                    "type" : "string"
                  } ]
                }
              }
            } ]
          }
        } ]
      }
    } ],
    "default" : null
  } ]
}
 bob  ~/scratch/parquet                                                                                                                                                                                                                                                                                                         

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions