Skip to content

Commit b5b40f7

Browse files
authored
chore: upgrade arrow-format (#14675)
upgrade arrow-format
1 parent 4bc2c77 commit b5b40f7

File tree

4 files changed

+50
-2
lines changed

4 files changed

+50
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ rpath = false
253253

254254
[patch.crates-io]
255255
# If there are dependencies that need patching, they can be listed below.
256-
arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "588d371" }
256+
arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "ad8f2dd" }
257257
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "b0e6545" }
258258
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
259259
icelake = { git = "https://github.com/icelake-io/icelake", rev = "f06cdf3" }

src/common/arrow/src/arrow/io/ipc/read/schema.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ fn get_data_type(
364364
Struct(_) => deserialize_struct(field)?,
365365
Union(union_) => deserialize_union(union_, field)?,
366366
Map(map) => deserialize_map(map, field)?,
367+
_ => unimplemented!(),
367368
})
368369
}
369370

src/common/arrow/src/arrow/io/ipc/write/common.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,33 @@ fn serialize_compression(
247247
}
248248
}
249249

250+
fn set_variadic_buffer_counts(_counts: &mut Vec<i64>, array: &dyn Array) {
251+
match array.data_type() {
252+
DataType::Struct(_) => {
253+
let array = array.as_any().downcast_ref::<StructArray>().unwrap();
254+
for array in array.values() {
255+
set_variadic_buffer_counts(_counts, array.as_ref())
256+
}
257+
}
258+
DataType::LargeList(_) => {
259+
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
260+
set_variadic_buffer_counts(_counts, array.values().as_ref())
261+
}
262+
DataType::FixedSizeList(_, _) => {
263+
let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
264+
set_variadic_buffer_counts(_counts, array.values().as_ref())
265+
}
266+
DataType::Dictionary(_, _, _) => {
267+
let array = array
268+
.as_any()
269+
.downcast_ref::<DictionaryArray<u32>>()
270+
.unwrap();
271+
set_variadic_buffer_counts(_counts, array.values().as_ref())
272+
}
273+
_ => (),
274+
}
275+
}
276+
250277
/// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the
251278
/// other for the batch's data
252279
fn chunk_to_bytes_amortized(
@@ -260,7 +287,10 @@ fn chunk_to_bytes_amortized(
260287
arrow_data.clear();
261288

262289
let mut offset = 0;
290+
let mut variadic_buffer_counts = vec![];
291+
263292
for array in chunk.arrays() {
293+
set_variadic_buffer_counts(&mut variadic_buffer_counts, array.as_ref());
264294
write(
265295
array.as_ref(),
266296
&mut buffers,
@@ -272,6 +302,12 @@ fn chunk_to_bytes_amortized(
272302
)
273303
}
274304

305+
let variadic_buffer_counts = if variadic_buffer_counts.is_empty() {
306+
None
307+
} else {
308+
Some(variadic_buffer_counts)
309+
};
310+
275311
let compression = serialize_compression(options.compression);
276312

277313
let message = arrow_format::ipc::Message {
@@ -282,6 +318,7 @@ fn chunk_to_bytes_amortized(
282318
nodes: Some(nodes),
283319
buffers: Some(buffers),
284320
compression,
321+
variadic_buffer_counts,
285322
},
286323
))),
287324
body_length: arrow_data.len() as i64,
@@ -306,6 +343,15 @@ fn dictionary_batch_to_bytes<K: DictionaryKey>(
306343
let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![];
307344
let mut arrow_data: Vec<u8> = vec![];
308345

346+
let mut variadic_buffer_counts = vec![];
347+
set_variadic_buffer_counts(&mut variadic_buffer_counts, array.values().as_ref());
348+
349+
let variadic_buffer_counts = if variadic_buffer_counts.is_empty() {
350+
None
351+
} else {
352+
Some(variadic_buffer_counts)
353+
};
354+
309355
let length = write_dictionary(
310356
array,
311357
&mut buffers,
@@ -329,6 +375,7 @@ fn dictionary_batch_to_bytes<K: DictionaryKey>(
329375
nodes: Some(nodes),
330376
buffers: Some(buffers),
331377
compression,
378+
variadic_buffer_counts,
332379
})),
333380
is_delta: false,
334381
},

0 commit comments

Comments
 (0)