From 922124a24cdaad94cf1968c65d54dcfb8d2efc27 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Sat, 19 Oct 2024 23:46:30 +0200 Subject: [PATCH 1/6] Implement a geoparquet writer --- .../geoparquet/OvertureMapsBenchmark.java | 6 +- .../geoparquet/SmallFileBenchmark.java | 7 +- .../geoparquet/GeoParquetDataTable.java | 3 +- .../baremaps/geoparquet/GeoParquetGroup.java | 2 +- .../baremaps/geoparquet/GeoParquetReader.java | 26 ++-- .../geoparquet/GeoParquetWriteSupport.java | 133 ++++++++++++++++++ .../baremaps/geoparquet/GeoParquetWriter.java | 76 ++++++++++ .../geoparquet/GeoParquetReaderTest.java | 12 +- .../geoparquet/GeoParquetWriterTest.java | 116 +++++++++++++++ .../baremaps/geoparquet/OvertureMapsTest.java | 32 ++--- 10 files changed, 364 insertions(+), 49 deletions(-) create mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java create mode 100644 baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java create mode 100644 baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java diff --git a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java index a230e41da..59da4d153 100644 --- a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java +++ b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/OvertureMapsBenchmark.java @@ -83,14 +83,16 @@ public void setup() throws IOException { @SuppressWarnings({"squid:S1481", "squid:S2201"}) @Benchmark public void read() { - GeoParquetReader reader = new GeoParquetReader(directory.toUri()); + var path = new org.apache.hadoop.fs.Path(directory.toUri()); + GeoParquetReader reader = new GeoParquetReader(path); reader.read().count(); } @SuppressWarnings({"squid:S1481", "squid:S2201"}) @Benchmark public void readParallel() { - GeoParquetReader reader = new GeoParquetReader(directory.toUri()); + var path = new org.apache.hadoop.fs.Path(directory.toUri()); + GeoParquetReader reader = new GeoParquetReader(path); reader.readParallel().count(); } } diff --git a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java index 11f468f00..1ae2a7476 100644 --- a/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java +++ b/baremaps-benchmarking/src/main/java/org/apache/baremaps/benchmarking/geoparquet/SmallFileBenchmark.java @@ -61,16 +61,17 @@ public void setup() throws IOException { @SuppressWarnings({"squid:S1481", "squid:S2201"}) @Benchmark public void read() { - GeoParquetReader reader = - new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri()); + var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet"); + GeoParquetReader reader = new GeoParquetReader(path); reader.read().count(); } @SuppressWarnings({"squid:S1481", "squid:S2201"}) @Benchmark public void readParallel() { + var path = new org.apache.hadoop.fs.Path("baremaps-benchmarking/data/small/*.parquet"); GeoParquetReader reader = - new GeoParquetReader(Path.of("baremaps-benchmarking/data/small/*.parquet").toUri()); + new GeoParquetReader(path); reader.readParallel().count(); } } diff --git a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java index 90c78d1a7..b3867cfa6 100644 --- a/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java +++ b/baremaps-core/src/main/java/org/apache/baremaps/storage/geoparquet/GeoParquetDataTable.java @@ -24,6 +24,7 @@ import org.apache.baremaps.data.storage.*; import org.apache.baremaps.geoparquet.GeoParquetException; import org.apache.baremaps.geoparquet.GeoParquetReader; +import org.apache.hadoop.fs.Path; public class GeoParquetDataTable implements DataTable { @@ -35,7 +36,7 @@ public class GeoParquetDataTable implements DataTable { public GeoParquetDataTable(URI path) { this.path = path; - this.reader = new GeoParquetReader(path); + this.reader = new GeoParquetReader(new Path(path)); } @Override diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java index 38c6dd746..e524a229f 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetGroup.java @@ -105,7 +105,7 @@ public int getFieldRepetitionCount(int fieldIndex) { } } - private Object getValue(int fieldIndex, int index) { + Object getValue(int fieldIndex, int index) { Object value = data[fieldIndex]; if (value instanceof Listlist) { return list.get(index); diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java index 8b4697aa1..e965d250a 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetReader.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.net.URI; import java.util.*; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -53,31 +52,31 @@ public class GeoParquetReader { /** * Constructs a new {@code GeoParquetReader}. * - * @param uri the URI to read from + * @param path the path to read from */ - public GeoParquetReader(URI uri) { - this(uri, null, new Configuration()); + public GeoParquetReader(Path path) { + this(path, null, new Configuration()); } /** * Constructs a new {@code GeoParquetReader}. * - * @param uri the URI to read from + * @param path the path to read from * @param envelope the envelope to filter records */ - public GeoParquetReader(URI uri, Envelope envelope) { - this(uri, envelope, new Configuration()); + public GeoParquetReader(Path path, Envelope envelope) { + this(path, envelope, new Configuration()); } /** * Constructs a new {@code GeoParquetReader}. * - * @param uri the URI to read from + * @param path the path to read from * @param configuration the configuration */ - public GeoParquetReader(URI uri, Envelope envelope, Configuration configuration) { + public GeoParquetReader(Path path, Envelope envelope, Configuration configuration) { this.configuration = configuration; - this.files = initializeFiles(uri, configuration); + this.files = initializeFiles(path, configuration); this.envelope = envelope; } @@ -168,11 +167,10 @@ private FileInfo getFileInfo(FileStatus fileStatus) { } } - private static List initializeFiles(URI uri, Configuration configuration) { + private static List initializeFiles(Path path, Configuration configuration) { try { - Path globPath = new Path(uri.getPath()); - FileSystem fileSystem = FileSystem.get(uri, configuration); - FileStatus[] fileStatuses = fileSystem.globStatus(globPath); + FileSystem fileSystem = FileSystem.get(path.toUri(), configuration); + FileStatus[] fileStatuses = fileSystem.globStatus(path); if (fileStatuses == null) { throw new GeoParquetException("No files found at the specified URI."); } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java new file mode 100644 index 000000000..236737058 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.*; + +/** + * WriteSupport implementation for writing GeoParquetGroup instances to Parquet. + */ +public class GeoParquetWriteSupport extends WriteSupport { + + private RecordConsumer recordConsumer; + private final MessageType schema; + private final GeoParquetMetadata metadata; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Constructs a new GeoParquetWriteSupport. + * + * @param schema the Parquet schema + * @param metadata the GeoParquet metadata + */ + public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) { + this.schema = schema; + this.metadata = metadata; + } + + @Override + public WriteContext init(Configuration configuration) { + Map extraMetadata = new HashMap<>(); + // Serialize the GeoParquet metadata to JSON and add it to the file metadata + String geoMetadataJson = serializeMetadata(metadata); + extraMetadata.put("geo", geoMetadataJson); + + return new WriteContext(schema, extraMetadata); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(GeoParquetGroup group) { + writeGroup(group, schema); + } + + private void writeGroup(GeoParquetGroup group, GroupType groupType) { + recordConsumer.startMessage(); + for (int i = 0; i < groupType.getFieldCount(); i++) { + Type fieldType = groupType.getType(i); + String fieldName = fieldType.getName(); + int repetitionCount = group.getFieldRepetitionCount(i); + if (repetitionCount == 0) { + continue; // Skip if no values are present + } + for (int j = 0; j < repetitionCount; j++) { + recordConsumer.startField(fieldName, i); + if (fieldType.isPrimitive()) { + Object value = group.getValue(i, j); + writePrimitive(value, fieldType.asPrimitiveType()); + } else { + GeoParquetGroup childGroup = group.getGroup(i, j); + writeGroup(childGroup, fieldType.asGroupType()); + } + recordConsumer.endField(fieldName, i); + } + } + recordConsumer.endMessage(); + } + + private void writePrimitive(Object value, PrimitiveType primitiveType) { + if (value == null) { + // The Parquet format does not support writing null values directly. + // If the field is optional and the value is null, we simply do not write it. + return; + } + switch (primitiveType.getPrimitiveTypeName()) { + case INT32: + recordConsumer.addInteger((Integer) value); + break; + case INT64: + recordConsumer.addLong((Long) value); + break; + case FLOAT: + recordConsumer.addFloat((Float) value); + break; + case DOUBLE: + recordConsumer.addDouble((Double) value); + break; + case BOOLEAN: + recordConsumer.addBoolean((Boolean) value); + break; + case BINARY, FIXED_LEN_BYTE_ARRAY: + recordConsumer.addBinary((Binary) value); + break; + default: + throw new GeoParquetException( + "Unsupported type: " + primitiveType.getPrimitiveTypeName()); + } + } + + private String serializeMetadata(GeoParquetMetadata metadata) { + try { + return objectMapper.writeValueAsString(metadata); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize GeoParquet metadata", e); + } + } +} diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java new file mode 100644 index 000000000..39fef2085 --- /dev/null +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; + +/** + * A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file. + */ +public class GeoParquetWriter implements AutoCloseable { + + private final ParquetWriter parquetWriter; + + /** + * Constructs a new GeoParquetWriter. + * + * @param outputFile the output file + * @param schema the Parquet schema + * @param metadata the GeoParquet metadata + * @throws IOException if an I/O error occurs + */ + public GeoParquetWriter(Path outputFile, MessageType schema, GeoParquetMetadata metadata) + throws IOException { + this.parquetWriter = new ParquetWriter<>( + outputFile, + new GeoParquetWriteSupport(schema, metadata), + CompressionCodecName.UNCOMPRESSED, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, + ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, + ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, + WriterVersion.PARQUET_2_0, + new Configuration()); + } + + /** + * Writes a GeoParquetGroup to the Parquet file. + * + * @param group the GeoParquetGroup to write + * @throws IOException if an I/O error occurs + */ + public void write(GeoParquetGroup group) throws IOException { + parquetWriter.write(group); + } + + /** + * Closes the writer and releases any system resources associated with it. + * + * @throws IOException if an I/O error occurs + */ + public void close() throws IOException { + parquetWriter.close(); + } +} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java index 849720a3e..b7b73a9d5 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetReaderTest.java @@ -19,8 +19,8 @@ import static org.junit.jupiter.api.Assertions.*; -import java.net.URI; import org.apache.baremaps.testing.TestFiles; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.locationtech.jts.geom.Envelope; @@ -28,14 +28,14 @@ class GeoParquetReaderTest { @Test void read() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); assertEquals(5, geoParquetReader.read().count()); } @Test void readFiltered() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, new Envelope(-172, -65, 18, 72)); assertEquals(1, geoParquetReader.read().count()); @@ -43,21 +43,21 @@ void readFiltered() { @Test void size() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); assertEquals(5, geoParquetReader.size()); } @Test void count() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); assertEquals(5, geoParquetReader.read().count()); } @Test void validateSchemas() { - URI geoParquet = TestFiles.GEOPARQUET.toUri(); + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet); assertTrue(geoParquetReader.validateSchemasAreIdentical()); } diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java new file mode 100644 index 000000000..fc20d3ff4 --- /dev/null +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.baremaps.geoparquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.baremaps.geoparquet.GeoParquetMetadata.Column; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; + +class GeoParquetWriterTest { + + @Test + @Tag("integration") + void testWriteAndReadGeoParquet() throws IOException { + // Create the output file + Configuration conf = new Configuration(); + Path outputPath = new Path("target/test-output/geoparquet-test.parquet"); + + try { + // Define the Parquet schema + MessageType schema = Types.buildMessage() + .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name") + .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("city") + .optional(PrimitiveTypeName.BINARY).named("geometry") + .named("GeoParquetSchema"); + + // Create GeoParquet metadata + Map columns = new HashMap<>(); + columns.put("geometry", new GeoParquetMetadata.Column( + "WKB", + Arrays.asList("Point"), + null, + null, + null, + null)); + + GeoParquetMetadata metadata = new GeoParquetMetadata( + "1.0", + "geometry", + columns, + null, + null, + null, + null, + null, + null, + null); + + // Create a Point geometry + GeometryFactory geometryFactory = new GeometryFactory(); + Point point = geometryFactory.createPoint(new Coordinate(1.0, 2.0)); + + // Create the GeoParquetWriter + try (GeoParquetWriter writer = new GeoParquetWriter(outputPath, schema, metadata)) { + // Create a GeoParquetGroup and write it + GeoParquetSchema geoParquetSchema = + GeoParquetGroupFactory.createGeoParquetSchema(schema, metadata); + GeoParquetGroup group = + new GeoParquetGroup(schema.asGroupType(), metadata, geoParquetSchema); + group.add("name", "Test Point"); + group.add("city", "Test City"); + group.add("geometry", point); + + // Write the group + writer.write(group); + } + + // Now read back the file using GeoParquetReader + GeoParquetReader reader = new GeoParquetReader(outputPath, null, conf); + GeoParquetGroup readGroup = reader.read().findFirst().orElse(null); + + assertNotNull(readGroup, "Read group should not be null"); + + // Verify the data + assertEquals("Test Point", readGroup.getStringValue("name")); + assertEquals("Test City", readGroup.getStringValue("city")); + + Point readPoint = (Point) readGroup.getGeometryValue("geometry"); + assertEquals(point.getX(), readPoint.getX(), 0.0001); + assertEquals(point.getY(), readPoint.getY(), 0.0001); + } finally { + outputPath.getFileSystem(conf).delete(outputPath, false); + } + } + +} diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java index 0fb5b9205..206aa9f19 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/OvertureMapsTest.java @@ -20,8 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.net.URI; -import java.net.URISyntaxException; +import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.locationtech.jts.geom.Envelope; @@ -30,9 +29,9 @@ class OvertureMapsTest { @Disabled("Requires access to the Internet") @Test - void countAddresses() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + void countAddressesInSwitzerland() { + Path geoParquet = + new Path("s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); Envelope switzerland = new Envelope(6.02260949059, 10.4427014502, 45.7769477403, 47.8308275417); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, switzerland, OvertureMaps.configuration()); @@ -41,20 +40,9 @@ void countAddresses() throws URISyntaxException { @Disabled("Requires access to the Internet") @Test - void countAddressesInSwitzerland() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); - Envelope switzerland = new Envelope(6.02260949059, 10.4427014502, 45.7769477403, 47.8308275417); - GeoParquetReader geoParquetReader = - new GeoParquetReader(geoParquet, switzerland, OvertureMaps.configuration()); - assertEquals(10397434, geoParquetReader.readParallel().count()); - } - - @Disabled("Requires access to the Internet") - @Test - void validateSchemas() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + void validateSchemas() { + Path geoParquet = + new Path("s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, null, OvertureMaps.configuration()); assertTrue(geoParquetReader.validateSchemasAreIdentical(), "Schemas are identical"); @@ -62,9 +50,9 @@ void validateSchemas() throws URISyntaxException { @Disabled("Requires access to the Internet") @Test - void size() throws URISyntaxException { - URI geoParquet = new URI( - "s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); + void size() { + Path geoParquet = + new Path("s3a://overturemaps-us-west-2/release/2024-09-18.0/theme=addresses/**/*.parquet"); GeoParquetReader geoParquetReader = new GeoParquetReader(geoParquet, null, OvertureMaps.configuration()); assertEquals(213535887L, geoParquetReader.size()); From e733d95674f641e055d245bb67c4a06e895f5416 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Mon, 21 Oct 2024 21:53:33 +0200 Subject: [PATCH 2/6] Implement the builder --- .../baremaps/geoparquet/GeoParquetWriter.java | 73 +++++++++++++++---- 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java index 39fef2085..7d23d8857 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java @@ -18,11 +18,14 @@ package org.apache.baremaps.geoparquet; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.example.ExampleParquetWriter.Builder; +import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; /** @@ -35,24 +38,17 @@ public class GeoParquetWriter implements AutoCloseable { /** * Constructs a new GeoParquetWriter. * - * @param outputFile the output file + * @param path the output file * @param schema the Parquet schema * @param metadata the GeoParquet metadata * @throws IOException if an I/O error occurs */ - public GeoParquetWriter(Path outputFile, MessageType schema, GeoParquetMetadata metadata) + public GeoParquetWriter(Path path, MessageType schema, GeoParquetMetadata metadata) throws IOException { - this.parquetWriter = new ParquetWriter<>( - outputFile, - new GeoParquetWriteSupport(schema, metadata), - CompressionCodecName.UNCOMPRESSED, - ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, - ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, - ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, - WriterVersion.PARQUET_2_0, - new Configuration()); + parquetWriter = new Builder(path) + .withType(schema) + .withMetadata(metadata) + .build(); } /** @@ -70,7 +66,54 @@ public void write(GeoParquetGroup group) throws IOException { * * @throws IOException if an I/O error occurs */ + @Override public void close() throws IOException { parquetWriter.close(); } + + public static class Builder + extends ParquetWriter.Builder { + + private MessageType type = null; + + private GeoParquetMetadata metadata = null; + + private Builder(Path file) { + super(file); + } + + private Builder(OutputFile file) { + super(file); + } + + public GeoParquetWriter.Builder withType(MessageType type) { + this.type = type; + return this; + } + + public GeoParquetWriter.Builder withMetadata(GeoParquetMetadata metadata) { + this.metadata = metadata; + return this; + } + + @Override + protected GeoParquetWriter.Builder self() { + return this; + } + + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return getWriteSupport((ParquetConfiguration) null); + } + + @Override + protected WriteSupport getWriteSupport(ParquetConfiguration conf) { + return new GeoParquetWriteSupport(type, metadata); + } + + @Override + public GeoParquetWriter.Builder withExtraMetaData(Map extraMetaData) { + return super.withExtraMetaData(extraMetaData); + } + } } From 5cb3cd20c013c80aa7c00a38375d692173470b3a Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Mon, 21 Oct 2024 22:00:23 +0200 Subject: [PATCH 3/6] Fix deprecation notices --- .../baremaps/geoparquet/GeoParquetWriterTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java index fc20d3ff4..98ccd5248 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java @@ -21,14 +21,14 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.baremaps.geoparquet.GeoParquetMetadata.Column; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; import org.junit.jupiter.api.Tag; @@ -49,8 +49,8 @@ void testWriteAndReadGeoParquet() throws IOException { try { // Define the Parquet schema MessageType schema = Types.buildMessage() - .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("name") - .required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("city") + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") + .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("city") .optional(PrimitiveTypeName.BINARY).named("geometry") .named("GeoParquetSchema"); @@ -58,7 +58,7 @@ void testWriteAndReadGeoParquet() throws IOException { Map columns = new HashMap<>(); columns.put("geometry", new GeoParquetMetadata.Column( "WKB", - Arrays.asList("Point"), + List.of("Point"), null, null, null, From dcc468e6adf4ba69ac1986042dc3132f19126922 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Mon, 21 Oct 2024 22:25:25 +0200 Subject: [PATCH 4/6] Use the builder instead of the constructor --- .../geoparquet/GeoParquetWriteSupport.java | 2 +- .../baremaps/geoparquet/GeoParquetWriter.java | 57 +++---------------- .../geoparquet/GeoParquetWriterTest.java | 13 +++-- 3 files changed, 17 insertions(+), 55 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java index 236737058..362d898a8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java @@ -127,7 +127,7 @@ private String serializeMetadata(GeoParquetMetadata metadata) { try { return objectMapper.writeValueAsString(metadata); } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize GeoParquet metadata", e); + throw new GeoParquetException("Failed to serialize GeoParquet metadata", e); } } } diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java index 7d23d8857..16243eac8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java @@ -17,58 +17,24 @@ package org.apache.baremaps.geoparquet; -import java.io.IOException; -import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.hadoop.example.ExampleParquetWriter.Builder; -import org.apache.parquet.io.OutputFile; import org.apache.parquet.schema.MessageType; /** * A writer for GeoParquet files that writes GeoParquetGroup instances to a Parquet file. */ -public class GeoParquetWriter implements AutoCloseable { +public class GeoParquetWriter { - private final ParquetWriter parquetWriter; - - /** - * Constructs a new GeoParquetWriter. - * - * @param path the output file - * @param schema the Parquet schema - * @param metadata the GeoParquet metadata - * @throws IOException if an I/O error occurs - */ - public GeoParquetWriter(Path path, MessageType schema, GeoParquetMetadata metadata) - throws IOException { - parquetWriter = new Builder(path) - .withType(schema) - .withMetadata(metadata) - .build(); - } - - /** - * Writes a GeoParquetGroup to the Parquet file. - * - * @param group the GeoParquetGroup to write - * @throws IOException if an I/O error occurs - */ - public void write(GeoParquetGroup group) throws IOException { - parquetWriter.write(group); + private GeoParquetWriter() { + // Prevent instantiation } - /** - * Closes the writer and releases any system resources associated with it. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - parquetWriter.close(); + public static Builder builder(Path file) { + return new Builder(file); } public static class Builder @@ -82,10 +48,6 @@ private Builder(Path file) { super(file); } - private Builder(OutputFile file) { - super(file); - } - public GeoParquetWriter.Builder withType(MessageType type) { this.type = type; return this; @@ -96,11 +58,6 @@ public GeoParquetWriter.Builder withMetadata(GeoParquetMetadata metadata) { return this; } - @Override - protected GeoParquetWriter.Builder self() { - return this; - } - @Override protected WriteSupport getWriteSupport(Configuration conf) { return getWriteSupport((ParquetConfiguration) null); @@ -112,8 +69,8 @@ protected WriteSupport getWriteSupport(ParquetConfiguration con } @Override - public GeoParquetWriter.Builder withExtraMetaData(Map extraMetaData) { - return super.withExtraMetaData(extraMetaData); + protected GeoParquetWriter.Builder self() { + return this; } } } diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java index 98ccd5248..9dbeae9d3 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java @@ -27,6 +27,7 @@ import org.apache.baremaps.geoparquet.GeoParquetMetadata.Column; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -48,7 +49,7 @@ void testWriteAndReadGeoParquet() throws IOException { try { // Define the Parquet schema - MessageType schema = Types.buildMessage() + MessageType type = Types.buildMessage() .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name") .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("city") .optional(PrimitiveTypeName.BINARY).named("geometry") @@ -81,12 +82,16 @@ void testWriteAndReadGeoParquet() throws IOException { Point point = geometryFactory.createPoint(new Coordinate(1.0, 2.0)); // Create the GeoParquetWriter - try (GeoParquetWriter writer = new GeoParquetWriter(outputPath, schema, metadata)) { + try (ParquetWriter writer = GeoParquetWriter.builder(outputPath) + .withType(type) + .withMetadata(metadata) + .build()) { + // Create a GeoParquetGroup and write it GeoParquetSchema geoParquetSchema = - GeoParquetGroupFactory.createGeoParquetSchema(schema, metadata); + GeoParquetGroupFactory.createGeoParquetSchema(type, metadata); GeoParquetGroup group = - new GeoParquetGroup(schema.asGroupType(), metadata, geoParquetSchema); + new GeoParquetGroup(type.asGroupType(), metadata, geoParquetSchema); group.add("name", "Test Point"); group.add("city", "Test City"); group.add("geometry", point); From 3987f2c30129fb098492bd27a14077d0a6fc75f5 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Thu, 24 Oct 2024 22:59:26 +0200 Subject: [PATCH 5/6] Fix problem when writing groups --- .../geoparquet/GeoParquetWriteSupport.java | 18 ++++++----- .../baremaps/geoparquet/GeoParquetWriter.java | 2 +- .../geoparquet/GeoParquetWriterTest.java | 31 ++++++++++++++++++- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java index 362d898a8..f23dae76d 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java @@ -51,10 +51,8 @@ public GeoParquetWriteSupport(MessageType schema, GeoParquetMetadata metadata) { @Override public WriteContext init(Configuration configuration) { Map extraMetadata = new HashMap<>(); - // Serialize the GeoParquet metadata to JSON and add it to the file metadata String geoMetadataJson = serializeMetadata(metadata); extraMetadata.put("geo", geoMetadataJson); - return new WriteContext(schema, extraMetadata); } @@ -65,11 +63,15 @@ public void prepareForWrite(RecordConsumer recordConsumer) { @Override public void write(GeoParquetGroup group) { - writeGroup(group, schema); + recordConsumer.startMessage(); + writeGroup(group, schema, true); + recordConsumer.endMessage(); } - private void writeGroup(GeoParquetGroup group, GroupType groupType) { - recordConsumer.startMessage(); + private void writeGroup(GeoParquetGroup group, GroupType groupType, boolean isRoot) { + if (!isRoot) { + recordConsumer.startGroup(); + } for (int i = 0; i < groupType.getFieldCount(); i++) { Type fieldType = groupType.getType(i); String fieldName = fieldType.getName(); @@ -84,12 +86,14 @@ private void writeGroup(GeoParquetGroup group, GroupType groupType) { writePrimitive(value, fieldType.asPrimitiveType()); } else { GeoParquetGroup childGroup = group.getGroup(i, j); - writeGroup(childGroup, fieldType.asGroupType()); + writeGroup(childGroup, fieldType.asGroupType(), false); } recordConsumer.endField(fieldName, i); } } - recordConsumer.endMessage(); + if (!isRoot) { + recordConsumer.endGroup(); + } } private void writePrimitive(Object value, PrimitiveType primitiveType) { diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java index 16243eac8..72852a4cf 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java @@ -53,7 +53,7 @@ public GeoParquetWriter.Builder withType(MessageType type) { return this; } - public GeoParquetWriter.Builder withMetadata(GeoParquetMetadata metadata) { + public GeoParquetWriter.Builder withGeoParquetMetadata(GeoParquetMetadata metadata) { this.metadata = metadata; return this; } diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java index 9dbeae9d3..ae2bf94f6 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.baremaps.geoparquet.GeoParquetMetadata.Column; +import org.apache.baremaps.testing.TestFiles; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetWriter; @@ -84,7 +85,7 @@ void testWriteAndReadGeoParquet() throws IOException { // Create the GeoParquetWriter try (ParquetWriter writer = GeoParquetWriter.builder(outputPath) .withType(type) - .withMetadata(metadata) + .withGeoParquetMetadata(metadata) .build()) { // Create a GeoParquetGroup and write it @@ -118,4 +119,32 @@ void testWriteAndReadGeoParquet() throws IOException { } } + @Test + @Tag("integration") + void copy() throws IOException { + Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); + + Configuration conf = new Configuration(); + Path outputPath = new Path("target/test-output/geoparquet-copy.parquet"); + + try { + GeoParquetReader reader = new GeoParquetReader(geoParquet, null, conf); + GeoParquetWriter.Builder builder = GeoParquetWriter.builder(outputPath); + ParquetWriter writer = builder.withType(reader.getParquetSchema()) + .withGeoParquetMetadata(reader.getGeoParquetMetadata()).build(); + reader.read().forEach(group -> { + System.out.println(group); + try { + writer.write(group); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } catch (IOException e) { + e.printStackTrace(); + } finally { + outputPath.getFileSystem(conf).delete(outputPath, false); + } + } + } From 8fcc1a7dd0631c6ad697b385469ce7c8af4476e0 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Tue, 5 Nov 2024 21:38:22 +0100 Subject: [PATCH 6/6] Address the review comments --- .../geoparquet/GeoParquetWriteSupport.java | 3 ++- .../baremaps/geoparquet/GeoParquetWriter.java | 22 ++++++++++++++++ .../geoparquet/GeoParquetWriterTest.java | 26 +++++++++++-------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java index f23dae76d..b6be38165 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriteSupport.java @@ -32,9 +32,10 @@ */ public class GeoParquetWriteSupport extends WriteSupport { - private RecordConsumer recordConsumer; + private Configuration configuration; private final MessageType schema; private final GeoParquetMetadata metadata; + private RecordConsumer recordConsumer; private final ObjectMapper objectMapper = new ObjectMapper(); /** diff --git a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java index 72852a4cf..e2e4292a8 100644 --- a/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java +++ b/baremaps-geoparquet/src/main/java/org/apache/baremaps/geoparquet/GeoParquetWriter.java @@ -48,26 +48,48 @@ private Builder(Path file) { super(file); } + /** + * Replace the message type with the specified one. + * + * @param type the message type + * @return the builder + */ public GeoParquetWriter.Builder withType(MessageType type) { this.type = type; return this; } + /** + * Replace the metadata with the specified one. + * + * @param metadata the metadata + * @return the builder + */ public GeoParquetWriter.Builder withGeoParquetMetadata(GeoParquetMetadata metadata) { this.metadata = metadata; return this; } + /** + * {@inheritDoc} + */ @Override protected WriteSupport getWriteSupport(Configuration conf) { + // We don't need access to the hadoop configuration for now return getWriteSupport((ParquetConfiguration) null); } + /** + * {@inheritDoc} + */ @Override protected WriteSupport getWriteSupport(ParquetConfiguration conf) { return new GeoParquetWriteSupport(type, metadata); } + /** + * {@inheritDoc} + */ @Override protected GeoParquetWriter.Builder self() { return this; diff --git a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java index ae2bf94f6..f1c577fd7 100644 --- a/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java +++ b/baremaps-geoparquet/src/test/java/org/apache/baremaps/geoparquet/GeoParquetWriterTest.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.baremaps.geoparquet.GeoParquetMetadata.Column; @@ -121,30 +122,33 @@ void testWriteAndReadGeoParquet() throws IOException { @Test @Tag("integration") - void copy() throws IOException { + void copyGeoParquetData() throws IOException { Path geoParquet = new Path(TestFiles.GEOPARQUET.toUri()); Configuration conf = new Configuration(); Path outputPath = new Path("target/test-output/geoparquet-copy.parquet"); try { + // Write the GeoParquet file GeoParquetReader reader = new GeoParquetReader(geoParquet, null, conf); GeoParquetWriter.Builder builder = GeoParquetWriter.builder(outputPath); ParquetWriter writer = builder.withType(reader.getParquetSchema()) .withGeoParquetMetadata(reader.getGeoParquetMetadata()).build(); - reader.read().forEach(group -> { - System.out.println(group); - try { - writer.write(group); - } catch (IOException e) { - e.printStackTrace(); - } - }); - } catch (IOException e) { - e.printStackTrace(); + Iterator iterator = reader.read().iterator(); + while (iterator.hasNext()) { + writer.write(iterator.next()); + } + writer.close(); + + // Read the copied file + GeoParquetReader copiedReader = new GeoParquetReader(outputPath, null, conf); + assertEquals(5, copiedReader.read().count()); } finally { outputPath.getFileSystem(conf).delete(outputPath, false); } + + + } }