
Commit 98dc06b

feat: Support Parquet writer options (#1123)
* feat: Support Parquet writer options
* Create dedicated write_parquet_options function
* Rename write_parquet_options to write_parquet_with_options
* Merge remote-tracking branch 'origin/main' into write_parquet_options
* Fix ruff errors
1 parent 9b6acec commit 98dc06b

File tree

5 files changed: +757 −2 lines changed


python/datafusion/__init__.py

Lines changed: 3 additions & 1 deletion

@@ -46,7 +46,7 @@
     SessionContext,
     SQLOptions,
 )
-from .dataframe import DataFrame
+from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
 from .expr import (
     Expr,
     WindowFrame,
@@ -80,6 +80,8 @@
     "ExecutionPlan",
     "Expr",
     "LogicalPlan",
+    "ParquetColumnOptions",
+    "ParquetWriterOptions",
     "RecordBatch",
     "RecordBatchStream",
     "RuntimeEnvBuilder",

python/datafusion/dataframe.py

Lines changed: 221 additions & 0 deletions

@@ -38,6 +38,8 @@
 from typing_extensions import deprecated  # Python 3.12

 from datafusion._internal import DataFrame as DataFrameInternal
+from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
+from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
 from datafusion.record_batch import RecordBatchStream
@@ -114,6 +116,173 @@ def get_default_level(self) -> Optional[int]:
         return None


+class ParquetWriterOptions:
+    """Advanced parquet writer options.
+
+    Allows setting the writer options that apply to the entire file. Some options
+    can also be set on a column-by-column basis, with the field
+    `column_specific_options` (see `ParquetColumnOptions`).
+
+    Attributes:
+        data_pagesize_limit: Sets best effort maximum size of data page in bytes.
+        write_batch_size: Sets write_batch_size in bytes.
+        writer_version: Sets parquet writer version. Valid values are `1.0` and
+            `2.0`.
+        skip_arrow_metadata: Skip encoding the embedded arrow metadata in the
+            KV_meta.
+        compression: Compression type to use. Default is "zstd(3)".
+            Available compression types are
+            - "uncompressed": No compression.
+            - "snappy": Snappy compression.
+            - "gzip(n)": Gzip compression with level n.
+            - "brotli(n)": Brotli compression with level n.
+            - "lz4": LZ4 compression.
+            - "lz4_raw": LZ4_RAW compression.
+            - "zstd(n)": Zstandard compression with level n.
+        dictionary_enabled: Sets if dictionary encoding is enabled. If None, uses
+            the default parquet writer setting.
+        dictionary_page_size_limit: Sets best effort maximum dictionary page size,
+            in bytes.
+        statistics_enabled: Sets if statistics are enabled for any column. Valid
+            values are `none`, `chunk`, and `page`. If None, uses the default
+            parquet writer setting.
+        max_row_group_size: Target maximum number of rows in each row group
+            (defaults to 1M rows). Writing larger row groups requires more memory
+            to write, but can get better compression and be faster to read.
+        created_by: Sets "created by" property.
+        column_index_truncate_length: Sets column index truncate length.
+        statistics_truncate_length: Sets statistics truncate length. If None, uses
+            the default parquet writer setting.
+        data_page_row_count_limit: Sets best effort maximum number of rows in a
+            data page.
+        encoding: Sets default encoding for any column. Valid values are `plain`,
+            `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+            `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+            `byte_stream_split`. If None, uses the default parquet writer setting.
+        bloom_filter_on_write: Write bloom filters for all columns when creating
+            parquet files.
+        bloom_filter_fpp: Sets bloom filter false positive probability. If None,
+            uses the default parquet writer setting.
+        bloom_filter_ndv: Sets bloom filter number of distinct values. If None,
+            uses the default parquet writer setting.
+        allow_single_file_parallelism: Controls whether DataFusion will attempt to
+            speed up writing parquet files by serializing them in parallel. Each
+            column in each row group in each output file is serialized in parallel
+            leveraging a maximum possible core count of n_files * n_row_groups *
+            n_columns.
+        maximum_parallel_row_group_writers: By default the parallel parquet writer
+            is tuned for minimum memory usage in a streaming execution plan. You
+            may see a performance benefit when writing large parquet files by
+            increasing `maximum_parallel_row_group_writers` and
+            `maximum_buffered_record_batches_per_stream` if your system has idle
+            cores and can tolerate additional memory usage. Boosting these values
+            is likely worthwhile when writing out already in-memory data, such as
+            from a cached data frame.
+        maximum_buffered_record_batches_per_stream: See
+            `maximum_parallel_row_group_writers`.
+        column_specific_options: Overrides options for specific columns. If a
+            column is not a part of this dictionary, it will use the parameters
+            provided here.
+    """
+
+    def __init__(
+        self,
+        data_pagesize_limit: int = 1024 * 1024,
+        write_batch_size: int = 1024,
+        writer_version: str = "1.0",
+        skip_arrow_metadata: bool = False,
+        compression: Optional[str] = "zstd(3)",
+        dictionary_enabled: Optional[bool] = True,
+        dictionary_page_size_limit: int = 1024 * 1024,
+        statistics_enabled: Optional[str] = "page",
+        max_row_group_size: int = 1024 * 1024,
+        created_by: str = "datafusion-python",
+        column_index_truncate_length: Optional[int] = 64,
+        statistics_truncate_length: Optional[int] = None,
+        data_page_row_count_limit: int = 20_000,
+        encoding: Optional[str] = None,
+        bloom_filter_on_write: bool = False,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+        allow_single_file_parallelism: bool = True,
+        maximum_parallel_row_group_writers: int = 1,
+        maximum_buffered_record_batches_per_stream: int = 2,
+        column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,
+    ) -> None:
+        """Initialize the ParquetWriterOptions."""
+        self.data_pagesize_limit = data_pagesize_limit
+        self.write_batch_size = write_batch_size
+        self.writer_version = writer_version
+        self.skip_arrow_metadata = skip_arrow_metadata
+        self.compression = compression
+        self.dictionary_enabled = dictionary_enabled
+        self.dictionary_page_size_limit = dictionary_page_size_limit
+        self.statistics_enabled = statistics_enabled
+        self.max_row_group_size = max_row_group_size
+        self.created_by = created_by
+        self.column_index_truncate_length = column_index_truncate_length
+        self.statistics_truncate_length = statistics_truncate_length
+        self.data_page_row_count_limit = data_page_row_count_limit
+        self.encoding = encoding
+        self.bloom_filter_on_write = bloom_filter_on_write
+        self.bloom_filter_fpp = bloom_filter_fpp
+        self.bloom_filter_ndv = bloom_filter_ndv
+        self.allow_single_file_parallelism = allow_single_file_parallelism
+        self.maximum_parallel_row_group_writers = maximum_parallel_row_group_writers
+        self.maximum_buffered_record_batches_per_stream = (
+            maximum_buffered_record_batches_per_stream
+        )
+        self.column_specific_options = column_specific_options
+
+
+class ParquetColumnOptions:
+    """Parquet options for individual columns.
+
+    Contains the available options that can be applied for an individual Parquet
+    column, replacing the global options in `ParquetWriterOptions`.
+
+    Attributes:
+        encoding: Sets encoding for the column path. Valid values are: `plain`,
+            `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+            `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+            `byte_stream_split`. These values are not case-sensitive. If `None`,
+            uses the default parquet options.
+        dictionary_enabled: Sets if dictionary encoding is enabled for the column
+            path. If `None`, uses the default parquet options.
+        compression: Sets default parquet compression codec for the column path.
+            Valid values are `uncompressed`, `snappy`, `gzip(level)`, `lzo`,
+            `brotli(level)`, `lz4`, `zstd(level)`, and `lz4_raw`. These values are
+            not case-sensitive. If `None`, uses the default parquet options.
+        statistics_enabled: Sets if statistics are enabled for the column. Valid
+            values are: `none`, `chunk`, and `page`. These values are not
+            case-sensitive. If `None`, uses the default parquet options.
+        bloom_filter_enabled: Sets if bloom filter is enabled for the column path.
+            If `None`, uses the default parquet options.
+        bloom_filter_fpp: Sets bloom filter false positive probability for the
+            column path. If `None`, uses the default parquet options.
+        bloom_filter_ndv: Sets bloom filter number of distinct values. If `None`,
+            uses the default parquet options.
+    """
+
+    def __init__(
+        self,
+        encoding: Optional[str] = None,
+        dictionary_enabled: Optional[bool] = None,
+        compression: Optional[str] = None,
+        statistics_enabled: Optional[str] = None,
+        bloom_filter_enabled: Optional[bool] = None,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+    ) -> None:
+        """Initialize the ParquetColumnOptions."""
+        self.encoding = encoding
+        self.dictionary_enabled = dictionary_enabled
+        self.compression = compression
+        self.statistics_enabled = statistics_enabled
+        self.bloom_filter_enabled = bloom_filter_enabled
+        self.bloom_filter_fpp = bloom_filter_fpp
+        self.bloom_filter_ndv = bloom_filter_ndv
+
+
 class DataFrame:
     """Two dimensional table representation of data.
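As an aside (not part of the diff): a minimal sketch of how the two new option classes above might be combined. The class and parameter names come from the code in this hunk; the column name "payload" and the specific values are illustrative assumptions only.

from datafusion import ParquetColumnOptions, ParquetWriterOptions

# File-wide writer settings, plus a per-column override for a hypothetical column.
options = ParquetWriterOptions(
    compression="zstd(3)",        # file-wide default codec
    max_row_group_size=100_000,   # smaller row groups than the 1M-row default
    column_specific_options={
        # "payload" is an illustrative column name, not part of this commit.
        "payload": ParquetColumnOptions(
            compression="snappy",       # override the file-wide codec
            bloom_filter_enabled=True,  # bloom filter only for this column
        ),
    },
)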
@@ -737,6 +906,58 @@ def write_parquet(

         self.df.write_parquet(str(path), compression.value, compression_level)

+    def write_parquet_with_options(
+        self, path: str | pathlib.Path, options: ParquetWriterOptions
+    ) -> None:
+        """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
+
+        Allows advanced writer options to be set with `ParquetWriterOptions`.
+
+        Args:
+            path: Path of the Parquet file to write.
+            options: Sets the writer parquet options (see `ParquetWriterOptions`).
+        """
+        options_internal = ParquetWriterOptionsInternal(
+            options.data_pagesize_limit,
+            options.write_batch_size,
+            options.writer_version,
+            options.skip_arrow_metadata,
+            options.compression,
+            options.dictionary_enabled,
+            options.dictionary_page_size_limit,
+            options.statistics_enabled,
+            options.max_row_group_size,
+            options.created_by,
+            options.column_index_truncate_length,
+            options.statistics_truncate_length,
+            options.data_page_row_count_limit,
+            options.encoding,
+            options.bloom_filter_on_write,
+            options.bloom_filter_fpp,
+            options.bloom_filter_ndv,
+            options.allow_single_file_parallelism,
+            options.maximum_parallel_row_group_writers,
+            options.maximum_buffered_record_batches_per_stream,
+        )
+
+        column_specific_options_internal = {}
+        for column, opts in (options.column_specific_options or {}).items():
+            column_specific_options_internal[column] = ParquetColumnOptionsInternal(
+                bloom_filter_enabled=opts.bloom_filter_enabled,
+                encoding=opts.encoding,
+                dictionary_enabled=opts.dictionary_enabled,
+                compression=opts.compression,
+                statistics_enabled=opts.statistics_enabled,
+                bloom_filter_fpp=opts.bloom_filter_fpp,
+                bloom_filter_ndv=opts.bloom_filter_ndv,
+            )
+
+        self.df.write_parquet_with_options(
+            str(path),
+            options_internal,
+            column_specific_options_internal,
+        )
+
     def write_json(self, path: str | pathlib.Path) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
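For completeness, a hedged end-to-end usage sketch of the new method. Only `write_parquet_with_options`, `ParquetWriterOptions`, and the existing `SessionContext` API are taken from the repository; the query and the output path "example.parquet" are illustrative assumptions.

from datafusion import ParquetWriterOptions, SessionContext

ctx = SessionContext()
# Illustrative one-row DataFrame; any DataFrame works the same way.
df = ctx.sql("SELECT 1 AS id, 'hello' AS payload")

# Write with advanced writer options instead of the plain write_parquet() call.
df.write_parquet_with_options(
    "example.parquet",  # hypothetical output path
    ParquetWriterOptions(compression="uncompressed", writer_version="2.0"),
)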
