Skip to content

Commit b57a5d4

Browse files
committed
Switch to tokio for multi threaded writing test
1 parent 2d42ca0 commit b57a5d4

File tree

1 file changed

+31
-30
lines changed

1 file changed

+31
-30
lines changed

parquet/tests/encryption/encryption.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ use parquet::arrow::arrow_reader::{
2828
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection,
2929
RowSelector,
3030
};
31-
use parquet::arrow::arrow_writer::{
32-
compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory,
33-
};
31+
use parquet::arrow::arrow_writer::{compute_leaves, ArrowLeafColumn, ArrowRowGroupWriterFactory};
3432
use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
3533
use parquet::data_type::{ByteArray, ByteArrayType};
3634
use parquet::encryption::decrypt::FileDecryptionProperties;
@@ -1165,41 +1163,44 @@ async fn test_multi_threaded_encrypted_writing() {
11651163

11661164
// Get column writers with encryptor from ArrowRowGroupWriter
11671165
let col_writers = arrow_row_group_writer.writers;
1166+
let num_columns = col_writers.len();
1167+
1168+
// Create a channel for each column writer to send ArrowLeafColumn data to
1169+
let mut col_writer_tasks = Vec::with_capacity(num_columns);
1170+
let mut col_array_channels = Vec::with_capacity(num_columns);
1171+
for mut writer in col_writers.into_iter() {
1172+
let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
1173+
col_array_channels.push(send_array);
1174+
let handle = tokio::spawn(async move {
1175+
while let Some(col) = receive_array.recv().await {
1176+
let _ = writer.write(&col);
1177+
}
1178+
writer.close().unwrap()
1179+
});
1180+
col_writer_tasks.push(handle);
1181+
}
11681182

1169-
let mut workers: Vec<_> = col_writers
1170-
.into_iter()
1171-
.map(|mut col_writer| {
1172-
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
1173-
let handle = std::thread::spawn(move || {
1174-
// receive Arrays to encode via the channel
1175-
for col in recv {
1176-
col_writer.write(&col)?;
1177-
}
1178-
// once the input is complete, close the writer
1179-
// to return the newly created ArrowColumnChunk
1180-
col_writer.close()
1181-
});
1182-
(handle, send)
1183-
})
1184-
.collect();
1185-
1186-
let mut worker_iter = workers.iter_mut();
1187-
for (arr, field) in to_write.iter().zip(&schema.fields) {
1188-
for leaves in compute_leaves(field, arr).unwrap() {
1189-
worker_iter.next().unwrap().1.send(leaves).unwrap();
1183+
// Send the ArrowLeafColumn data to the respective column writer channels
1184+
for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() {
1185+
for c in compute_leaves(field, array).into_iter().flatten() {
1186+
let _ = col_array_channels[channel_idx].send(c).await;
11901187
}
11911188
}
1189+
drop(col_array_channels);
1190+
1191+
// Wait for all column writers to finish writing
1192+
let mut finalized_rg = Vec::with_capacity(num_columns);
1193+
for task in col_writer_tasks.into_iter() {
1194+
finalized_rg.push(task.await.unwrap());
1195+
}
11921196

1193-
// Wait for the workers to complete encoding, and append
1197+
// Wait for the workers to complete writing then append
11941198
// the resulting column chunks to the row group (and the file)
11951199
let mut row_group_writer = file_writer.next_row_group().unwrap();
1196-
1197-
for (handle, send) in workers {
1198-
drop(send); // Drop send side to signal termination
1199-
// wait for the worker to send the completed chunk
1200-
let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
1200+
for chunk in finalized_rg {
12011201
chunk.append_to_row_group(&mut row_group_writer).unwrap();
12021202
}
1203+
12031204
// Close the row group which writes to the underlying file
12041205
row_group_writer.close().unwrap();
12051206

0 commit comments

Comments
 (0)