Skip to content

Commit 7386293

Browse files
authored
feat: support specifying compression when unloading to parquet. (#17664)
compression = zstd | snappy default is zstd.
1 parent 261f0d1 commit 7386293

File tree

14 files changed

+172
-19
lines changed

14 files changed

+172
-19
lines changed

src/meta/app/src/principal/file_format.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl FileFormatParams {
109109
FileFormatParams::NdJson(v) => v.compression,
110110
FileFormatParams::Json(v) => v.compression,
111111
FileFormatParams::Xml(v) => v.compression,
112-
FileFormatParams::Parquet(_) => StageFileCompression::None,
112+
FileFormatParams::Parquet(v) => v.compression,
113113
FileFormatParams::Orc(_) => StageFileCompression::None,
114114
FileFormatParams::Avro(_) => StageFileCompression::None,
115115
}
@@ -153,18 +153,18 @@ impl FileFormatParams {
153153
StageFileFormatType::Xml => {
154154
let default = XmlFileFormatParams::default();
155155
let row_tag = reader.take_string(OPT_ROW_TAG, default.row_tag);
156-
let compression = reader.take_compression()?;
156+
let compression = reader.take_compression_default_none()?;
157157
FileFormatParams::Xml(XmlFileFormatParams {
158158
compression,
159159
row_tag,
160160
})
161161
}
162162
StageFileFormatType::Json => {
163-
let compression = reader.take_compression()?;
163+
let compression = reader.take_compression_default_none()?;
164164
FileFormatParams::Json(JsonFileFormatParams { compression })
165165
}
166166
StageFileFormatType::NdJson => {
167-
let compression = reader.take_compression()?;
167+
let compression = reader.take_compression_default_none()?;
168168
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
169169
let null_field_as = reader.options.remove(NULL_FIELD_AS);
170170
let null_if = parse_null_if(reader.options.remove(NULL_IF))?;
@@ -176,7 +176,7 @@ impl FileFormatParams {
176176
)?)
177177
}
178178
StageFileFormatType::Avro => {
179-
let compression = reader.take_compression()?;
179+
let compression = reader.take_compression_default_none()?;
180180
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
181181
let null_if = parse_null_if(reader.options.remove(NULL_IF))?;
182182
FileFormatParams::Avro(AvroFileFormatParams::try_create(
@@ -186,9 +186,11 @@ impl FileFormatParams {
186186
)?)
187187
}
188188
StageFileFormatType::Parquet => {
189+
let compression = reader.take_compression(StageFileCompression::Zstd)?;
189190
let missing_field_as = reader.options.remove(MISSING_FIELD_AS);
190191
let null_if = parse_null_if(reader.options.remove(NULL_IF))?;
191192
FileFormatParams::Parquet(ParquetFileFormatParams::try_create(
193+
compression,
192194
missing_field_as.as_deref(),
193195
null_if,
194196
)?)
@@ -201,7 +203,7 @@ impl FileFormatParams {
201203
}
202204
StageFileFormatType::Csv => {
203205
let default = CsvFileFormatParams::default();
204-
let compression = reader.take_compression()?;
206+
let compression = reader.take_compression_default_none()?;
205207
let headers = reader.take_u64(OPT_SKIP_HEADER, default.headers)?;
206208
let field_delimiter =
207209
reader.take_string(OPT_FIELD_DELIMITER, default.field_delimiter);
@@ -246,7 +248,7 @@ impl FileFormatParams {
246248
}
247249
StageFileFormatType::Tsv => {
248250
let default = TsvFileFormatParams::default();
249-
let compression = reader.take_compression()?;
251+
let compression = reader.take_compression_default_none()?;
250252
let headers = reader.take_u64(OPT_SKIP_HEADER, default.headers)?;
251253
let field_delimiter =
252254
reader.take_string(OPT_FIELD_DELIMITER, default.field_delimiter);
@@ -329,6 +331,7 @@ impl FileFormatParams {
329331
impl Default for FileFormatParams {
330332
fn default() -> Self {
331333
FileFormatParams::Parquet(ParquetFileFormatParams {
334+
compression: StageFileCompression::Zstd,
332335
missing_field_as: NullAs::Error,
333336
null_if: vec![],
334337
})
@@ -384,10 +387,13 @@ impl FileFormatOptionsReader {
384387
}
385388
}
386389

387-
fn take_compression(&mut self) -> Result<StageFileCompression> {
390+
fn take_compression_default_none(&mut self) -> Result<StageFileCompression> {
391+
self.take_compression(StageFileCompression::None)
392+
}
393+
fn take_compression(&mut self, default: StageFileCompression) -> Result<StageFileCompression> {
388394
match self.options.remove("compression") {
389395
Some(c) => StageFileCompression::from_str(&c).map_err(ErrorCode::IllegalFileFormat),
390-
None => Ok(StageFileCompression::None),
396+
None => Ok(default),
391397
}
392398
}
393399

@@ -752,16 +758,41 @@ impl AvroFileFormatParams {
752758
}
753759
}
754760

755-
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
761+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
756762
pub struct ParquetFileFormatParams {
763+
// used only for unload
764+
pub compression: StageFileCompression,
757765
pub missing_field_as: NullAs,
758766
pub null_if: Vec<String>,
759767
}
760768

769+
impl Default for ParquetFileFormatParams {
770+
fn default() -> Self {
771+
Self {
772+
compression: StageFileCompression::Zstd,
773+
missing_field_as: Default::default(),
774+
null_if: Default::default(),
775+
}
776+
}
777+
}
778+
761779
impl ParquetFileFormatParams {
762-
pub fn try_create(missing_field_as: Option<&str>, null_if: Vec<String>) -> Result<Self> {
780+
pub fn try_create(
781+
compression: StageFileCompression,
782+
missing_field_as: Option<&str>,
783+
null_if: Vec<String>,
784+
) -> Result<Self> {
785+
if !matches!(
786+
compression,
787+
StageFileCompression::Zstd | StageFileCompression::Snappy
788+
) {
789+
return Err(ErrorCode::InvalidArgument(format!(
790+
"compression algorithm {compression} not supported, only support Zstd and Snappy."
791+
)));
792+
}
763793
let missing_field_as = NullAs::parse(missing_field_as, MISSING_FIELD_AS, NullAs::Error)?;
764794
Ok(Self {
795+
compression,
765796
missing_field_as,
766797
null_if,
767798
})

src/meta/proto-conv/src/file_format_from_to_protobuf_impl.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_io::GeometryDataType;
2121
use databend_common_meta_app as mt;
2222
use databend_common_meta_app::principal::BinaryFormat;
2323
use databend_common_meta_app::principal::EmptyFieldAs;
24+
use databend_common_meta_app::principal::StageFileCompression;
2425
use databend_common_protos::pb;
2526
use num::FromPrimitive;
2627

@@ -335,14 +336,29 @@ impl FromToProto for mt::principal::ParquetFileFormatParams {
335336
fn from_pb(p: pb::ParquetFileFormatParams) -> Result<Self, Incompatible>
336337
where Self: Sized {
337338
reader_check_msg(p.ver, p.min_reader_ver)?;
338-
mt::principal::ParquetFileFormatParams::try_create(p.missing_field_as.as_deref(), p.null_if)
339-
.map_err(|e| Incompatible::new(format!("{e}")))
339+
let mut compression = mt::principal::StageFileCompression::from_pb_enum(
340+
FromPrimitive::from_i32(p.compression).ok_or_else(|| {
341+
Incompatible::new(format!("invalid StageFileCompression: {}", p.compression))
342+
})?,
343+
)?;
344+
if compression == StageFileCompression::Auto {
345+
compression = StageFileCompression::Zstd;
346+
};
347+
mt::principal::ParquetFileFormatParams::try_create(
348+
compression,
349+
p.missing_field_as.as_deref(),
350+
p.null_if,
351+
)
352+
.map_err(|e| Incompatible::new(format!("{e}")))
340353
}
341354

342355
fn to_pb(&self) -> Result<pb::ParquetFileFormatParams, Incompatible> {
356+
let compression =
357+
mt::principal::StageFileCompression::to_pb_enum(&self.compression)? as i32;
343358
Ok(pb::ParquetFileFormatParams {
344359
ver: VER,
345360
min_reader_ver: MIN_READER_VER,
361+
compression,
346362
missing_field_as: Some(self.missing_field_as.to_string()),
347363
null_if: self.null_if.clone(),
348364
})

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
152152
(120, "2025-02-11: Add: Add new UserPrivilege CreateWarehouse and new OwnershipObject::Warehouse"),
153153
(121, "2025-03-03: Add: Add new FileFormat AvroFileFormatParams"),
154154
(122, "2025-03-11: Add: table_meta and virtual_data_schema"),
155+
(123, "2025-03-27: Add: add compression in user.proto/ParquetFileFormatParam"),
155156
// Dear developer:
156157
// If you're gonna add a new metadata version, you'll have to add a test for it.
157158
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,4 @@ mod v119_virtual_column;
117117
mod v120_warehouse_ownershipobject;
118118
mod v121_avro_format_params;
119119
mod v122_virtual_schema;
120+
mod v123_parquet_format_params;

src/meta/proto-conv/tests/it/v032_file_format_params.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ fn test_decode_v32_parquet_file_format_params() -> anyhow::Result<()> {
138138

139139
let want = || {
140140
mt::principal::FileFormatParams::Parquet(ParquetFileFormatParams {
141+
compression: StageFileCompression::Zstd,
141142
missing_field_as: Default::default(),
142143
null_if: vec![],
143144
})

src/meta/proto-conv/tests/it/v066_stage_create_on.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use chrono::DateTime;
1616
use chrono::Utc;
1717
use databend_common_meta_app as mt;
18+
use databend_common_meta_app::principal::StageFileCompression;
1819
use databend_common_meta_app::principal::UserIdentity;
1920
use databend_common_meta_app::storage::StorageParams;
2021
use databend_common_meta_app::storage::StorageS3Config;
@@ -55,6 +56,7 @@ fn test_decode_v66_stage() -> anyhow::Result<()> {
5556
is_temporary: false,
5657
file_format_params: mt::principal::FileFormatParams::Parquet(
5758
mt::principal::ParquetFileFormatParams {
59+
compression: StageFileCompression::Zstd,
5860
missing_field_as: Default::default(),
5961
null_if: vec![],
6062
},

src/meta/proto-conv/tests/it/v093_parquet_format_params.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use databend_common_meta_app::principal::ParquetFileFormatParams;
16+
use databend_common_meta_app::principal::StageFileCompression;
1617
use fastrace::func_name;
1718

1819
use crate::common;
@@ -31,6 +32,7 @@ use crate::common;
3132
fn test_decode_v93_parquet_file_format_params() -> anyhow::Result<()> {
3233
let parquet_file_format_params_v93 = vec![34, 0, 34, 1, 97, 160, 6, 93, 168, 6, 24];
3334
let want = || ParquetFileFormatParams {
35+
compression: StageFileCompression::Zstd,
3436
missing_field_as: Default::default(),
3537
null_if: vec!["".to_string(), "a".to_string()],
3638
};

src/meta/proto-conv/tests/it/v099_parquet_format_params.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use databend_common_meta_app::principal::NullAs;
1616
use databend_common_meta_app::principal::ParquetFileFormatParams;
17+
use databend_common_meta_app::principal::StageFileCompression;
1718
use fastrace::func_name;
1819

1920
use crate::common;
@@ -35,6 +36,7 @@ fn test_decode_v99_parquet_file_format_params() -> anyhow::Result<()> {
3536
168, 6, 24,
3637
];
3738
let want = || ParquetFileFormatParams {
39+
compression: StageFileCompression::Zstd,
3840
missing_field_as: NullAs::FieldDefault,
3941
null_if: vec!["".to_string(), "a".to_string()],
4042
};
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_meta_app::principal::NullAs;
16+
use databend_common_meta_app::principal::ParquetFileFormatParams;
17+
use databend_common_meta_app::principal::StageFileCompression;
18+
use fastrace::func_name;
19+
20+
use crate::common;
21+
22+
// These bytes are built when a new version in introduced,
23+
24+
// and are kept for backward compatibility test.
25+
//
26+
// *************************************************************
27+
// * These messages should never be updated, *
28+
// * only be added when a new version is added, *
29+
// * or be removed when an old version is no longer supported. *
30+
// *************************************************************
31+
//
32+
#[test]
33+
fn test_decode_v123_parquet_file_format_params() -> anyhow::Result<()> {
34+
let parquet_file_format_params_v123 = vec![
35+
10, 13, 70, 73, 69, 76, 68, 95, 68, 69, 70, 65, 85, 76, 84, 16, 8, 34, 0, 34, 1, 97, 160,
36+
6, 123, 168, 6, 24,
37+
];
38+
39+
let want = || ParquetFileFormatParams {
40+
compression: StageFileCompression::Snappy,
41+
missing_field_as: NullAs::FieldDefault,
42+
null_if: vec!["".to_string(), "a".to_string()],
43+
};
44+
common::test_load_old(
45+
func_name!(),
46+
parquet_file_format_params_v123.as_slice(),
47+
123,
48+
want(),
49+
)?;
50+
common::test_pb_from_to(func_name!(), want())?;
51+
Ok(())
52+
}

src/meta/protos/proto/file_format.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ message ParquetFileFormatParams {
9393
uint64 ver = 100;
9494
uint64 min_reader_ver = 101;
9595
optional string missing_field_as = 1;
96+
StageFileCompression compression = 2;
9697
repeated string null_if = 4;
9798
}
9899

0 commit comments

Comments
 (0)