Skip to content

Commit a3584e5

Browse files
authored
Add Enum type support to arrow-avro and a minor Decimal type fix (#7852)
# Which issue does this PR close? - Part of #4886 - Related to #6965 # Rationale for this change The `arrow-avro` crate currently lacks support for the Avro `enum` type, which is a standard and commonly used type in Avro schemas. This omission prevents users from reading Avro files containing enums, limiting the crate's utility. This change introduces support for decoding Avro enums by mapping them to the Arrow `DictionaryArray` type. This is a logical and efficient representation. Implementing this feature brings the `arrow-avro` crate closer to full Avro specification compliance and makes it more robust for real-world use cases. # What changes are included in this PR? This PR introduces comprehensive support for Avro enum decoding along with a minor Avro decimal decoding fix. The key changes are: 1. **Schema Parsing (`codec.rs`):** * A new `Codec::Enum(Arc<[String]>)` variant was added to represent a parsed enum and its associated symbols. * The `make_data_type` function now parses `ComplexType::Enum` schemas. It also stores the original symbols as a JSON string in the `Field`'s metadata under the key `"avro.enum.symbols"` to ensure schema fidelity and enable lossless round-trip conversions. * The `Codec::data_type` method was updated to map the internal `Codec::Enum` to the corresponding Arrow `DataType::Dictionary(Box<Int32>, Box<Utf8>)`. 2. **Decoding Logic (`reader/record.rs`):** * A new `Decoder::Enum(Vec<i32>, Arc<[String]>)` variant was added to manage the state of decoding enum values. * The `Decoder` was enhanced to create, decode, and flush `Enum` types: * `try_new` creates the decoder. * `decode` reads the Avro `int` index from the byte buffer. * `flush` constructs the final `DictionaryArray<Int32Type>` using the collected indices as keys and the stored symbols as the dictionary values. * `append_null` was extended to handle nullable enums. 3. 
**Minor Decimal Type Decoding Fix (`codec.rs`)** * A minor decimal decoding fix was implemented in `make_data_type` due to the `(Some("decimal"), c @ Codec::Fixed(sz))` branch of `match (t.attributes.logical_type, &mut field.codec)` not being reachable. This issue was caught by the new decimal integration tests in `arrow-avro/src/reader/mod.rs`. # Are these changes tested? * Yes, test coverage was provided for the new `Enum` type: * New unit tests were added to `record.rs` to specifically validate both non-nullable and nullable enum decoding logic. * The existing integration test suite in `arrow-avro/src/reader/mod.rs` was used to validate the end-to-end functionality with a new `avro/simple_enum.avro` test case, ensuring compatibility with the overall reader infrastructure. * New tests were also included for the `Decimal` and `Fixed` types: * This integration test suite was also extended to include tests for `avro/simple_fixed.avro`, `avro/fixed_length_decimal.avro`, `avro/fixed_length_decimal_legacy.avro`, `avro/int32_decimal.avro`, `avro/int64_decimal.avro` # Are there any user-facing changes? N/A
1 parent 6de3881 commit a3584e5

File tree

3 files changed

+273
-43
lines changed

3 files changed

+273
-43
lines changed

arrow-avro/src/codec.rs

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ pub enum Codec {
203203
Decimal(usize, Option<usize>, Option<usize>),
204204
/// Represents Avro Uuid type, a FixedSizeBinary with a length of 16
205205
Uuid,
206+
/// Represents an Avro enum, maps to Arrow's Dictionary(Int32, Utf8) type.
207+
///
208+
/// The enclosed value contains the enum's symbols.
209+
Enum(Arc<[String]>),
206210
/// Represents Avro array type, maps to Arrow's List data type
207211
List(Arc<AvroDataType>),
208212
/// Represents Avro record type, maps to Arrow's Struct data type
@@ -253,6 +257,9 @@ impl Codec {
253257
}
254258
}
255259
Self::Uuid => DataType::FixedSizeBinary(16),
260+
Self::Enum(_) => {
261+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
262+
}
256263
Self::List(f) => {
257264
DataType::List(Arc::new(f.field_with_name(Field::LIST_FIELD_DEFAULT_NAME)))
258265
}
@@ -441,7 +448,6 @@ fn make_data_type<'a>(
441448
})
442449
})
443450
.collect::<Result<_, ArrowError>>()?;
444-
445451
let field = AvroDataType {
446452
nullability: None,
447453
codec: Codec::Struct(fields),
@@ -463,17 +469,47 @@ fn make_data_type<'a>(
463469
let size = f.size.try_into().map_err(|e| {
464470
ArrowError::ParseError(format!("Overflow converting size to i32: {e}"))
465471
})?;
472+
let md = f.attributes.field_metadata();
473+
let field = match f.attributes.logical_type {
474+
Some("decimal") => {
475+
let (precision, scale, _) =
476+
parse_decimal_attributes(&f.attributes, Some(size as usize), true)?;
477+
AvroDataType {
478+
nullability: None,
479+
metadata: md,
480+
codec: Codec::Decimal(precision, Some(scale), Some(size as usize)),
481+
}
482+
}
483+
_ => AvroDataType {
484+
nullability: None,
485+
metadata: md,
486+
codec: Codec::Fixed(size),
487+
},
488+
};
489+
resolver.register(f.name, namespace, field.clone());
490+
Ok(field)
491+
}
492+
ComplexType::Enum(e) => {
493+
let namespace = e.namespace.or(namespace);
494+
let symbols = e
495+
.symbols
496+
.iter()
497+
.map(|s| s.to_string())
498+
.collect::<Arc<[String]>>();
499+
500+
let mut metadata = e.attributes.field_metadata();
501+
let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
502+
ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
503+
})?;
504+
metadata.insert("avro.enum.symbols".to_string(), symbols_json);
466505
let field = AvroDataType {
467506
nullability: None,
468-
metadata: f.attributes.field_metadata(),
469-
codec: Codec::Fixed(size),
507+
metadata,
508+
codec: Codec::Enum(symbols),
470509
};
471-
resolver.register(f.name, namespace, field.clone());
510+
resolver.register(e.name, namespace, field.clone());
472511
Ok(field)
473512
}
474-
ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!(
475-
"Enum of {e:?} not currently supported"
476-
))),
477513
ComplexType::Map(m) => {
478514
let val = make_data_type(&m.values, namespace, resolver, use_utf8view)?;
479515
Ok(AvroDataType {
@@ -493,27 +529,10 @@ fn make_data_type<'a>(
493529

494530
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
495531
match (t.attributes.logical_type, &mut field.codec) {
496-
(Some("decimal"), c) => match *c {
497-
Codec::Fixed(sz_val) => {
498-
let (prec, sc, size_opt) =
499-
parse_decimal_attributes(&t.attributes, Some(sz_val as usize), true)?;
500-
let final_sz = if let Some(sz_actual) = size_opt {
501-
sz_actual
502-
} else {
503-
sz_val as usize
504-
};
505-
*c = Codec::Decimal(prec, Some(sc), Some(final_sz));
506-
}
507-
Codec::Binary => {
508-
let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
509-
*c = Codec::Decimal(prec, Some(sc), None);
510-
}
511-
_ => {
512-
return Err(ArrowError::SchemaError(format!(
513-
"Decimal logical type can only be backed by Fixed or Bytes, found {c:?}"
514-
)))
515-
}
516-
},
532+
(Some("decimal"), c @ Codec::Binary) => {
533+
let (prec, sc, _) = parse_decimal_attributes(&t.attributes, None, false)?;
534+
*c = Codec::Decimal(prec, Some(sc), None);
535+
}
517536
(Some("date"), c @ Codec::Int32) => *c = Codec::Date32,
518537
(Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis,
519538
(Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros,

arrow-avro/src/reader/mod.rs

Lines changed: 141 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,9 @@ mod test {
121121
use crate::reader::record::RecordDecoder;
122122
use crate::reader::{read_blocks, read_header};
123123
use crate::test_util::arrow_test_data;
124+
use arrow_array::types::Int32Type;
124125
use arrow_array::*;
125-
use arrow_schema::{DataType, Field};
126+
use arrow_schema::{DataType, Field, Schema};
126127
use std::collections::HashMap;
127128
use std::fs::File;
128129
use std::io::BufReader;
@@ -150,20 +151,26 @@ mod test {
150151
for result in read_blocks(reader) {
151152
let block = result.unwrap();
152153
assert_eq!(block.sync, header.sync());
153-
if let Some(c) = compression {
154-
let decompressed = c.decompress(&block.data).unwrap();
155154

155+
let mut decode_data = |data: &[u8]| {
156156
let mut offset = 0;
157157
let mut remaining = block.count;
158158
while remaining > 0 {
159-
let to_read = remaining.max(batch_size);
160-
offset += decoder
161-
.decode(&decompressed[offset..], block.count)
162-
.unwrap();
163-
159+
let to_read = remaining.min(batch_size);
160+
if to_read == 0 {
161+
break;
162+
}
163+
offset += decoder.decode(&data[offset..], to_read).unwrap();
164164
remaining -= to_read;
165165
}
166-
assert_eq!(offset, decompressed.len());
166+
assert_eq!(offset, data.len());
167+
};
168+
169+
if let Some(c) = compression {
170+
let decompressed = c.decompress(&block.data).unwrap();
171+
decode_data(&decompressed);
172+
} else {
173+
decode_data(&block.data);
167174
}
168175
}
169176
decoder.flush().unwrap()
@@ -308,4 +315,129 @@ mod test {
308315
assert_eq!(read_file(&file, 3), expected);
309316
}
310317
}
318+
319+
#[test]
320+
fn test_decimal() {
321+
let files = [
322+
("avro/fixed_length_decimal.avro", 25, 2),
323+
("avro/fixed_length_decimal_legacy.avro", 13, 2),
324+
("avro/int32_decimal.avro", 4, 2),
325+
("avro/int64_decimal.avro", 10, 2),
326+
];
327+
let decimal_values: Vec<i128> = (1..=24).map(|n| n as i128 * 100).collect();
328+
for (file, precision, scale) in files {
329+
let file_path = arrow_test_data(file);
330+
let actual_batch = read_file(&file_path, 8);
331+
let expected_array = Decimal128Array::from_iter_values(decimal_values.clone())
332+
.with_precision_and_scale(precision, scale)
333+
.unwrap();
334+
let mut meta = HashMap::new();
335+
meta.insert("precision".to_string(), precision.to_string());
336+
meta.insert("scale".to_string(), scale.to_string());
337+
let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true)
338+
.with_metadata(meta);
339+
let expected_schema = Arc::new(Schema::new(vec![field_with_meta]));
340+
let expected_batch =
341+
RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)])
342+
.expect("Failed to build expected RecordBatch");
343+
assert_eq!(
344+
actual_batch, expected_batch,
345+
"Decoded RecordBatch does not match the expected Decimal128 data for file {file}"
346+
);
347+
let actual_batch_small = read_file(&file_path, 3);
348+
assert_eq!(
349+
actual_batch_small,
350+
expected_batch,
351+
"Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3"
352+
);
353+
}
354+
}
355+
356+
#[test]
357+
fn test_simple() {
358+
let tests = [
359+
("avro/simple_enum.avro", 4, build_expected_enum(), 2),
360+
("avro/simple_fixed.avro", 2, build_expected_fixed(), 1),
361+
];
362+
363+
fn build_expected_enum() -> RecordBatch {
364+
// Build the DictionaryArrays for f1, f2, f3
365+
let keys_f1 = Int32Array::from(vec![0, 1, 2, 3]);
366+
let vals_f1 = StringArray::from(vec!["a", "b", "c", "d"]);
367+
let f1_dict =
368+
DictionaryArray::<Int32Type>::try_new(keys_f1, Arc::new(vals_f1)).unwrap();
369+
let keys_f2 = Int32Array::from(vec![2, 3, 0, 1]);
370+
let vals_f2 = StringArray::from(vec!["e", "f", "g", "h"]);
371+
let f2_dict =
372+
DictionaryArray::<Int32Type>::try_new(keys_f2, Arc::new(vals_f2)).unwrap();
373+
let keys_f3 = Int32Array::from(vec![Some(1), Some(2), None, Some(0)]);
374+
let vals_f3 = StringArray::from(vec!["i", "j", "k"]);
375+
let f3_dict =
376+
DictionaryArray::<Int32Type>::try_new(keys_f3, Arc::new(vals_f3)).unwrap();
377+
let dict_type =
378+
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
379+
let mut md_f1 = HashMap::new();
380+
md_f1.insert(
381+
"avro.enum.symbols".to_string(),
382+
r#"["a","b","c","d"]"#.to_string(),
383+
);
384+
let f1_field = Field::new("f1", dict_type.clone(), false).with_metadata(md_f1);
385+
let mut md_f2 = HashMap::new();
386+
md_f2.insert(
387+
"avro.enum.symbols".to_string(),
388+
r#"["e","f","g","h"]"#.to_string(),
389+
);
390+
let f2_field = Field::new("f2", dict_type.clone(), false).with_metadata(md_f2);
391+
let mut md_f3 = HashMap::new();
392+
md_f3.insert(
393+
"avro.enum.symbols".to_string(),
394+
r#"["i","j","k"]"#.to_string(),
395+
);
396+
let f3_field = Field::new("f3", dict_type.clone(), true).with_metadata(md_f3);
397+
let expected_schema = Arc::new(Schema::new(vec![f1_field, f2_field, f3_field]));
398+
RecordBatch::try_new(
399+
expected_schema,
400+
vec![
401+
Arc::new(f1_dict) as Arc<dyn Array>,
402+
Arc::new(f2_dict) as Arc<dyn Array>,
403+
Arc::new(f3_dict) as Arc<dyn Array>,
404+
],
405+
)
406+
.unwrap()
407+
}
408+
409+
fn build_expected_fixed() -> RecordBatch {
410+
let f1 =
411+
FixedSizeBinaryArray::try_from_iter(vec![b"abcde", b"12345"].into_iter()).unwrap();
412+
let f2 =
413+
FixedSizeBinaryArray::try_from_iter(vec![b"fghijklmno", b"1234567890"].into_iter())
414+
.unwrap();
415+
let f3 = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
416+
vec![Some(b"ABCDEF" as &[u8]), None].into_iter(),
417+
6,
418+
)
419+
.unwrap();
420+
let expected_schema = Arc::new(Schema::new(vec![
421+
Field::new("f1", DataType::FixedSizeBinary(5), false),
422+
Field::new("f2", DataType::FixedSizeBinary(10), false),
423+
Field::new("f3", DataType::FixedSizeBinary(6), true),
424+
]));
425+
RecordBatch::try_new(
426+
expected_schema,
427+
vec![
428+
Arc::new(f1) as Arc<dyn Array>,
429+
Arc::new(f2) as Arc<dyn Array>,
430+
Arc::new(f3) as Arc<dyn Array>,
431+
],
432+
)
433+
.unwrap()
434+
}
435+
for (file_name, batch_size, expected, alt_batch_size) in tests {
436+
let file = arrow_test_data(file_name);
437+
let actual = read_file(&file, batch_size);
438+
assert_eq!(actual, expected);
439+
let actual2 = read_file(&file, alt_batch_size);
440+
assert_eq!(actual2, expected);
441+
}
442+
}
311443
}

0 commit comments

Comments (0)