Skip to content

Commit 6e5946f

Browse files
authored
Polishing (#6)
* Adjust variable names * Add test case for optional string codec (WIP) * Refactor validateSchema * Exhaustive list of primitive types in decoder * Exhaustive list of primitive types in encoder (WIP) * Finish exhaustive list of primitive types in encoder
1 parent 3a2dc93 commit 6e5946f

File tree

11 files changed

+1012
-119
lines changed

11 files changed

+1012
-119
lines changed

modules/core/src/main/scala/me/mnedokushev/zio/apache/arrow/core/codec/SchemaEncoder.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,20 @@ object SchemaEncoder {
4747
}
4848

4949
private def encodePrimitive[A](name: String, standardType: StandardType[A], nullable: Boolean): Field = {
50-
def field0(arrowType: ArrowType) =
50+
def namedField(arrowType: ArrowType) =
5151
field(name, arrowType, nullable)
5252

5353
standardType match {
5454
case StandardType.IntType =>
55-
field0(new ArrowType.Int(32, true))
55+
namedField(new ArrowType.Int(32, true))
5656
case StandardType.LongType =>
57-
field0(new ArrowType.Int(64, true))
57+
namedField(new ArrowType.Int(64, true))
5858
case StandardType.FloatType =>
59-
field0(new ArrowType.FloatingPoint(FloatingPointPrecision.HALF))
59+
namedField(new ArrowType.FloatingPoint(FloatingPointPrecision.HALF))
6060
case StandardType.DoubleType =>
61-
field0(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
61+
namedField(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE))
6262
case StandardType.StringType =>
63-
field0(new ArrowType.Utf8)
63+
namedField(new ArrowType.Utf8)
6464
case other =>
6565
throw EncoderError(s"Unsupported ZIO Schema StandardType $other")
6666
}

modules/core/src/main/scala/me/mnedokushev/zio/apache/arrow/core/codec/ValueVectorDecoder.scala

Lines changed: 96 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,25 @@ import org.apache.arrow.vector.complex.{ ListVector, StructVector }
66
import zio._
77
import zio.schema._
88

9+
import java.nio.ByteBuffer
10+
import java.time.{
11+
DayOfWeek,
12+
Instant,
13+
LocalDate,
14+
LocalDateTime,
15+
LocalTime,
16+
Month,
17+
MonthDay,
18+
OffsetDateTime,
19+
OffsetTime,
20+
Period,
21+
Year,
22+
YearMonth,
23+
ZoneId,
24+
ZoneOffset,
25+
ZonedDateTime
26+
}
27+
import java.util.UUID
928
import scala.annotation.tailrec
1029
import scala.collection.immutable.ListMap
1130
import scala.util.control.NonFatal
@@ -127,58 +146,108 @@ object ValueVectorDecoder {
127146
}
128147

129148
@tailrec
130-
private[codec] def decodeSchema[A](name: Option[String], schema0: Schema[A], reader0: FieldReader): DynamicValue = {
131-
val reader = name.fold[FieldReader](reader0.reader())(reader0.reader(_))
149+
private[codec] def decodeSchema[A](name: Option[String], schema: Schema[A], reader: FieldReader): DynamicValue = {
150+
val reader0 = name.fold[FieldReader](reader.reader())(reader.reader(_))
132151

133-
schema0 match {
152+
schema match {
134153
case Schema.Primitive(standardType, _) =>
135-
decodePrimitive(standardType, reader)
154+
decodePrimitive(standardType, reader0)
136155
case record: Schema.Record[A] =>
137-
decodeCaseClass(record.fields, reader)
156+
decodeCaseClass(record.fields, reader0)
138157
case Schema.Sequence(elemSchema, _, _, _, _) =>
139-
decodeSequence(elemSchema, reader)
158+
decodeSequence(elemSchema, reader0)
140159
case lzy: Schema.Lazy[_] =>
141-
decodeSchema(name, lzy.schema, reader0)
160+
decodeSchema(name, lzy.schema, reader)
142161
case other =>
143162
throw DecoderError(s"Unsupported ZIO Schema type $other")
144163
}
145164
}
146165

147-
private[codec] def decodeCaseClass[A](fields: Chunk[Schema.Field[A, _]], reader0: FieldReader): DynamicValue = {
166+
private[codec] def decodeCaseClass[A](fields: Chunk[Schema.Field[A, _]], reader: FieldReader): DynamicValue = {
148167
val values = ListMap(fields.map { case Schema.Field(name, schema0, _, _, _, _) =>
149-
val value: DynamicValue = decodeSchema(Some(name), schema0, reader0)
168+
val value: DynamicValue = decodeSchema(Some(name), schema0, reader)
150169

151170
name -> value
152171
}: _*)
153172

154173
DynamicValue.Record(TypeId.Structural, values)
155174
}
156175

157-
private[codec] def decodeSequence[A](schema0: Schema[A], reader0: FieldReader): DynamicValue = {
176+
private[codec] def decodeSequence[A](schema: Schema[A], reader: FieldReader): DynamicValue = {
158177
val builder = ChunkBuilder.make[DynamicValue]()
159178

160-
while (reader0.next())
161-
if (reader0.isSet)
162-
builder.addOne(decodeSchema(None, schema0, reader0))
179+
while (reader.next())
180+
if (reader.isSet)
181+
builder.addOne(decodeSchema(None, schema, reader))
163182

164183
DynamicValue.Sequence(builder.result())
165184
}
166185

167-
private[codec] def decodePrimitive[A](standardType: StandardType[A], reader0: FieldReader): DynamicValue =
186+
private[codec] def decodePrimitive[A](standardType: StandardType[A], reader: FieldReader): DynamicValue =
168187
standardType match {
169-
case t: StandardType.BoolType.type =>
170-
DynamicValue.Primitive[Boolean](reader0.readBoolean(), t)
171-
case t: StandardType.IntType.type =>
172-
DynamicValue.Primitive[Int](reader0.readInteger(), t)
173-
case t: StandardType.LongType.type =>
174-
DynamicValue.Primitive[Long](reader0.readLong(), t)
175-
case t: StandardType.FloatType.type =>
176-
DynamicValue.Primitive[Float](reader0.readFloat(), t)
177-
case t: StandardType.DoubleType.type =>
178-
DynamicValue.Primitive[Double](reader0.readDouble(), t)
179-
case t: StandardType.StringType.type =>
180-
DynamicValue.Primitive[String](reader0.readText().toString, t)
181-
case other =>
188+
case t: StandardType.StringType.type =>
189+
DynamicValue.Primitive[String](reader.readText().toString, t)
190+
case t: StandardType.BoolType.type =>
191+
DynamicValue.Primitive[Boolean](reader.readBoolean(), t)
192+
case t: StandardType.ByteType.type =>
193+
DynamicValue.Primitive[Byte](reader.readByte(), t)
194+
case t: StandardType.ShortType.type =>
195+
DynamicValue.Primitive[Short](reader.readShort(), t)
196+
case t: StandardType.IntType.type =>
197+
DynamicValue.Primitive[Int](reader.readInteger(), t)
198+
case t: StandardType.LongType.type =>
199+
DynamicValue.Primitive[Long](reader.readLong(), t)
200+
case t: StandardType.FloatType.type =>
201+
DynamicValue.Primitive[Float](reader.readFloat(), t)
202+
case t: StandardType.DoubleType.type =>
203+
DynamicValue.Primitive[Double](reader.readDouble(), t)
204+
case t: StandardType.BinaryType.type =>
205+
DynamicValue.Primitive[Chunk[Byte]](Chunk.fromArray(reader.readByteArray()), t)
206+
case t: StandardType.CharType.type =>
207+
DynamicValue.Primitive[Char](reader.readCharacter(), t)
208+
case t: StandardType.UUIDType.type =>
209+
val bb = ByteBuffer.wrap(reader.readByteArray())
210+
DynamicValue.Primitive[UUID](new UUID(bb.getLong, bb.getLong), t)
211+
case t: StandardType.BigDecimalType.type =>
212+
DynamicValue.Primitive[java.math.BigDecimal](reader.readBigDecimal(), t)
213+
case t: StandardType.BigIntegerType.type =>
214+
DynamicValue.Primitive[java.math.BigInteger](new java.math.BigInteger(reader.readByteArray()), t)
215+
case t: StandardType.DayOfWeekType.type =>
216+
DynamicValue.Primitive[DayOfWeek](DayOfWeek.of(reader.readInteger()), t)
217+
case t: StandardType.MonthType.type =>
218+
DynamicValue.Primitive[Month](Month.of(reader.readInteger()), t)
219+
case t: StandardType.MonthDayType.type =>
220+
val bb = ByteBuffer.allocate(8).putLong(reader.readLong())
221+
DynamicValue.Primitive[MonthDay](MonthDay.of(bb.getInt(0), bb.getInt(4)), t)
222+
case t: StandardType.PeriodType.type =>
223+
val bb = ByteBuffer.wrap(reader.readByteArray())
224+
DynamicValue.Primitive[Period](Period.of(bb.getInt(0), bb.getInt(4), bb.getInt(8)), t)
225+
case t: StandardType.YearType.type =>
226+
DynamicValue.Primitive[Year](Year.of(reader.readInteger()), t)
227+
case t: StandardType.YearMonthType.type =>
228+
val bb = ByteBuffer.allocate(8).putLong(reader.readLong())
229+
DynamicValue.Primitive[YearMonth](YearMonth.of(bb.getInt(0), bb.getInt(4)), t)
230+
case t: StandardType.ZoneIdType.type =>
231+
DynamicValue.Primitive[ZoneId](ZoneId.of(reader.readText().toString), t)
232+
case t: StandardType.ZoneOffsetType.type =>
233+
DynamicValue.Primitive[ZoneOffset](ZoneOffset.of(reader.readText().toString), t)
234+
case t: StandardType.DurationType.type =>
235+
DynamicValue.Primitive[Duration](Duration.fromMillis(reader.readLong()), t)
236+
case t: StandardType.InstantType.type =>
237+
DynamicValue.Primitive[Instant](Instant.ofEpochMilli(reader.readLong()), t)
238+
case t: StandardType.LocalDateType.type =>
239+
DynamicValue.Primitive[LocalDate](LocalDate.parse(reader.readText().toString), t)
240+
case t: StandardType.LocalTimeType.type =>
241+
DynamicValue.Primitive[LocalTime](LocalTime.parse(reader.readText().toString), t)
242+
case t: StandardType.LocalDateTimeType.type =>
243+
DynamicValue.Primitive[LocalDateTime](LocalDateTime.parse(reader.readText().toString), t)
244+
case t: StandardType.OffsetTimeType.type =>
245+
DynamicValue.Primitive[OffsetTime](OffsetTime.parse(reader.readText().toString), t)
246+
case t: StandardType.OffsetDateTimeType.type =>
247+
DynamicValue.Primitive[OffsetDateTime](OffsetDateTime.parse(reader.readText().toString), t)
248+
case t: StandardType.ZonedDateTimeType.type =>
249+
DynamicValue.Primitive[ZonedDateTime](ZonedDateTime.parse(reader.readText().toString), t)
250+
case other =>
182251
throw DecoderError(s"Unsupported ZIO Schema type $other")
183252
}
184253

0 commit comments

Comments
 (0)