Commit fe9ba2d

Merge branch 'main' into view-schema
2 parents f1ea177 + ae9720f commit fe9ba2d

28 files changed: +415 −93 lines

Cargo.lock

Lines changed: 41 additions & 6 deletions
Some generated files are not rendered by default.

Makefile

Lines changed: 4 additions & 0 deletions
@@ -62,6 +62,10 @@ build-native:
 unit-test:
 	ulimit -n 10000;ulimit -s 16384; RUST_LOG="ERROR" bash ./scripts/ci/ci-run-unit-tests.sh
 
+miri:
+	cargo miri setup
+	MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --no-default-features
+
 embedded-meta-test: build
 	rm -rf ./_meta_embedded*
 	bash ./scripts/ci/ci-run-tests-embedded-meta.sh
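
The new miri target runs the unit tests under Miri, the nightly interpreter that detects undefined behavior in unsafe code; MIRIFLAGS="-Zmiri-disable-isolation" lets tests reach the host clock and file system, which Miri blocks by default. A minimal sketch of the kind of bug this catches — the function is illustrative, not from this repo:

// Illustrative only: a use-after-free that `cargo miri test` reports as
// undefined behavior, while a plain `cargo test` run may appear to pass.
fn dangling_read() -> i32 {
    let ptr = {
        let boxed = Box::new(42);
        &*boxed as *const i32
    }; // `boxed` is dropped here, so `ptr` dangles.
    unsafe { *ptr } // Miri flags this read.
}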

scripts/setup/rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 [toolchain]
 channel = "nightly-2022-11-14"
-components = ["rustfmt", "clippy", "rust-src"]
+components = ["rustfmt", "clippy", "rust-src", "miri"]

src/common/arrow/Cargo.toml

Lines changed: 3 additions & 2 deletions
@@ -34,12 +34,13 @@ simd = ["arrow/simd"]
 # Workspace dependencies
 
 # Crates.io dependencies
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "95e117d", default-features = false, features = [
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "1da33ac", default-features = false, features = [
     "io_parquet",
     "io_parquet_compression",
 ] }
+
 arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] }
 futures = "0.3.24"
-parquet2 = { version = "0.16.3", default_features = false }
+parquet2 = { version = "0.17.0", default_features = false }
 
 [dev-dependencies]

src/common/jsonb/src/functions.rs

Lines changed: 29 additions & 0 deletions
@@ -816,6 +816,7 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
             }
             let s = std::str::from_utf8(&path[prev_idx..idx - 2])?;
             let json_path = JsonPath::String(Cow::Borrowed(s));
+
             json_paths.push(json_path);
         } else {
             prev_idx = idx - 1;
@@ -836,6 +837,34 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
                     return Err(Error::InvalidToken);
                 }
             }
+        } else if c == b'"' {
+            prev_idx = idx;
+            loop {
+                let c = read_char(path, &mut idx)?;
+                if c == b'\\' {
+                    idx += 1;
+                    let c = read_char(path, &mut idx)?;
+                    if c == b'"' {
+                        idx += 1;
+                    }
+                } else if c != b'"' {
+                    idx += 1;
+                } else {
+                    // Read one more char to check for extra input; a string value can only contain one.
+                    let c = read_char(path, &mut idx);
+                    match c {
+                        Ok(_) => return Err(Error::InvalidToken),
+                        Err(_) => break,
+                    }
+                }
+            }
+            let s = std::str::from_utf8(&path[prev_idx..idx - 1])?;
+            let json_path = JsonPath::String(Cow::Borrowed(s));
+            if json_paths.is_empty() {
+                json_paths.push(json_path);
+            } else {
+                return Err(Error::InvalidToken);
+            }
         } else {
             if c == b':' || c == b'.' {
                 if idx == 1 {
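
The new branch accepts a path that is a single double-quoted name, handles backslash escapes inside it, and rejects any trailing input after the closing quote. A minimal usage sketch, mirroring the new test cases below (the demo function itself is illustrative, not from the diff):

use std::borrow::Cow;

use common_jsonb::parse_json_path;
use common_jsonb::JsonPath;

// Illustrative: a quoted name parses to a single JsonPath::String;
// extra tokens after the closing quote yield Error::InvalidToken.
fn quoted_path_demo() {
    let path = parse_json_path(r#""k1""#.as_bytes()).unwrap();
    assert_eq!(&path[..], &[JsonPath::String(Cow::from("k1"))]);
}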

src/common/jsonb/tests/it/functions.rs

Lines changed: 15 additions & 0 deletions
@@ -32,6 +32,7 @@ use common_jsonb::object_keys;
 use common_jsonb::parse_json_path;
 use common_jsonb::parse_value;
 use common_jsonb::to_string;
+use common_jsonb::Error;
 use common_jsonb::JsonPath;
 use common_jsonb::Number;
 use common_jsonb::Object;
@@ -361,6 +362,8 @@ fn test_parse_json_path() {
             JsonPath::String(Cow::from("k2")),
             JsonPath::String(Cow::from("k3")),
         ]),
+        ("\"k1\"", vec![JsonPath::String(Cow::from("k1"))]),
+        ("\"k1k2\"", vec![JsonPath::String(Cow::from("k1k2"))]),
         (r#"k1["k2"][1]"#, vec![
             JsonPath::String(Cow::from("k1")),
             JsonPath::String(Cow::from("k2")),
@@ -372,6 +375,18 @@ fn test_parse_json_path() {
         let path = parse_json_path(s.as_bytes()).unwrap();
         assert_eq!(&path[..], &expect[..]);
     }
+
+    let wrong_sources = vec![
+        (r#"\"\"\\k1\"\""#, Error::InvalidToken),
+        (r#"\\k1\\'"#, Error::InvalidToken),
+    ];
+    for (s, expect) in wrong_sources {
+        let path = parse_json_path(s.as_bytes());
+        match path {
+            Ok(_) => println!(),
+            Err(_) => assert_eq!(Error::InvalidToken, expect),
+        }
+    }
 }
 
 #[test]

src/query/datablocks/src/serialize.rs

Lines changed: 9 additions & 12 deletions
@@ -40,6 +40,7 @@ pub fn serialize_data_blocks_with_compression(
         write_statistics: false,
         compression,
         version: Version::V2,
+        data_pagesize_limit: None,
     };
     let batches = blocks
         .into_iter()
@@ -87,16 +88,12 @@ pub fn serialize_data_blocks(
     serialize_data_blocks_with_compression(blocks, schema, buf, CompressionOptions::Lz4Raw)
 }
 
-fn col_encoding(_data_type: &ArrowDataType) -> Encoding {
-    // Although the encoding does work, parquet2 has not implemented decoding of DeltaLengthByteArray yet, so we fall back to Plain.
-    // From parquet2: Decoding "DeltaLengthByteArray"-encoded required V2 pages is not yet implemented for Binary.
-    //
-    // match data_type {
-    //     ArrowDataType::Binary
-    //     | ArrowDataType::LargeBinary
-    //     | ArrowDataType::Utf8
-    //     | ArrowDataType::LargeUtf8 => Encoding::DeltaLengthByteArray,
-    //     _ => Encoding::Plain,
-    // }
-    Encoding::Plain
+fn col_encoding(data_type: &ArrowDataType) -> Encoding {
+    match data_type {
+        ArrowDataType::Binary
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => Encoding::DeltaLengthByteArray,
+        _ => Encoding::Plain,
+    }
 }
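
Now that parquet2 0.17.0 can decode it, string and binary columns are written with DeltaLengthByteArray instead of Plain. A rough sketch of the idea behind that encoding — not parquet2's actual API or on-disk format: all value lengths go up front (and are themselves delta-encoded on disk), followed by one concatenated byte buffer, instead of interleaving a length prefix before each value as Plain does.

// Conceptual only: separate the lengths from the raw bytes.
fn delta_length_layout(values: &[&str]) -> (Vec<i64>, Vec<u8>) {
    let lengths = values.iter().map(|v| v.len() as i64).collect();
    let bytes = values.iter().flat_map(|v| v.bytes()).collect();
    (lengths, bytes)
}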

src/query/datavalues/src/columns/column.rs

Lines changed: 0 additions & 4 deletions
@@ -149,10 +149,6 @@ pub trait Column: Send + Sync {
         DFTryFrom::try_from(value)
     }
 
-    fn to_values(&self) -> Vec<DataValue> {
-        (0..self.len()).map(|i| self.get(i)).collect()
-    }
-
     /// Visit each row value of Column
     fn for_each<F>(&self, f: F)
     where
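
With the to_values helper gone, callers read rows one at a time through get(i) instead of materializing an intermediate Vec<DataValue>, as the two call-site updates below show. A minimal sketch of the pattern, assuming only that get(i) returns a DataValue as the removed default method implies; the function itself is illustrative:

// Illustrative: stream row values out of a column without collecting
// them into a temporary Vec first.
fn visit_rows<C: Column>(column: &C, mut sink: impl FnMut(DataValue)) {
    (0..column.len()).for_each(|i| sink(column.get(i)));
}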

src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs

Lines changed: 1 addition & 3 deletions
@@ -120,9 +120,7 @@ impl AggregateFunction for AggregateApproxCountDistinctFunction {
     fn accumulate_row(&self, place: StateAddr, columns: &[ColumnRef], _row: usize) -> Result<()> {
         let state = place.get::<AggregateApproxCountDistinctState>();
         for column in columns {
-            column.to_values().iter().for_each(|value| {
-                state.hll.push(value);
-            });
+            (0..column.len()).for_each(|i| state.hll.push(&column.get(i)));
         }
         Ok(())
     }

src/query/pipeline/sinks/src/processors/sinks/subquery_receive_sink.rs

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ impl Sink for SubqueryReceiveSink {
     fn consume(&mut self, data_block: DataBlock) -> Result<()> {
         for column_index in 0..data_block.num_columns() {
             let column = data_block.column(column_index);
-            let mut values = column.to_values();
+            let mut values = (0..column.len()).map(|i| column.get(i)).collect();
             self.input_columns[column_index].append(&mut values)
         }
