Skip to content

Commit 646f7ef

Browse files
authored
feat: Add ZIP compression type (#18257)
* feat: Add ZIP compression type * chore: add meta test for Zip
1 parent 6a3cd13 commit 646f7ef

File tree

19 files changed

+281
-10
lines changed

19 files changed

+281
-10
lines changed

Cargo.lock

Lines changed: 95 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ wiremock = "0.6"
540540
wkt = "0.11.1"
541541
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
542542
xorfilter-rs = "0.5"
543+
zip = "3.0.0"
543544
zstd = "0.12.3"
544545

545546
# AST needed

src/common/compress/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ futures = { workspace = true }
1616
log = { workspace = true }
1717
pin-project = { workspace = true }
1818
serde = { workspace = true }
19+
zip = { workspace = true }
1920

2021
[dev-dependencies]
2122
env_logger = { workspace = true }

src/common/compress/src/compress_algorithms.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub enum CompressAlgorithm {
4040
///
4141
/// Similar to [`CompressAlgorithm::Deflate`] and [`CompressAlgorithm::Gzip`]
4242
Zlib,
43+
/// [Zip](https://pkware.cachefly.net/webdocs/APPNOTE/APPNOTE-6.3.10.TXT) compress format.
44+
Zip,
4345
/// [Zstd](https://github.com/facebook/zstd) compression algorithm
4446
Zstd,
4547
}
@@ -56,6 +58,7 @@ impl CompressAlgorithm {
5658
CompressAlgorithm::Xz => "xz",
5759
CompressAlgorithm::Zlib => "zl",
5860
CompressAlgorithm::Zstd => "zstd",
61+
CompressAlgorithm::Zip => "zip",
5962
}
6063
}
6164

@@ -72,6 +75,7 @@ impl CompressAlgorithm {
7275
"xz" => Some(CompressAlgorithm::Xz),
7376
"zl" => Some(CompressAlgorithm::Zlib),
7477
"zstd" | "zst" => Some(CompressAlgorithm::Zstd),
78+
"zip" => Some(CompressAlgorithm::Zip),
7579
_ => None,
7680
}
7781
}

src/common/compress/src/decode.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::io::Cursor;
16+
use std::io::Read;
1517
use std::io::Result;
1618
use std::pin::Pin;
1719
use std::task::Context;
@@ -36,6 +38,7 @@ use futures::AsyncBufRead;
3638
use futures::AsyncRead;
3739
use log::trace;
3840
use pin_project::pin_project;
41+
use zip::ZipArchive;
3942

4043
use crate::CompressAlgorithm;
4144

@@ -73,6 +76,9 @@ impl From<CompressAlgorithm> for DecompressCodec {
7376
CompressAlgorithm::Xz => DecompressCodec::Xz(XzDecoder::new()),
7477
CompressAlgorithm::Zlib => DecompressCodec::Zlib(ZlibDecoder::new()),
7578
CompressAlgorithm::Zstd => DecompressCodec::Zstd(ZstdDecoder::new()),
79+
CompressAlgorithm::Zip => {
80+
unreachable!("Zip type requires additional judgment and use `decompress_all_zip`")
81+
}
7682
}
7783
}
7884
}
@@ -350,6 +356,25 @@ impl DecompressDecoder {
350356
main.extend_from_slice(&tail);
351357
Ok(main)
352358
}
359+
360+
pub fn decompress_all_zip(compressed: &[u8]) -> databend_common_exception::Result<Vec<u8>> {
361+
let mut zip = ZipArchive::new(Cursor::new(compressed)).map_err(|e| {
362+
ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
363+
})?;
364+
if zip.len() > 1 {
365+
return Err(ErrorCode::InvalidCompressionData(
366+
"Zip only supports single file",
367+
));
368+
}
369+
let mut file = zip.by_index(0).map_err(|e| {
370+
ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}"))
371+
})?;
372+
let mut bytes = Vec::new();
373+
file.read_to_end(&mut bytes)?;
374+
375+
Ok(bytes)
376+
}
377+
353378
// need to finish the decoding by adding a empty input
354379
pub fn decompress_batch(
355380
&mut self,
@@ -577,6 +602,22 @@ mod tests {
577602
Ok(())
578603
}
579604

605+
#[tokio::test]
606+
async fn test_decompress_reader_zip() -> databend_common_exception::Result<()> {
607+
let _ = env_logger::try_init();
608+
609+
let mut rng = ThreadRng::default();
610+
let mut content = vec![0; 16 * 1024 * 1024];
611+
rng.fill_bytes(&mut content);
612+
613+
let compressed_content = CompressCodec::compress_all_zip(&content)?;
614+
let result = DecompressDecoder::decompress_all_zip(&compressed_content)?;
615+
616+
assert_eq!(result, content);
617+
618+
Ok(())
619+
}
620+
580621
#[tokio::test]
581622
async fn test_decompress_reader_ontime_gzip() -> Result<()> {
582623
let _ = env_logger::try_init();

src/common/compress/src/encode.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::io::Cursor;
1516
use std::io::Result;
17+
use std::io::Write;
1618

1719
use async_compression::codec::BrotliEncoder;
1820
use async_compression::codec::BzEncoder;
@@ -27,6 +29,8 @@ use async_compression::util::PartialBuffer;
2729
use async_compression::Level;
2830
use brotli::enc::backward_references::BrotliEncoderParams;
2931
use databend_common_exception::ErrorCode;
32+
use zip::write::FileOptions;
33+
use zip::ZipWriter;
3034

3135
use crate::CompressAlgorithm;
3236

@@ -78,6 +82,9 @@ impl From<CompressAlgorithm> for CompressCodec {
7882
CompressAlgorithm::Zstd => {
7983
CompressCodec::Zstd(ZstdEncoder::new(Level::Default.into_zstd()))
8084
}
85+
CompressAlgorithm::Zip => {
86+
unreachable!("Zip type requires additional judgment and use `compress_all_zip`")
87+
}
8188
}
8289
}
8390
}
@@ -175,6 +182,26 @@ impl CompressCodec {
175182
}
176183
Ok(compress_bufs.concat())
177184
}
185+
186+
pub fn compress_all_zip(to_compress: &[u8]) -> databend_common_exception::Result<Vec<u8>> {
187+
let mut cursor = Cursor::new(Vec::new());
188+
189+
let mut zip = ZipWriter::new(&mut cursor);
190+
let options: FileOptions<()> = FileOptions::default()
191+
.compression_method(zip::CompressionMethod::Deflated)
192+
.unix_permissions(0o644);
193+
194+
// zip for archive files
195+
zip.start_file("tmp", options)
196+
.map_err(|e| ErrorCode::InvalidCompressionData(format!("zip start_file error: {e}")))?;
197+
198+
zip.write_all(to_compress)
199+
.map_err(|e| ErrorCode::InvalidCompressionData(format!("zip write error: {e}")))?;
200+
201+
zip.finish()
202+
.map_err(|e| ErrorCode::InvalidCompressionData(format!("zip finish error: {e}")))?;
203+
Ok(cursor.into_inner())
204+
}
178205
}
179206

