Skip to content

feat: support multi-threaded writing of Parquet files with modular encryption #16738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

rok
Copy link
Member

@rok rok commented Jul 10, 2025

Which issue does this PR close?

Rationale for this change

#16351 added modular encryption reading and writing. This builds on top of #16351 and uses apache/arrow-rs#7818 to enable multi threaded encrypted writing.

What changes are included in this PR?

This uses a lower level ArrowRowGroupWriterFactory API to create encryption-aware column writers that are then run multi threaded.

Are these changes tested?

Yes.

Are there any user-facing changes?

Previously parquet_opts.global.allow_single_file_parallelism == true would be ignored and encrypted write would always be single threaded. Now it will run multithreaded.

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions core Core DataFusion crate common Related to common crate datasource Changes to the datasource crate labels Jul 10, 2025
@rok
Copy link
Member Author

rok commented Jul 10, 2025

Note: this includes some unrelated changes so as to be able to use changes in apache/arrow-rs#7818.
Also note: row_group_index at write time is not handled correctly yet. Do not merge.

@adamreeve @corwinjoy thought?

@rok rok changed the title Initial commit feat: support multi-threaded writing of Parquet files with modular encryption Jul 10, 2025
@rok rok force-pushed the multithreaded_encrypted_parquet_write branch 2 times, most recently from b7e1398 to d8709fc Compare July 10, 2025 13:37
@github-actions github-actions bot added the proto Related to proto crate label Jul 10, 2025
@rok rok force-pushed the multithreaded_encrypted_parquet_write branch from d8709fc to bb475bf Compare July 10, 2025 13:58
@github-actions github-actions bot added the substrait Changes to the substrait crate label Jul 10, 2025
@rok rok force-pushed the multithreaded_encrypted_parquet_write branch 3 times, most recently from 8e872c3 to 5929724 Compare July 10, 2025 15:49
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jul 10, 2025
@rok rok force-pushed the multithreaded_encrypted_parquet_write branch from 5929724 to f1f6d63 Compare July 10, 2025 16:21
@adamreeve
Copy link
Contributor

This approach looks good to me! Do the existing tests hit the parallel write code path? We probably want to make sure there are encryption tests for both the parallel and serial code paths.

@corwinjoy
Copy link
Contributor

corwinjoy commented Jul 11, 2025

Via copilot:
This pull request introduces several updates across multiple components of the codebase, focusing on dependency management, feature enhancements, and code cleanup. The most significant changes include switching Arrow-related dependencies to a custom fork, deprecating the use of max_statistics_size in Parquet options, and extending support for additional decimal data types.

Dependency Updates:

  • Updated Arrow-related dependencies (arrow, arrow-buffer, arrow-flight, etc.) in Cargo.toml to use a custom fork hosted on GitHub (https://github.com/rok/arrow-rs.git) with the multi-threaded_encrypted_writing branch. This change enables multi-threaded encrypted writing features. [1] [2]

Feature Enhancements:

  • Added support for Decimal32 and Decimal64 data types in various parts of the codebase, including scalar value creation (datafusion/common/src/scalar/mod.rs), native type conversion (datafusion/common/src/types/native.rs), and hashing utilities (datafusion/expr/src/utils.rs). [1] [2] [3]
  • Modified the Parquet writer to allow single-file parallelism explicitly in tests and serialization tasks, while introducing a new ArrowRowGroupWriterFactory for parallel row group writing. [1] [2] [3]

Code Cleanup:

  • Deprecated the use of max_statistics_size in Parquet writer options and removed related code, including references to the deprecated constant and associated logic. [1] [2] [3]
  • Removed unused example files (flight_sql_server, flight_server, flight_client) from datafusion-examples/Cargo.toml.

Protobuf Updates:

  • Extended the protobuf conversion logic to handle Decimal32 and Decimal64 types in datafusion/proto-common/src/to_proto/mod.rs.

max_buffer_rb,
&pool,
)?;
let mut current_rg_rows = 0;
// TODO: row_group_writer should use the correct row group index. Currently this would fail if
// multiple row groups were written.
// let mut rg_index = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adamreeve Definitely something to note. We will want to resolve this before the final PR.

@corwinjoy
Copy link
Contributor

Looks good to me, with the exception of multi-row group writing being missing. When you go to rebase to the latest datafusion the diff should get a lot simpler since they have upgraded to support arrow-55.3 and added the missing statitistics and types. I also like @adamreeve suggestion of having a test for the parallel write path.

launch_serialization_task
.join_unwind()
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
Ok(file_metadata)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot comments:

  • Row Group Indexing: The TODO comment indicates that the row group writer currently assumes a single row group index (always 0). If multiple row groups are written in parallel, this might cause incorrect output or data corruption. Consider addressing this before merging.
  • Encryption Parallelism: The removal of the explicit check for parallelism when encryption is enabled relies on arrow-rs’ internal handling. Double-check upstream to ensure there’s a clear error or fallback if parallelism is not supported with encryption.
  • Testing: These changes affect core serialization logic. Ensure thorough integration and performance tests, especially for edge cases (encryption, large datasets, multiple row groups).
  • Documentation: Consider updating related docs or comments, as the code flow and APIs used have changed significantly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate logical-expr Logical plan and expressions proto Related to proto crate sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support multi-threaded writing of encrypted Parquet files
3 participants