
Commit a521204 (1 parent: f7a45fa)

feat: separate parquet compression (#150)

Closes #149

File tree: 7 files changed, +47 −4 lines


Cargo.lock

Lines changed: 2 additions & 0 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ tokio = { version = "1.44.0", features = ["rt-multi-thread"] }
 pyo3-log = "0.12.1"
 tracing = "0.1.41"
 pyo3-object_store = "0.2.0"
+parquet = "55.1.0"
 
 [build-dependencies]
 cargo-lock = "10"

python/rustac/rustac.pyi

Lines changed: 10 additions & 0 deletions
@@ -350,6 +350,7 @@ async def search_to(
     filter: str | dict[str, Any] | None = None,
     query: dict[str, Any] | None = None,
     format: str | None = None,
+    parquet_compression: str | None = None,
     store: AnyObjectStore | None = None,
     use_duckdb: bool | None = None,
 ) -> int:
@@ -389,6 +390,10 @@ async def search_to(
             It is recommended to use filter instead, if possible.
         format: The output format. If none, will be inferred from
             the outfile extension, and if that fails will fall back to compact JSON.
+        parquet_compression: If writing stac-geoparquet, sets the compression
+            algorithm.
+            https://docs.rs/parquet/latest/parquet/basic/enum.Compression.html
+            is a list of what's available.
         store: An optional [ObjectStore][]
         use_duckdb: Query with DuckDB. If None and the href has a
             'parquet' or 'geoparquet' extension, will be set to True. Defaults
@@ -428,6 +433,7 @@ async def write(
     value: dict[str, Any] | Sequence[dict[str, Any]],
     *,
     format: str | None = None,
+    parquet_compression: str | None = None,
     store: AnyObjectStore | None = None,
 ) -> dict[str, str] | None:
     """
@@ -439,6 +445,10 @@ async def write(
             can be a STAC dictionary or a list of items.
         format: The output format to write. If not provided, will be
             inferred from the href's extension.
+        parquet_compression: If writing stac-geoparquet, sets the compression
+            algorithm.
+            https://docs.rs/parquet/latest/parquet/basic/enum.Compression.html
+            is a list of what's available.
         store: The object store to use for writing.
 
     Returns:
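
With this change, both rustac.search_to and rustac.write accept the new keyword. A minimal usage sketch of search_to; the STAC API URL and collection id are hypothetical, and the "zstd(1)" compression string is the one exercised by this commit's test:

import asyncio

import rustac


async def main() -> None:
    # Write search results to stac-geoparquet with zstd compression.
    count = await rustac.search_to(
        "items.parquet",
        "https://example.com/stac",  # hypothetical STAC API endpoint
        collections=["my-collection"],  # hypothetical collection id
        parquet_compression="zstd(1)",
    )
    print(f"wrote {count} items")


asyncio.run(main())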

src/error.rs

Lines changed: 3 additions & 0 deletions
@@ -41,6 +41,9 @@ pub enum Error {
     #[error(transparent)]
     StacDuckdb(#[from] stac_duckdb::Error),
 
+    #[error(transparent)]
+    Parquet(#[from] parquet::errors::ParquetError),
+
     #[error(transparent)]
     TokioTaskJon(#[from] tokio::task::JoinError),
 }
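
The new Parquet variant lets compression-string parse failures flow through rustac's error conversion, so an invalid value should surface as a Python exception rather than a panic. A hedged sketch of that expected behavior — this is not a test from the commit, and the exact exception class depends on rustac's error mapping, so it catches broadly:

import pytest

import rustac


async def test_bad_compression_string(tmp_path, item) -> None:
    # An unparseable compression string is expected to raise at write
    # time; the specific exception type is an assumption here.
    with pytest.raises(Exception):
        await rustac.write(
            str(tmp_path / "out.parquet"), [item], parquet_compression="not-a-codec"
        )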

src/search.rs

Lines changed: 9 additions & 2 deletions
@@ -58,7 +58,7 @@ pub fn search<'py>(
 }
 
 #[pyfunction]
-#[pyo3(signature = (outfile, href, *, intersects=None, ids=None, collections=None, max_items=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, format=None, store=None, use_duckdb=None, **kwargs))]
+#[pyo3(signature = (outfile, href, *, intersects=None, ids=None, collections=None, max_items=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, format=None, parquet_compression=None, store=None, use_duckdb=None, **kwargs))]
 #[allow(clippy::too_many_arguments)]
 pub fn search_to<'py>(
     py: Python<'py>,
@@ -77,6 +77,7 @@ pub fn search_to<'py>(
     filter: Option<StringOrDict>,
     query: Option<Bound<'py, PyDict>>,
     format: Option<String>,
+    parquet_compression: Option<String>,
     store: Option<AnyObjectStore>,
     use_duckdb: Option<bool>,
     kwargs: Option<Bound<'_, PyDict>>,
@@ -95,12 +96,18 @@ pub fn search_to<'py>(
         query,
         kwargs,
     )?;
-    let format = format
+    let mut format = format
         .map(|s| s.parse())
         .transpose()
         .map_err(Error::from)?
         .or_else(|| Format::infer_from_href(&outfile))
         .unwrap_or_default();
+    if matches!(format, Format::Geoparquet(_)) {
+        if let Some(parquet_compression) = parquet_compression {
+            tracing::debug!("setting parquet compression: {parquet_compression}");
+            format = Format::Geoparquet(Some(parquet_compression.parse().map_err(Error::from)?));
+        }
+    }
     if use_duckdb
         .unwrap_or_else(|| matches!(Format::infer_from_href(&href), Some(Format::Geoparquet(_))))
     {
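
The compression string is parsed via the parquet crate's Compression enum (the type linked from the docstrings above). A hedged sketch of the kinds of values that enum's documentation lists — only "zstd(1)" is exercised by this commit's test; the rest are assumptions from the linked docs:

# Inside an async function; `items` is a list of STAC item dicts defined
# elsewhere. Compression strings are codec names, optionally with a level
# in parentheses for codecs that accept one.
for compression in ["snappy", "gzip(9)", "zstd(3)", "uncompressed"]:
    outfile = f"out-{compression.split('(')[0]}.parquet"
    await rustac.write(outfile, items, parquet_compression=compression)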

src/write.rs

Lines changed: 10 additions & 2 deletions
@@ -6,12 +6,13 @@ use stac::{Item, ItemCollection};
 use stac_io::{Format, StacStore};
 
 #[pyfunction]
-#[pyo3(signature = (href, value, *, format=None, store=None))]
+#[pyo3(signature = (href, value, *, format=None, parquet_compression=None, store=None))]
 pub fn write<'py>(
     py: Python<'py>,
     href: String,
     value: Bound<'_, PyAny>,
     format: Option<String>,
+    parquet_compression: Option<String>,
     store: Option<AnyObjectStore>,
 ) -> PyResult<Bound<'py, PyAny>> {
     let value: Value = pythonize::depythonize(&value)?;
@@ -25,10 +26,17 @@ pub fn write<'py>(
         serde_json::from_value(value).map_err(Error::from)?
     };
     pyo3_async_runtimes::tokio::future_into_py(py, async move {
-        let format = format
+        let mut format = format
             .and_then(|f| f.parse::<Format>().ok())
             .or_else(|| Format::infer_from_href(&href))
             .unwrap_or_default();
+        if matches!(format, Format::Geoparquet(_)) {
+            if let Some(parquet_compression) = parquet_compression {
+                tracing::debug!("setting parquet compression: {parquet_compression}");
+                format =
+                    Format::Geoparquet(Some(parquet_compression.parse().map_err(Error::from)?));
+            }
+        }
         let (stac_store, path) = if let Some(store) = store {
             (StacStore::from(store.into_dyn()), None)
         } else {
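
Note the guard: the override only fires when the resolved format is already Geoparquet, so parquet_compression is silently ignored for other output formats. A small sketch of the resulting behavior (inside an async function; `items` is a list of STAC item dicts):

# Format is inferred from the outfile extension, and the compression
# setting is applied only when that inference produces geoparquet.
await rustac.write("out.json", items, parquet_compression="zstd(3)")     # ignored: JSON output
await rustac.write("out.parquet", items, parquet_compression="zstd(3)")  # applied: geoparquet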

tests/test_write.py

Lines changed: 12 additions & 0 deletions
@@ -5,6 +5,7 @@
 import pyarrow.parquet
 import rustac
 import stac_geoparquet
+from pyarrow.parquet import ParquetFile
 from rustac.store import LocalStore
 
 
@@ -35,3 +36,14 @@ async def test_write_includes_type(tmp_path: Path, item: dict[str, Any]) -> None
     await rustac.write(str(tmp_path / "out.parquet"), [item])
     data_frame = pandas.read_parquet(str(tmp_path / "out.parquet"))
     assert "type" in data_frame.columns
+
+
+async def test_write_parquet_compression(tmp_path: Path, item: dict[str, Any]) -> None:
+    await rustac.write(
+        str(tmp_path / "out.parquet"), [item], parquet_compression="zstd(1)"
+    )
+    parquet_file = ParquetFile(tmp_path / "out.parquet")
+    metadata = parquet_file.metadata
+    for row_group in range(metadata.num_row_groups):
+        for column in range(metadata.num_columns):
+            assert metadata.row_group(row_group).column(column).compression == "ZSTD"