180207
#[cfg(test)]

src/meta/app/src/principal/user_stage.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub enum StageFileCompression {
108108
Lzo,
109109
Snappy,
110110
Xz,
111+
Zip,
111112
None,
112113
}
113114

@@ -138,7 +139,8 @@ impl FromStr for StageFileCompression {
138139
"snappy" => Ok(StageFileCompression::Snappy),
139140
"xz" => Ok(StageFileCompression::Xz),
140141
"none" => Ok(StageFileCompression::None),
141-
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | raw_deflate | lzo | snappy | xz | none }"
142+
"zip" => Ok(StageFileCompression::Zip),
143+
_ => Err("Unknown file compression type, must one of { auto | gzip | bz2 | brotli | zstd | deflate | raw_deflate | lzo | snappy | xz | zip | none }"
142144
.to_string()),
143145
}
144146
}
@@ -157,6 +159,7 @@ impl Display for StageFileCompression {
157159
StageFileCompression::Lzo => write!(f, "lzo"),
158160
StageFileCompression::Snappy => write!(f, "snappy"),
159161
StageFileCompression::Xz => write!(f, "xz"),
162+
StageFileCompression::Zip => write!(f, "zip"),
160163
StageFileCompression::None => write!(f, "none"),
161164
}
162165
}

src/meta/proto-conv/src/file_format_from_to_protobuf_impl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl FromToProtoEnum for mt::principal::StageFileCompression {
8383
pb::StageFileCompression::Snappy => Ok(mt::principal::StageFileCompression::Snappy),
8484
pb::StageFileCompression::None => Ok(mt::principal::StageFileCompression::None),
8585
pb::StageFileCompression::Xz => Ok(mt::principal::StageFileCompression::Xz),
86+
pb::StageFileCompression::Zip => Ok(mt::principal::StageFileCompression::Zip),
8687
}
8788
}
8889

@@ -101,6 +102,7 @@ impl FromToProtoEnum for mt::principal::StageFileCompression {
101102
mt::principal::StageFileCompression::Snappy => Ok(pb::StageFileCompression::Snappy),
102103
mt::principal::StageFileCompression::None => Ok(pb::StageFileCompression::None),
103104
mt::principal::StageFileCompression::Xz => Ok(pb::StageFileCompression::Xz),
105+
mt::principal::StageFileCompression::Zip => Ok(pb::StageFileCompression::Zip),
104106
}
105107
}
106108
}

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
162162
(130, "2025-06-19: Add: New UDF imports and packages in udf definition"),
163163
(131, "2025-06-24: Add: add use_logic_type in ParquetFileFormatParam and AvroFileFormatParam"),
164164
(132, "2025-06-25: Remove: SequenceMeta.start"),
165+
(133, "2025-06-25: Add: Add new StageFileCompression Zip"),
165166
// Dear developer:
166167
// If you're gonna add a new metadata version, you'll have to add a test for it.
167168
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

0 commit comments

Comments
 (0)