Skip to content

Commit b4ac876

Browse files
authored
fix: [iceberg] Add LogicalTypeAnnotation in ParquetColumnSpec (#2000)
In order to pass and reassemble the `ColumnDescriptor` from Iceberg to Comet, we need to include `LogicalTypeAnnotation` too.
1 parent 76cf5af commit b4ac876

File tree

3 files changed

+319
-6
lines changed

3 files changed

+319
-6
lines changed

common/src/main/java/org/apache/comet/parquet/ParquetColumnSpec.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,45 @@
1919

2020
package org.apache.comet.parquet;
2121

22+
import java.util.Map;
23+
2224
public class ParquetColumnSpec {
2325

26+
private final int fieldId;
2427
private final String[] path;
2528
private final String physicalType;
2629
private final int typeLength;
2730
private final boolean isRepeated;
2831
private final int maxDefinitionLevel;
2932
private final int maxRepetitionLevel;
3033

34+
// Logical type info
35+
private String logicalTypeName;
36+
private Map<String, String> logicalTypeParams;
37+
3138
public ParquetColumnSpec(
39+
int fieldId,
3240
String[] path,
3341
String physicalType,
3442
int typeLength,
3543
boolean isRepeated,
3644
int maxDefinitionLevel,
37-
int maxRepetitionLevel) {
45+
int maxRepetitionLevel,
46+
String logicalTypeName,
47+
Map<String, String> logicalTypeParams) {
48+
this.fieldId = fieldId;
3849
this.path = path;
3950
this.physicalType = physicalType;
4051
this.typeLength = typeLength;
4152
this.isRepeated = isRepeated;
4253
this.maxDefinitionLevel = maxDefinitionLevel;
4354
this.maxRepetitionLevel = maxRepetitionLevel;
55+
this.logicalTypeName = logicalTypeName;
56+
this.logicalTypeParams = logicalTypeParams;
57+
}
58+
59+
public int getFieldId() {
60+
return fieldId;
4461
}
4562

4663
public String[] getPath() {
@@ -66,4 +83,12 @@ public int getMaxRepetitionLevel() {
6683
public int getMaxDefinitionLevel() {
6784
return maxDefinitionLevel;
6885
}
86+
87+
public String getLogicalTypeName() {
88+
return logicalTypeName;
89+
}
90+
91+
public Map<String, String> getLogicalTypeParams() {
92+
return logicalTypeParams;
93+
}
6994
}

common/src/main/java/org/apache/comet/parquet/Utils.java

Lines changed: 144 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import org.apache.parquet.column.ColumnDescriptor;
2323
import org.apache.parquet.schema.LogicalTypeAnnotation;
24-
import org.apache.parquet.schema.MessageType;
2524
import org.apache.parquet.schema.PrimitiveType;
2625
import org.apache.parquet.schema.Type;
26+
import org.apache.parquet.schema.Types;
2727
import org.apache.spark.sql.types.*;
2828

2929
import org.apache.comet.CometSchemaImporter;
@@ -290,15 +290,154 @@ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpe
290290
}
291291

292292
String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
293+
// Reconstruct the logical type from parameters
294+
LogicalTypeAnnotation logicalType = null;
295+
if (columnSpec.getLogicalTypeName() != null) {
296+
logicalType =
297+
reconstructLogicalType(
298+
columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
299+
}
293300

294301
PrimitiveType primitiveType;
295302
if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
296-
primitiveType = new PrimitiveType(repetition, primType, columnSpec.getTypeLength(), name);
303+
primitiveType =
304+
Types.primitive(primType, repetition)
305+
.length(columnSpec.getTypeLength())
306+
.as(logicalType)
307+
.id(columnSpec.getFieldId())
308+
.named(name);
297309
} else {
298-
primitiveType = new PrimitiveType(repetition, primType, name);
310+
primitiveType =
311+
Types.primitive(primType, repetition)
312+
.as(logicalType)
313+
.id(columnSpec.getFieldId())
314+
.named(name);
299315
}
300316

301-
MessageType schema = new MessageType("root", primitiveType);
302-
return schema.getColumnDescription(columnSpec.getPath());
317+
return new ColumnDescriptor(
318+
columnSpec.getPath(),
319+
primitiveType,
320+
columnSpec.getMaxRepetitionLevel(),
321+
columnSpec.getMaxDefinitionLevel());
322+
}
323+
324+
private static LogicalTypeAnnotation reconstructLogicalType(
325+
String logicalTypeName, java.util.Map<String, String> params) {
326+
327+
switch (logicalTypeName) {
328+
// MAP
329+
case "MapLogicalTypeAnnotation":
330+
return LogicalTypeAnnotation.mapType();
331+
332+
// LIST
333+
case "ListLogicalTypeAnnotation":
334+
return LogicalTypeAnnotation.listType();
335+
336+
// STRING
337+
case "StringLogicalTypeAnnotation":
338+
return LogicalTypeAnnotation.stringType();
339+
340+
// MAP_KEY_VALUE
341+
case "MapKeyValueLogicalTypeAnnotation":
342+
return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
343+
344+
// ENUM
345+
case "EnumLogicalTypeAnnotation":
346+
return LogicalTypeAnnotation.enumType();
347+
348+
// DECIMAL
349+
case "DecimalLogicalTypeAnnotation":
350+
if (!params.containsKey("scale") || !params.containsKey("precision")) {
351+
throw new IllegalArgumentException(
352+
"Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
353+
}
354+
int scale = Integer.parseInt(params.get("scale"));
355+
int precision = Integer.parseInt(params.get("precision"));
356+
return LogicalTypeAnnotation.decimalType(scale, precision);
357+
358+
// DATE
359+
case "DateLogicalTypeAnnotation":
360+
return LogicalTypeAnnotation.dateType();
361+
362+
// TIME
363+
case "TimeLogicalTypeAnnotation":
364+
if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
365+
throw new IllegalArgumentException(
366+
"Missing required parameters for TimeLogicalTypeAnnotation: " + params);
367+
}
368+
369+
boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
370+
String timeUnitStr = params.get("unit");
371+
372+
LogicalTypeAnnotation.TimeUnit timeUnit;
373+
switch (timeUnitStr) {
374+
case "MILLIS":
375+
timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
376+
break;
377+
case "MICROS":
378+
timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
379+
break;
380+
case "NANOS":
381+
timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
382+
break;
383+
default:
384+
throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr);
385+
}
386+
return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
387+
388+
// TIMESTAMP
389+
case "TimestampLogicalTypeAnnotation":
390+
if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
391+
throw new IllegalArgumentException(
392+
"Missing required parameters for TimestampLogicalTypeAnnotation: " + params);
393+
}
394+
boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
395+
String unitStr = params.get("unit");
396+
397+
LogicalTypeAnnotation.TimeUnit unit;
398+
switch (unitStr) {
399+
case "MILLIS":
400+
unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
401+
break;
402+
case "MICROS":
403+
unit = LogicalTypeAnnotation.TimeUnit.MICROS;
404+
break;
405+
case "NANOS":
406+
unit = LogicalTypeAnnotation.TimeUnit.NANOS;
407+
break;
408+
default:
409+
throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr);
410+
}
411+
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
412+
413+
// INTEGER
414+
case "IntLogicalTypeAnnotation":
415+
if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) {
416+
throw new IllegalArgumentException(
417+
"Missing required parameters for IntLogicalTypeAnnotation: " + params);
418+
}
419+
boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
420+
int bitWidth = Integer.parseInt(params.get("bitWidth"));
421+
return LogicalTypeAnnotation.intType(bitWidth, isSigned);
422+
423+
// JSON
424+
case "JsonLogicalTypeAnnotation":
425+
return LogicalTypeAnnotation.jsonType();
426+
427+
// BSON
428+
case "BsonLogicalTypeAnnotation":
429+
return LogicalTypeAnnotation.bsonType();
430+
431+
// UUID
432+
case "UUIDLogicalTypeAnnotation":
433+
return LogicalTypeAnnotation.uuidType();
434+
435+
// INTERVAL
436+
case "IntervalLogicalTypeAnnotation":
437+
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
438+
439+
default:
440+
throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
441+
}
303442
}
304443
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.parquet;
21+
22+
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
26+
import org.junit.Test;
27+
28+
import org.apache.parquet.column.ColumnDescriptor;
29+
import org.apache.parquet.schema.LogicalTypeAnnotation;
30+
import org.apache.parquet.schema.PrimitiveType;
31+
32+
import static org.junit.Assert.*;
33+
34+
public class TestUtils {
35+
36+
@Test
37+
public void testBuildColumnDescriptorWithTimestamp() {
38+
Map<String, String> params = new HashMap<>();
39+
params.put("isAdjustedToUTC", "true");
40+
params.put("unit", "MICROS");
41+
42+
ParquetColumnSpec spec =
43+
new ParquetColumnSpec(
44+
10,
45+
new String[] {"event_time"},
46+
"INT64",
47+
0,
48+
false,
49+
0,
50+
0,
51+
"TimestampLogicalTypeAnnotation",
52+
params);
53+
54+
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
55+
assertNotNull(descriptor);
56+
57+
PrimitiveType primitiveType = descriptor.getPrimitiveType();
58+
assertEquals(PrimitiveType.PrimitiveTypeName.INT64, primitiveType.getPrimitiveTypeName());
59+
assertTrue(
60+
primitiveType.getLogicalTypeAnnotation()
61+
instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation);
62+
63+
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation ts =
64+
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
65+
primitiveType.getLogicalTypeAnnotation();
66+
assertTrue(ts.isAdjustedToUTC());
67+
assertEquals(LogicalTypeAnnotation.TimeUnit.MICROS, ts.getUnit());
68+
}
69+
70+
@Test
71+
public void testBuildColumnDescriptorWithDecimal() {
72+
Map<String, String> params = new HashMap<>();
73+
params.put("precision", "10");
74+
params.put("scale", "2");
75+
76+
ParquetColumnSpec spec =
77+
new ParquetColumnSpec(
78+
11,
79+
new String[] {"price"},
80+
"FIXED_LEN_BYTE_ARRAY",
81+
5,
82+
false,
83+
0,
84+
0,
85+
"DecimalLogicalTypeAnnotation",
86+
params);
87+
88+
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
89+
PrimitiveType primitiveType = descriptor.getPrimitiveType();
90+
assertEquals(
91+
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, primitiveType.getPrimitiveTypeName());
92+
93+
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation dec =
94+
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation)
95+
primitiveType.getLogicalTypeAnnotation();
96+
assertEquals(10, dec.getPrecision());
97+
assertEquals(2, dec.getScale());
98+
}
99+
100+
@Test
101+
public void testBuildColumnDescriptorWithIntLogicalType() {
102+
Map<String, String> params = new HashMap<>();
103+
params.put("bitWidth", "32");
104+
params.put("isSigned", "true");
105+
106+
ParquetColumnSpec spec =
107+
new ParquetColumnSpec(
108+
12,
109+
new String[] {"count"},
110+
"INT32",
111+
0,
112+
false,
113+
0,
114+
0,
115+
"IntLogicalTypeAnnotation",
116+
params);
117+
118+
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
119+
PrimitiveType primitiveType = descriptor.getPrimitiveType();
120+
assertEquals(PrimitiveType.PrimitiveTypeName.INT32, primitiveType.getPrimitiveTypeName());
121+
122+
LogicalTypeAnnotation.IntLogicalTypeAnnotation ann =
123+
(LogicalTypeAnnotation.IntLogicalTypeAnnotation) primitiveType.getLogicalTypeAnnotation();
124+
assertEquals(32, ann.getBitWidth());
125+
assertTrue(ann.isSigned());
126+
}
127+
128+
@Test
129+
public void testBuildColumnDescriptorWithStringLogicalType() {
130+
ParquetColumnSpec spec =
131+
new ParquetColumnSpec(
132+
13,
133+
new String[] {"name"},
134+
"BINARY",
135+
0,
136+
false,
137+
0,
138+
0,
139+
"StringLogicalTypeAnnotation",
140+
Collections.emptyMap());
141+
142+
ColumnDescriptor descriptor = Utils.buildColumnDescriptor(spec);
143+
PrimitiveType primitiveType = descriptor.getPrimitiveType();
144+
assertEquals(PrimitiveType.PrimitiveTypeName.BINARY, primitiveType.getPrimitiveTypeName());
145+
assertTrue(
146+
primitiveType.getLogicalTypeAnnotation()
147+
instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation);
148+
}
149+
}

0 commit comments

Comments
 (0)