Commit 3eb3798

chore: Vendor parquet2 instead (#15422)
* chore: Vendor parquet2 instead
* Add files
* Fix license
* ignore typos from upstream
* Format toml
* Format arrow
* Remove tests
* Fix test fail under all features

Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 4e2227c commit 3eb3798


101 files changed: +14799 -33 lines

.typos.toml

Lines changed: 2 additions & 0 deletions
@@ -21,4 +21,6 @@ extend-exclude = [
   "src/meta",
   "src/query",
   "src/binaries",
+  # Forked from upstream
+  "src/common/parquet2"
 ]

Cargo.lock

Lines changed: 33 additions & 20 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -29,6 +29,7 @@ members = [
   "src/common/storage",
   "src/common/vector",
   "src/common/license",
+  "src/common/parquet2",
   # Query
   "src/query/ast",
   "src/query/async_functions",
@@ -271,7 +272,6 @@ rpath = false
 [patch.crates-io]
 # If there are dependencies that need patching, they can be listed below.
 arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "ad8f2dd" }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "b0e6545" }
 metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
 icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72f" }
 micromarshal = { git = "https://github.com/ariesdevil/opensrv", rev = "6c96813" }

src/common/arrow/Cargo.toml

Lines changed: 15 additions & 12 deletions
@@ -1,10 +1,10 @@
 [package]
+description = "Arrow implementation forked from arrow2 and native format implementation forked from strawboat."
 edition = "2021"
 license = "Apache-2.0"
 name = "databend-common-arrow"
 publish = false
 version = "0.1.0"
-description = "Arrow implementation forked from arrow2 and native format implementation forked from strawboat."
 
 [lib]
 doctest = false
@@ -14,9 +14,9 @@ test = true
 default = ["arrow-default", "parquet-default"]
 
 arrow = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"]
+io_flight = ["io_ipc", "arrow-format/flight-data"]
 io_ipc = []
 io_ipc_compression = []
-io_flight = ["io_ipc", "arrow-format/flight-data"]
 
 # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
 io_parquet = ["io_ipc", "base64", "streaming-iterator", "fallible-streaming-iterator"]
@@ -34,21 +34,15 @@ io_parquet_compression = [
 io_parquet_sample_test = ["io_parquet_async"]
 
 # compression backends
-io_parquet_zstd = ["parquet2/zstd"]
+io_parquet_brotli = ["parquet2/brotli"]
 io_parquet_gzip = ["parquet2/gzip"]
-io_parquet_snappy = ["parquet2/snappy"]
 io_parquet_lz4 = ["parquet2/lz4"]
-io_parquet_brotli = ["parquet2/brotli"]
+io_parquet_snappy = ["parquet2/snappy"]
+io_parquet_zstd = ["parquet2/zstd"]
 
 # parquet bloom filter functions
 io_parquet_bloom_filter = ["parquet2/bloom_filter"]
 
-compute_aggregate = []
-compute_cast = ["lexical-core", "compute_take"]
-compute_concatenate = []
-compute_merge_sort = ["itertools", "compute_sort"]
-compute_sort = ["compute_take"]
-compute_take = []
 compute = [
   "compute_aggregate",
   "compute_cast",
@@ -57,6 +51,12 @@ compute = [
   "compute_sort",
   "compute_take",
 ]
+compute_aggregate = []
+compute_cast = ["lexical-core", "compute_take"]
+compute_concatenate = []
+compute_merge_sort = ["itertools", "compute_sort"]
+compute_sort = ["compute_take"]
+compute_take = []
 
 serde_types = ["serde", "serde_derive"]
 simd = []
@@ -104,7 +104,10 @@ dyn-clone = "1"
 either = "1.9"
 foreign_vec = "0.1.0"
 num-traits = "0.2"
-parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types", "async"] }
+parquet2 = { package = "databend-common-parquet2", path = "../parquet2", default_features = false, features = [
+  "serde_types",
+  "async",
+] }
 
 # for decimal i256
 ethnum = { workspace = true }
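
The parquet2 dependency line above is the heart of the vendoring: the `package = "databend-common-parquet2"` key points Cargo at the vendored workspace crate while the dependency keeps its old `parquet2` name, and the vendored manifest below additionally sets `[lib] name = "parquet2"`. Existing call sites therefore compile unchanged. A minimal sketch of why, assuming the `parquet2::error::Error` path from upstream parquet2:

use parquet2::error::Error;

// `parquet2` here resolves to the vendored databend-common-parquet2 crate
// via the `package = ...` rename; no import paths change anywhere in the tree.
fn describe(e: &Error) -> String {
    format!("parquet2 error: {e}")
}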

src/common/parquet2/.gitignore

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+target
+Cargo.lock
+.idea
+venv
+fixtures/

src/common/parquet2/Cargo.toml

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+[package]
+description = "Safe implementation of parquet IO, forked from parquet2."
+edition = "2021"
+license = "Apache-2.0"
+name = "databend-common-parquet2"
+version = "0.1.0"
+
+[lib]
+bench = false
+name = "parquet2"
+
+[dependencies]
+parquet-format-safe = "0.2"
+seq-macro = { version = "0.3", default-features = false }
+streaming-decompression = "0.1"
+
+async-stream = { version = "0.3.3", optional = true }
+futures = { version = "0.3", optional = true }
+
+brotli = { version = "^3.3", optional = true }
+flate2 = { version = "^1.0", optional = true, default-features = false }
+lz4 = { version = "1.24", optional = true }
+serde = { version = "^1.0", optional = true, features = ["derive"] }
+snap = { version = "^1.1", optional = true }
+zstd = { version = "^0.12", optional = true, default-features = false }
+
+xxhash-rust = { version = "0.8", optional = true, features = ["xxh64"] }
+
+[dev-dependencies]
+criterion = "0.4"
+rand = "0.8"
+tokio = { version = "1", features = ["macros", "rt"] }
+
+[features]
+async = ["async-stream", "futures", "parquet-format-safe/async"]
+bloom_filter = ["xxhash-rust"]
+default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
+full = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter", "async"]
+gzip = ["flate2/rust_backend"]
+gzip_zlib_ng = ["flate2/zlib-ng"]
+serde_types = ["serde"]
+snappy = ["snap"]

src/common/parquet2/LICENSE

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+Copyright [2021] [Jorge C Leitao]
+Copyright 2021 Datafuse Labs
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

src/common/parquet2/src/bloom_filter/hash.rs

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+// Copyright [2021] [Jorge C Leitao]
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use xxhash_rust::xxh64::xxh64;
+
+use crate::types::NativeType;
+
+const SEED: u64 = 0;
+
+/// (xxh64) hash of a [`NativeType`].
+#[inline]
+pub fn hash_native<T: NativeType>(value: T) -> u64 {
+    xxh64(value.to_le_bytes().as_ref(), SEED)
+}
+
+/// (xxh64) hash of a sequence of bytes (e.g. ByteArray).
+#[inline]
+pub fn hash_byte<A: AsRef<[u8]>>(value: A) -> u64 {
+    xxh64(value.as_ref(), SEED)
+}
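
A short usage sketch of the two hashers above, assuming the module is exposed as `parquet2::bloom_filter` with the crate's default `bloom_filter` feature (as upstream does): both are seed-0 xxh64 over the value's raw bytes, little-endian for natives.

use parquet2::bloom_filter::hash_byte;
use parquet2::bloom_filter::hash_native;
use xxhash_rust::xxh64::xxh64;

fn main() {
    // hash_native hashes the value's little-endian bytes with seed 0 ...
    assert_eq!(hash_native(42i64), xxh64(&42i64.to_le_bytes(), 0));
    // ... and hash_byte hashes the given bytes directly.
    assert_eq!(hash_byte("abc"), xxh64(b"abc", 0));
}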

src/common/parquet2/src/bloom_filter/mod.rs

Lines changed: 86 additions & 0 deletions

@@ -0,0 +1,86 @@
+// Copyright [2021] [Jorge C Leitao]
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! API to read and use bloom filters
+mod hash;
+mod read;
+mod split_block;
+
+pub use hash::hash_byte;
+pub use hash::hash_native;
+pub use read::read;
+pub use split_block::insert;
+pub use split_block::is_in_set;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn basics() {
+        let mut bitset = vec![0; 32];
+
+        // insert
+        for a in 0..10i64 {
+            let hash = hash_native(a);
+            insert(&mut bitset, hash);
+        }
+
+        // bloom filter produced by parquet-mr/spark for a column of i64 (0..=10)
+        // import pyspark.sql  // 3.2.1
+        // spark = pyspark.sql.SparkSession.builder.getOrCreate()
+        // spark.conf.set("parquet.bloom.filter.enabled", True)
+        // spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
+        // spark.conf.set("parquet.bloom.filter.max.bytes", 32)
+        //
+        // data = [(i % 10,) for i in range(100)]
+        // df = spark.createDataFrame(data, ["id"]).repartition(1)
+        //
+        // df.write.parquet("bla.parquet", mode = "overwrite")
+        let expected: &[u8] = &[
+            24, 130, 24, 8, 134, 8, 68, 6, 2, 101, 128, 10, 64, 2, 38, 78, 114, 1, 64, 38, 1, 192,
+            194, 152, 64, 70, 0, 36, 56, 121, 64, 0,
+        ];
+        assert_eq!(bitset, expected);
+
+        // check
+        for a in 0..11i64 {
+            let hash = hash_native(a);
+
+            let valid = is_in_set(&bitset, hash);
+
+            assert_eq!(a < 10, valid);
+        }
+    }
+
+    #[test]
+    fn binary() {
+        let mut bitset = vec![0; 32];
+
+        // insert
+        for a in 0..10i64 {
+            let value = format!("a{}", a);
+            let hash = hash_byte(value);
+            insert(&mut bitset, hash);
+        }
+
+        // bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10
+        let expected: &[u8] = &[
+            200, 1, 80, 20, 64, 68, 8, 109, 6, 37, 4, 67, 144, 80, 96, 32, 8, 132, 43, 33, 0, 5,
+            99, 65, 2, 0, 224, 44, 64, 78, 96, 4,
+        ];
+        assert_eq!(bitset, expected);
+    }
+}
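
The `insert`/`is_in_set` pair re-exported above lives in `split_block`, which this excerpt does not show. For orientation, a minimal sketch of the membership check, assuming the fork keeps the standard Parquet split-block bloom filter (SBBF) layout: 32-byte blocks of eight little-endian 32-bit words, with the salt constants from the Parquet specification.

// Salt constants from the Parquet SBBF spec (assumed unchanged in the fork).
const SALT: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b,
    0x9efc4947, 0x5c6bfb31,
];

// One bit per 32-bit word of a block, derived from the low half of the hash.
fn block_mask(hash: u64) -> [u32; 8] {
    let h = hash as u32;
    let mut mask = [0u32; 8];
    for i in 0..8 {
        mask[i] = 1 << (h.wrapping_mul(SALT[i]) >> 27);
    }
    mask
}

// The high half of the hash selects the block: floor((h >> 32) * n / 2^32).
fn block_index(hash: u64, num_blocks: usize) -> usize {
    (((hash >> 32) * num_blocks as u64) >> 32) as usize
}

fn is_in_set(bitset: &[u8], hash: u64) -> bool {
    let base = block_index(hash, bitset.len() / 32) * 32;
    block_mask(hash).iter().enumerate().all(|(i, mask)| {
        let bytes = &bitset[base + 4 * i..base + 4 * i + 4];
        let word = u32::from_le_bytes(bytes.try_into().unwrap());
        word & mask == *mask
    })
}

Insertion is the same block/mask computation with the mask OR-ed into the block instead of tested, which is why the 32-byte `bitset` in the tests above is exactly one block.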
