Skip to content

Commit ec81db3

Browse files
CurtHagenlocher, alamb, mbrobbel
authored
Add decimal32 and decimal64 support to Parquet, JSON and CSV readers and writers (#7841)
# Which issue does this PR close?

- Finishes remaining work and closes #6661.

# What changes are included in this PR?

This change adds `decimal32` and `decimal64` support to Parquet, JSON and CSV readers and writers. It does not change the current default behavior of the Parquet reader which (in the absence of a specification that says otherwise) will still translate the INT32 physical type with a logical DECIMAL type into a `decimal128` instead of a `decimal32`.

# Are these changes tested?

Yes.

# Are there any user-facing changes?

The `decimal32` and `decimal64` types are now supported in Parquet, JSON and CSV readers and writers.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
1 parent f39461c commit ec81db3

File tree

15 files changed

+616
-56
lines changed

15 files changed

+616
-56
lines changed

arrow-cast/src/cast/dictionary.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,20 @@ pub(crate) fn cast_to_dictionary<K: ArrowDictionaryKeyType>(
214214
UInt16 => pack_numeric_to_dictionary::<K, UInt16Type>(array, dict_value_type, cast_options),
215215
UInt32 => pack_numeric_to_dictionary::<K, UInt32Type>(array, dict_value_type, cast_options),
216216
UInt64 => pack_numeric_to_dictionary::<K, UInt64Type>(array, dict_value_type, cast_options),
217+
Decimal32(p, s) => pack_decimal_to_dictionary::<K, Decimal32Type>(
218+
array,
219+
dict_value_type,
220+
p,
221+
s,
222+
cast_options,
223+
),
224+
Decimal64(p, s) => pack_decimal_to_dictionary::<K, Decimal64Type>(
225+
array,
226+
dict_value_type,
227+
p,
228+
s,
229+
cast_options,
230+
),
217231
Decimal128(p, s) => pack_decimal_to_dictionary::<K, Decimal128Type>(
218232
array,
219233
dict_value_type,

arrow-csv/src/reader/mod.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,22 @@ fn parse(
654654
let field = &fields[i];
655655
match field.data_type() {
656656
DataType::Boolean => build_boolean_array(line_number, rows, i, null_regex),
657+
DataType::Decimal32(precision, scale) => build_decimal_array::<Decimal32Type>(
658+
line_number,
659+
rows,
660+
i,
661+
*precision,
662+
*scale,
663+
null_regex,
664+
),
665+
DataType::Decimal64(precision, scale) => build_decimal_array::<Decimal64Type>(
666+
line_number,
667+
rows,
668+
i,
669+
*precision,
670+
*scale,
671+
null_regex,
672+
),
657673
DataType::Decimal128(precision, scale) => build_decimal_array::<Decimal128Type>(
658674
line_number,
659675
rows,
@@ -1315,6 +1331,54 @@ mod tests {
13151331
assert_eq!("0.290472", lng.value_as_string(9));
13161332
}
13171333

1334+
#[test]
1335+
fn test_csv_reader_with_decimal_3264() {
1336+
let schema = Arc::new(Schema::new(vec![
1337+
Field::new("city", DataType::Utf8, false),
1338+
Field::new("lat", DataType::Decimal32(9, 6), false),
1339+
Field::new("lng", DataType::Decimal64(16, 6), false),
1340+
]));
1341+
1342+
let file = File::open("test/data/decimal_test.csv").unwrap();
1343+
1344+
let mut csv = ReaderBuilder::new(schema).build(file).unwrap();
1345+
let batch = csv.next().unwrap().unwrap();
1346+
// access data from a primitive array
1347+
let lat = batch
1348+
.column(1)
1349+
.as_any()
1350+
.downcast_ref::<Decimal32Array>()
1351+
.unwrap();
1352+
1353+
assert_eq!("57.653484", lat.value_as_string(0));
1354+
assert_eq!("53.002666", lat.value_as_string(1));
1355+
assert_eq!("52.412811", lat.value_as_string(2));
1356+
assert_eq!("51.481583", lat.value_as_string(3));
1357+
assert_eq!("12.123456", lat.value_as_string(4));
1358+
assert_eq!("50.760000", lat.value_as_string(5));
1359+
assert_eq!("0.123000", lat.value_as_string(6));
1360+
assert_eq!("123.000000", lat.value_as_string(7));
1361+
assert_eq!("123.000000", lat.value_as_string(8));
1362+
assert_eq!("-50.760000", lat.value_as_string(9));
1363+
1364+
let lng = batch
1365+
.column(2)
1366+
.as_any()
1367+
.downcast_ref::<Decimal64Array>()
1368+
.unwrap();
1369+
1370+
assert_eq!("-3.335724", lng.value_as_string(0));
1371+
assert_eq!("-2.179404", lng.value_as_string(1));
1372+
assert_eq!("-1.778197", lng.value_as_string(2));
1373+
assert_eq!("-3.179090", lng.value_as_string(3));
1374+
assert_eq!("-3.179090", lng.value_as_string(4));
1375+
assert_eq!("0.290472", lng.value_as_string(5));
1376+
assert_eq!("0.290472", lng.value_as_string(6));
1377+
assert_eq!("0.290472", lng.value_as_string(7));
1378+
assert_eq!("0.290472", lng.value_as_string(8));
1379+
assert_eq!("0.290472", lng.value_as_string(9));
1380+
}
1381+
13181382
#[test]
13191383
fn test_csv_from_buf_reader() {
13201384
let schema = Schema::new(vec![

arrow-csv/src/writer.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -418,8 +418,8 @@ mod tests {
418418

419419
use crate::ReaderBuilder;
420420
use arrow_array::builder::{
421-
BinaryBuilder, Decimal128Builder, Decimal256Builder, FixedSizeBinaryBuilder,
422-
LargeBinaryBuilder,
421+
BinaryBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder,
422+
FixedSizeBinaryBuilder, LargeBinaryBuilder,
423423
};
424424
use arrow_array::types::*;
425425
use arrow_buffer::i256;
@@ -496,25 +496,38 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
496496
#[test]
497497
fn test_write_csv_decimal() {
498498
let schema = Schema::new(vec![
499-
Field::new("c1", DataType::Decimal128(38, 6), true),
500-
Field::new("c2", DataType::Decimal256(76, 6), true),
499+
Field::new("c1", DataType::Decimal32(9, 6), true),
500+
Field::new("c2", DataType::Decimal64(17, 6), true),
501+
Field::new("c3", DataType::Decimal128(38, 6), true),
502+
Field::new("c4", DataType::Decimal256(76, 6), true),
501503
]);
502504

503-
let mut c1_builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
505+
let mut c1_builder = Decimal32Builder::new().with_data_type(DataType::Decimal32(9, 6));
504506
c1_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
505507
let c1 = c1_builder.finish();
506508

507-
let mut c2_builder = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
508-
c2_builder.extend(vec![
509+
let mut c2_builder = Decimal64Builder::new().with_data_type(DataType::Decimal64(17, 6));
510+
c2_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
511+
let c2 = c2_builder.finish();
512+
513+
let mut c3_builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(38, 6));
514+
c3_builder.extend(vec![Some(-3335724), Some(2179404), None, Some(290472)]);
515+
let c3 = c3_builder.finish();
516+
517+
let mut c4_builder = Decimal256Builder::new().with_data_type(DataType::Decimal256(76, 6));
518+
c4_builder.extend(vec![
509519
Some(i256::from_i128(-3335724)),
510520
Some(i256::from_i128(2179404)),
511521
None,
512522
Some(i256::from_i128(290472)),
513523
]);
514-
let c2 = c2_builder.finish();
524+
let c4 = c4_builder.finish();
515525

516-
let batch =
517-
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap();
526+
let batch = RecordBatch::try_new(
527+
Arc::new(schema),
528+
vec![Arc::new(c1), Arc::new(c2), Arc::new(c3), Arc::new(c4)],
529+
)
530+
.unwrap();
518531

519532
let mut file = tempfile::tempfile().unwrap();
520533

@@ -530,15 +543,15 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555,23:46:03,foo
530543
let mut buffer: Vec<u8> = vec![];
531544
file.read_to_end(&mut buffer).unwrap();
532545

533-
let expected = r#"c1,c2
534-
-3.335724,-3.335724
535-
2.179404,2.179404
536-
,
537-
0.290472,0.290472
538-
-3.335724,-3.335724
539-
2.179404,2.179404
540-
,
541-
0.290472,0.290472
546+
let expected = r#"c1,c2,c3,c4
547+
-3.335724,-3.335724,-3.335724,-3.335724
548+
2.179404,2.179404,2.179404,2.179404
549+
,,,
550+
0.290472,0.290472,0.290472,0.290472
551+
-3.335724,-3.335724,-3.335724,-3.335724
552+
2.179404,2.179404,2.179404,2.179404
553+
,,,
554+
0.290472,0.290472,0.290472,0.290472
542555
"#;
543556
assert_eq!(expected, str::from_utf8(&buffer).unwrap());
544557
}

arrow-json/src/reader/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,8 @@ fn make_decoder(
730730
DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
731731
DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
732732
DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
733+
DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
734+
DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
733735
DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
734736
DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
735737
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
@@ -1345,6 +1347,8 @@ mod tests {
13451347

13461348
#[test]
13471349
fn test_decimals() {
1350+
test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
1351+
test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
13481352
test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
13491353
test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
13501354
}

arrow-json/src/writer/encoder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ pub fn make_encoder<'a>(
339339
let nulls = array.nulls().cloned();
340340
NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
341341
}
342-
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
342+
DataType::Decimal32(_, _) | DataType::Decimal64(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
343343
let options = FormatOptions::new().with_display_error(true);
344344
let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
345345
NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)

arrow-json/src/writer/mod.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,54 @@ mod tests {
19291929
)
19301930
}
19311931

1932+
#[test]
1933+
fn test_decimal32_encoder() {
1934+
let array = Decimal32Array::from_iter_values([1234, 5678, 9012])
1935+
.with_precision_and_scale(8, 2)
1936+
.unwrap();
1937+
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1938+
let schema = Schema::new(vec![field]);
1939+
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1940+
1941+
let mut buf = Vec::new();
1942+
{
1943+
let mut writer = LineDelimitedWriter::new(&mut buf);
1944+
writer.write_batches(&[&batch]).unwrap();
1945+
}
1946+
1947+
assert_json_eq(
1948+
&buf,
1949+
r#"{"decimal":12.34}
1950+
{"decimal":56.78}
1951+
{"decimal":90.12}
1952+
"#,
1953+
);
1954+
}
1955+
1956+
#[test]
1957+
fn test_decimal64_encoder() {
1958+
let array = Decimal64Array::from_iter_values([1234, 5678, 9012])
1959+
.with_precision_and_scale(10, 2)
1960+
.unwrap();
1961+
let field = Arc::new(Field::new("decimal", array.data_type().clone(), true));
1962+
let schema = Schema::new(vec![field]);
1963+
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap();
1964+
1965+
let mut buf = Vec::new();
1966+
{
1967+
let mut writer = LineDelimitedWriter::new(&mut buf);
1968+
writer.write_batches(&[&batch]).unwrap();
1969+
}
1970+
1971+
assert_json_eq(
1972+
&buf,
1973+
r#"{"decimal":12.34}
1974+
{"decimal":56.78}
1975+
{"decimal":90.12}
1976+
"#,
1977+
);
1978+
}
1979+
19321980
#[test]
19331981
fn test_decimal128_encoder() {
19341982
let array = Decimal128Array::from_iter_values([1234, 5678, 9012])

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use crate::column::reader::decoder::ColumnValueDecoder;
2727
use crate::errors::{ParquetError, Result};
2828
use crate::schema::types::ColumnDescPtr;
2929
use arrow_array::{
30-
ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
31-
IntervalDayTimeArray, IntervalYearMonthArray,
30+
ArrayRef, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array,
31+
FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
3232
};
3333
use arrow_buffer::{i256, Buffer, IntervalDayTime};
3434
use arrow_data::ArrayDataBuilder;
@@ -64,6 +64,22 @@ pub fn make_fixed_len_byte_array_reader(
6464
};
6565
match &data_type {
6666
ArrowType::FixedSizeBinary(_) => {}
67+
ArrowType::Decimal32(_, _) => {
68+
if byte_length > 4 {
69+
return Err(general_err!(
70+
"decimal 32 type too large, must be less then 4 bytes, got {}",
71+
byte_length
72+
));
73+
}
74+
}
75+
ArrowType::Decimal64(_, _) => {
76+
if byte_length > 8 {
77+
return Err(general_err!(
78+
"decimal 64 type too large, must be less then 8 bytes, got {}",
79+
byte_length
80+
));
81+
}
82+
}
6783
ArrowType::Decimal128(_, _) => {
6884
if byte_length > 16 {
6985
return Err(general_err!(
@@ -168,6 +184,16 @@ impl ArrayReader for FixedLenByteArrayReader {
168184
// conversion lambdas are all infallible. This improves performance by avoiding a branch in
169185
// the inner loop (see docs for `PrimitiveArray::from_unary`).
170186
let array: ArrayRef = match &self.data_type {
187+
ArrowType::Decimal32(p, s) => {
188+
let f = |b: &[u8]| i32::from_be_bytes(sign_extend_be(b));
189+
Arc::new(Decimal32Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
190+
as ArrayRef
191+
}
192+
ArrowType::Decimal64(p, s) => {
193+
let f = |b: &[u8]| i64::from_be_bytes(sign_extend_be(b));
194+
Arc::new(Decimal64Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
195+
as ArrayRef
196+
}
171197
ArrowType::Decimal128(p, s) => {
172198
let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
173199
Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)

0 commit comments

Comments
 (0)