
Commit b7e1398: Initial commit
Parent: 54592e8

11 files changed (+331, -193 lines)

Cargo.lock

Lines changed: 252 additions & 132 deletions
Generated file; diff not rendered by default.

Cargo.toml

Lines changed: 9 additions & 7 deletions
@@ -89,20 +89,22 @@ ahash = { version = "0.8", default-features = false, features = [
     "runtime-rng",
 ] }
 apache-avro = { version = "0.17", default-features = false }
-arrow = { version = "55.2.0", features = [
+arrow = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", features = [
     "prettyprint",
     "chrono-tz",
 ] }
-arrow-buffer = { version = "55.2.0", default-features = false }
-arrow-flight = { version = "55.2.0", features = [
+
+
+arrow-buffer = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
+arrow-flight = { git = "https://github.com/rok/arrow-rs.git", features = [
     "flight-sql-experimental",
 ] }
-arrow-ipc = { version = "55.2.0", default-features = false, features = [
+arrow-ipc = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [
     "lz4",
     "zstd",
 ] }
-arrow-ord = { version = "55.2.0", default-features = false }
-arrow-schema = { version = "55.2.0", default-features = false }
+arrow-ord = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
+arrow-schema = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"
@@ -155,7 +157,7 @@ itertools = "0.14"
 log = "^0.4"
 object_store = { version = "0.12.2", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "55.2.0", default-features = false, features = [
+parquet = { git = "https://github.com/rok/arrow-rs.git", branch = "multi-threaded_encrypted_writing", default-features = false, features = [
     "arrow",
     "async",
     "object_store",

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ log = { workspace = true }
 object_store = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, default-features = true }
 paste = "1.0.15"
-pyo3 = { version = "0.24.2", optional = true }
+pyo3 = { version = "0.25.1", optional = true }
 recursive = { workspace = true, optional = true }
 sqlparser = { workspace = true }
 tokio = { workspace = true }

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 17 additions & 15 deletions
@@ -25,6 +25,8 @@ use crate::{
     DataFusionError, Result, _internal_datafusion_err,
 };
 
+pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
+
 use arrow::datatypes::Schema;
 // TODO: handle once deprecated
 #[allow(deprecated)]
@@ -35,7 +37,7 @@ use parquet::{
         metadata::KeyValue,
         properties::{
             EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion,
-            DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED,
+            DEFAULT_STATISTICS_ENABLED,
         },
     },
     schema::types::ColumnPath,
@@ -167,13 +169,13 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
 
             // max_statistics_size is deprecated, currently it is not being used
             // TODO: remove once deprecated
-            #[allow(deprecated)]
-            if let Some(max_statistics_size) = options.max_statistics_size {
-                builder = {
-                    #[allow(deprecated)]
-                    builder.set_column_max_statistics_size(path, max_statistics_size)
-                }
-            }
+            // #[allow(deprecated)]
+            // if let Some(max_statistics_size) = options.max_statistics_size {
+            //     builder = {
+            //         #[allow(deprecated)]
+            //         builder.set_column_max_statistics_size(path, max_statistics_size)
+            //     }
+            // }
         }
 
         Ok(builder)
@@ -222,7 +224,7 @@ impl ParquetOptions {
             dictionary_enabled,
             dictionary_page_size_limit,
             statistics_enabled,
-            max_statistics_size,
+            max_statistics_size: _max_statistics_size,
             max_row_group_size,
             created_by,
             column_index_truncate_length,
@@ -268,12 +270,12 @@ impl ParquetOptions {
             .set_data_page_row_count_limit(*data_page_row_count_limit)
             .set_bloom_filter_enabled(*bloom_filter_on_write);
 
-        builder = {
-            #[allow(deprecated)]
-            builder.set_max_statistics_size(
-                max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
-            )
-        };
+        // builder = {
+        //     #[allow(deprecated)]
+        //     builder.set_max_statistics_size(
+        //         max_statistics_size.unwrap_or(DEFAULT_MAX_STATISTICS_SIZE),
+        //     )
+        // };
 
         if let Some(bloom_filter_fpp) = bloom_filter_fpp {
             builder = builder.set_bloom_filter_fpp(*bloom_filter_fpp);

datafusion/common/src/scalar/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -2134,7 +2134,9 @@ impl ScalarValue {
                 | DataType::Time64(TimeUnit::Millisecond)
                 | DataType::RunEndEncoded(_, _)
                 | DataType::ListView(_)
-                | DataType::LargeListView(_) => {
+                | DataType::LargeListView(_)
+                | DataType::Decimal32(_, _)
+                | DataType::Decimal64(_, _) => {
                     return _not_impl_err!(
                         "Unsupported creation of {:?} array from ScalarValue {:?}",
                         data_type,

datafusion/common/src/types/native.rs

Lines changed: 1 addition & 0 deletions
@@ -407,6 +407,7 @@ impl From<DataType> for NativeType {
             DataType::Union(union_fields, _) => {
                 Union(LogicalUnionFields::from(&union_fields))
             }
+            DataType::Decimal32(p, s) | DataType::Decimal64(p, s) |
             DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => Decimal(p, s),
             DataType::Map(field, _) => Map(Arc::new(field.as_ref().into())),
             DataType::Dictionary(_, data_type) => data_type.as_ref().clone().into(),

datafusion/core/src/dataframe/parquet.rs

Lines changed: 1 addition & 0 deletions
@@ -277,6 +277,7 @@ mod tests {
         // Write encrypted parquet using write_parquet
         let mut options = TableParquetOptions::default();
         options.crypto.file_encryption = Some((&encrypt).into());
+        options.global.allow_single_file_parallelism = true;
 
         df.write_parquet(
             tempfile_str.as_str(),
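With parallelism no longer forced off for encrypted writes (see the file_format.rs change below), the test simply opts back in. A minimal usage sketch along the same lines, reusing the test's `df` and `encrypt` values (defined earlier in the file, outside this hunk) and the standard DataFrameWriteOptions:

// Sketch only: `df` and `encrypt` come from the surrounding test setup.
let mut options = TableParquetOptions::default();
options.crypto.file_encryption = Some((&encrypt).into());
// Previously this had to stay false whenever encryption was configured.
options.global.allow_single_file_parallelism = true;

df.write_parquet(
    tempfile_str.as_str(),
    DataFrameWriteOptions::new(),
    Some(options),
)
.await?;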

datafusion/datasource-avro/src/avro_to_arrow/schema.rs

Lines changed: 2 additions & 0 deletions
@@ -237,6 +237,8 @@ fn default_field_name(dt: &DataType) -> &str {
         }
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
+        DataType::Decimal32(_, _) => "decimal",
+        DataType::Decimal64(_, _) => "decimal",
     }
 }
 

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 40 additions & 36 deletions
@@ -72,8 +72,8 @@ use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::arrow_writer::{
-    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
-    ArrowLeafColumn, ArrowWriterOptions,
+    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
+    ArrowRowGroupWriterFactory, ArrowWriterOptions,
 };
 use parquet::arrow::async_reader::MetadataFetch;
 use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
@@ -1306,14 +1306,6 @@ impl FileSink for ParquetSink {
         object_store: Arc<dyn ObjectStore>,
     ) -> Result<u64> {
         let parquet_opts = &self.parquet_options;
-        let mut allow_single_file_parallelism =
-            parquet_opts.global.allow_single_file_parallelism;
-
-        if parquet_opts.crypto.file_encryption.is_some() {
-            // For now, arrow-rs does not support parallel writes with encryption
-            // See https://github.com/apache/arrow-rs/issues/7359
-            allow_single_file_parallelism = false;
-        }
 
         let mut file_write_tasks: JoinSet<
             std::result::Result<(Path, FileMetaData), DataFusionError>,
@@ -1330,7 +1322,7 @@
         };
 
         while let Some((path, mut rx)) = file_stream_rx.recv().await {
-            if !allow_single_file_parallelism {
+            if !parquet_opts.global.allow_single_file_parallelism {
                 let mut writer = self
                     .create_async_arrow_writer(
                         &path,
@@ -1458,13 +1450,13 @@ type ColSender = Sender<ArrowLeafColumn>;
 /// Returns join handles for each columns serialization task along with a send channel
 /// to send arrow arrays to each serialization task.
 fn spawn_column_parallel_row_group_writer(
-    schema: Arc<Schema>,
-    parquet_props: Arc<WriterProperties>,
+    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
     max_buffer_size: usize,
     pool: &Arc<dyn MemoryPool>,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
-    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
-    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
+    let arrow_row_group_writer =
+        arrow_row_group_writer_factory.create_row_group_writer(0)?;
+    let col_writers = arrow_row_group_writer.into_column_writers();
     let num_columns = col_writers.len();
 
     let mut col_writer_tasks = Vec::with_capacity(num_columns);
@@ -1559,6 +1551,7 @@ fn spawn_rg_join_and_finalize_task(
 /// across both columns and row_groups, with a theoretical max number of parallel tasks
 /// given by n_columns * num_row_groups.
 fn spawn_parquet_parallel_serialization_task(
+    arrow_row_group_writer_factory: Arc<ArrowRowGroupWriterFactory>,
     mut data: Receiver<RecordBatch>,
     serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
     schema: Arc<Schema>,
@@ -1571,12 +1564,14 @@
     let max_row_group_rows = writer_props.max_row_group_size();
     let (mut column_writer_handles, mut col_array_channels) =
         spawn_column_parallel_row_group_writer(
-            Arc::clone(&schema),
-            Arc::clone(&writer_props),
+            arrow_row_group_writer_factory.clone(),
             max_buffer_rb,
             &pool,
         )?;
     let mut current_rg_rows = 0;
+    // TODO: row_group_writer should use the correct row group index. Currently this would fail if
+    // multiple row groups were written.
+    // let mut rg_index = 0;
 
     while let Some(mut rb) = data.recv().await {
         // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
@@ -1623,8 +1618,7 @@
 
             (column_writer_handles, col_array_channels) =
                 spawn_column_parallel_row_group_writer(
-                    Arc::clone(&schema),
-                    Arc::clone(&writer_props),
+                    arrow_row_group_writer_factory.clone(),
                     max_buffer_rb,
                     &pool,
                 )?;
@@ -1655,24 +1649,15 @@
 /// Consume RowGroups serialized by other parallel tasks and concatenate them in
 /// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
 async fn concatenate_parallel_row_groups(
+    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
+    merged_buff: SharedBuffer,
     mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
-    schema: Arc<Schema>,
-    writer_props: Arc<WriterProperties>,
     mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
     pool: Arc<dyn MemoryPool>,
 ) -> Result<FileMetaData> {
-    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
-
     let mut file_reservation =
         MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
 
-    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
-    let mut parquet_writer = SerializedFileWriter::new(
-        merged_buff.clone(),
-        schema_desc.root_schema_ptr(),
-        writer_props,
-    )?;
-
     while let Some(task) = serialize_rx.recv().await {
         let result = task.join_unwind().await;
         let mut rg_out = parquet_writer.next_row_group()?;
@@ -1723,28 +1708,47 @@ async fn output_single_parquet_file_parallelized(
     let (serialize_tx, serialize_rx) =
         mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
 
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(parquet_props.coerce_types())
+        .convert(&output_schema)?;
+    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
+    let parquet_writer = SerializedFileWriter::new(
+        merged_buff.clone(),
+        parquet_schema.root_schema_ptr(),
+        parquet_props.clone().into(),
+    )?;
+    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(
+        &parquet_writer,
+        parquet_schema,
+        output_schema.clone(),
+        parquet_props.clone().into(),
+    );
+
     let arc_props = Arc::new(parquet_props.clone());
     let launch_serialization_task = spawn_parquet_parallel_serialization_task(
+        Arc::new(arrow_row_group_writer_factory),
         data,
         serialize_tx,
         Arc::clone(&output_schema),
         Arc::clone(&arc_props),
         parallel_options,
         Arc::clone(&pool),
     );
+
+    launch_serialization_task
+        .join_unwind()
+        .await
+        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
+
     let file_metadata = concatenate_parallel_row_groups(
+        parquet_writer,
+        merged_buff,
         serialize_rx,
-        Arc::clone(&output_schema),
-        Arc::clone(&arc_props),
         object_store_writer,
         pool,
     )
     .await?;
 
-    launch_serialization_task
-        .join_unwind()
-        .await
-        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
     Ok(file_metadata)
 }
 
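Read together, the hunks above restructure the single-file parallel write path: the file-level SerializedFileWriter (and the SharedBuffer it flushes into) is now created once in output_single_parquet_file_parallelized, and an ArrowRowGroupWriterFactory built from it is what the serialization task receives instead of the schema/properties pair, which is why the encryption special-case could be dropped. A condensed, commented sketch of that flow follows; it reuses the identifiers from the diff and relies on the ArrowRowGroupWriterFactory API from the pinned rok/arrow-rs branch, so it is illustrative rather than something that builds against crates.io arrow/parquet 55.2.

// Sketch of the new setup in output_single_parquet_file_parallelized()
// (names as in the diff; the factory API exists only on the pinned branch).

// One file-level writer over an in-memory buffer. All row groups share it, so
// file-wide state (on this branch, presumably including the file encryptor)
// lives in a single place.
let parquet_schema = ArrowSchemaConverter::new()
    .with_coerce_types(parquet_props.coerce_types())
    .convert(&output_schema)?;
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let parquet_writer = SerializedFileWriter::new(
    merged_buff.clone(),
    parquet_schema.root_schema_ptr(),
    parquet_props.clone().into(),
)?;

// The factory replaces the (schema, WriterProperties) arguments the parallel
// tasks used to receive; each call hands back the per-column writers for one
// row group (index 0 here, matching the single-row-group limitation noted in
// the TODO above).
let factory = Arc::new(ArrowRowGroupWriterFactory::new(
    &parquet_writer,
    parquet_schema,
    output_schema.clone(),
    parquet_props.clone().into(),
));
let col_writers = factory.create_row_group_writer(0)?.into_column_writers();

// Columns are encoded in parallel through col_writers; finished row groups are
// then appended sequentially via parquet_writer.next_row_group() in
// concatenate_parallel_row_groups(), which streams merged_buff to the object store.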

datafusion/expr/src/utils.rs

Lines changed: 2 additions & 0 deletions
@@ -816,6 +816,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
         DataType::Float64 => true,
         DataType::Decimal128(_, _) => true,
         DataType::Decimal256(_, _) => true,
+        DataType::Decimal32(_, _) => true,
+        DataType::Decimal64(_, _) => true,
         DataType::Timestamp(_, _) => true,
         DataType::Utf8 => true,
         DataType::LargeUtf8 => true,
