Skip to content

Commit bc293de

Browse files
committed
Switch to tokio for multi threaded writing test
1 parent 2d42ca0 commit bc293de

File tree

2 files changed

+45
-32
lines changed

2 files changed

+45
-32
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ impl PageWriter for ArrowPageWriter {
558558
}
559559

560560
/// A leaf column that can be encoded by [`ArrowColumnWriter`]
561-
#[derive(Debug)]
561+
#[derive(Debug, Clone)]
562562
pub struct ArrowLeafColumn(ArrayLevels);
563563

564564
/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]

parquet/tests/encryption/encryption.rs

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use parquet::arrow::arrow_reader::{
2828
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
2929
RowSelector,
3030
};
31-
use parquet::arrow::arrow_writer::{
32-
compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
33-
};
31+
use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowRowGroupWriterFactory};
3432
use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
3533
use parquet::data_type::{ByteArray, ByteArrayType};
3634
use parquet::encryption::decrypt::FileDecryptionProperties;
@@ -1165,41 +1163,56 @@ async fn test_multi_threaded_encrypted_writing() {
11651163

11661164
// Get column writers with encryptor from ArrowRowGroupWriter
11671165
let col_writers = arrow_row_group_writer.writers;
1166+
let num_columns = col_writers.len();
1167+
1168+
// Create a channel for each column writer to send ArrowLeafColumn data to
1169+
let mut col_writer_tasks = Vec::with_capacity(num_columns);
1170+
let mut col_array_channels = Vec::with_capacity(num_columns);
1171+
for mut writer in col_writers.into_iter() {
1172+
let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
1173+
col_array_channels.push(send_array);
1174+
let handle = tokio::spawn(async move {
1175+
while let Some(col) = receive_array.recv().await {
1176+
let _ = writer.write(&col);
1177+
}
1178+
writer.close()
1179+
});
1180+
col_writer_tasks.push(handle);
1181+
}
11681182

1169-
let mut workers: Vec<_> = col_writers
1170-
.into_iter()
1171-
.map(|mut col_writer| {
1172-
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1173-
let handle = std::thread::spawn(move || {
1174-
// receive Arrays to encode via the channel
1175-
for col in recv {
1176-
col_writer.write(&col)?;
1177-
}
1178-
// once the input is complete, close the writer
1179-
// to return the newly created ArrowColumnChunk
1180-
col_writer.close()
1181-
});
1182-
(handle, send)
1183-
})
1184-
.collect();
1185-
1186-
let mut worker_iter = workers.iter_mut();
1187-
for (arr, field) in to_write.iter().zip(&schema.fields) {
1188-
for leaves in compute_leaves(field, arr).unwrap() {
1189-
worker_iter.next().unwrap().1.send(leaves).unwrap();
1183+
// Send the ArrowLeafColumn data to the respective column writer channels
1184+
let mut next_channel = 0;
1185+
for (array, field) in to_write.iter().zip(schema.fields()) {
1186+
for c in compute_leaves(field, array).iter().flat_map(|x| x) {
1187+
if col_array_channels[next_channel]
1188+
.send(c.clone())
1189+
.await
1190+
.is_err()
1191+
{
1192+
break;
1193+
}
11901194
}
1195+
next_channel += 1;
1196+
}
1197+
drop(col_array_channels);
1198+
1199+
// Wait for all column writers to finish writing
1200+
let mut finalized_rg = Vec::with_capacity(num_columns);
1201+
for task in col_writer_tasks.into_iter() {
1202+
finalized_rg.push(task.await);
11911203
}
11921204

1193-
// Wait for the workers to complete encoding, and append
1205+
// Wait for the workers to complete writing then append
11941206
// the resulting column chunks to the row group (and the file)
11951207
let mut row_group_writer = file_writer.next_row_group().unwrap();
1196-
1197-
for (handle, send) in workers {
1198-
drop(send); // Drop send side to signal termination
1199-
// wait for the worker to send the completed chunk
1200-
let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1201-
chunk.append_to_row_group(&mut row_group_writer).unwrap();
1208+
for chunk in finalized_rg {
1209+
chunk
1210+
.unwrap()
1211+
.unwrap()
1212+
.append_to_row_group(&mut row_group_writer)
1213+
.unwrap();
12021214
}
1215+
12031216
// Close the row group which writes to the underlying file
12041217
row_group_writer.close().unwrap();
12051218

0 commit comments

Comments
 (0)