Skip to content

Commit b1233fc

Browse files
authored
Merge branch 'main' into binary-arrow
2 parents 1f3c0bb + 9de72eb commit b1233fc

File tree

21 files changed

+352
-70
lines changed

21 files changed

+352
-70
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ build-native:
6262
unit-test:
6363
ulimit -n 10000;ulimit -s 16384; RUST_LOG="ERROR" bash ./scripts/ci/ci-run-unit-tests.sh
6464

65+
miri:
66+
cargo miri setup
67+
MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --no-default-features
68+
6569
embedded-meta-test: build
6670
rm -rf ./_meta_embedded*
6771
bash ./scripts/ci/ci-run-tests-embedded-meta.sh

scripts/setup/rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[toolchain]
22
channel = "nightly-2022-11-14"
3-
components = ["rustfmt", "clippy", "rust-src"]
3+
components = ["rustfmt", "clippy", "rust-src", "miri"]

src/common/jsonb/src/functions.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,7 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
816816
}
817817
let s = std::str::from_utf8(&path[prev_idx..idx - 2])?;
818818
let json_path = JsonPath::String(Cow::Borrowed(s));
819+
819820
json_paths.push(json_path);
820821
} else {
821822
prev_idx = idx - 1;
@@ -836,6 +837,34 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
836837
return Err(Error::InvalidToken);
837838
}
838839
}
840+
} else if c == b'"' {
841+
prev_idx = idx;
842+
loop {
843+
let c = read_char(path, &mut idx)?;
844+
if c == b'\\' {
845+
idx += 1;
846+
let c = read_char(path, &mut idx)?;
847+
if c == b'"' {
848+
idx += 1;
849+
}
850+
} else if c != b'"' {
851+
idx += 1;
852+
} else {
853+
// Try to read one more character: a quoted path may hold only a single string, so any trailing input is an invalid token.
854+
let c = read_char(path, &mut idx);
855+
match c {
856+
Ok(_) => return Err(Error::InvalidToken),
857+
Err(_) => break,
858+
}
859+
}
860+
}
861+
let s = std::str::from_utf8(&path[prev_idx..idx - 1])?;
862+
let json_path = JsonPath::String(Cow::Borrowed(s));
863+
if json_paths.is_empty() {
864+
json_paths.push(json_path);
865+
} else {
866+
return Err(Error::InvalidToken);
867+
}
839868
} else {
840869
if c == b':' || c == b'.' {
841870
if idx == 1 {

src/common/jsonb/tests/it/functions.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use common_jsonb::object_keys;
3232
use common_jsonb::parse_json_path;
3333
use common_jsonb::parse_value;
3434
use common_jsonb::to_string;
35+
use common_jsonb::Error;
3536
use common_jsonb::JsonPath;
3637
use common_jsonb::Number;
3738
use common_jsonb::Object;
@@ -361,6 +362,8 @@ fn test_parse_json_path() {
361362
JsonPath::String(Cow::from("k2")),
362363
JsonPath::String(Cow::from("k3")),
363364
]),
365+
("\"k1\"", vec![JsonPath::String(Cow::from("k1"))]),
366+
("\"k1k2\"", vec![JsonPath::String(Cow::from("k1k2"))]),
364367
(r#"k1["k2"][1]"#, vec![
365368
JsonPath::String(Cow::from("k1")),
366369
JsonPath::String(Cow::from("k2")),
@@ -372,6 +375,18 @@ fn test_parse_json_path() {
372375
let path = parse_json_path(s.as_bytes()).unwrap();
373376
assert_eq!(&path[..], &expect[..]);
374377
}
378+
379+
let wrong_sources = vec![
380+
(r#"\"\"\\k1\"\""#, Error::InvalidToken),
381+
(r#"\\k1\\'"#, Error::InvalidToken),
382+
];
383+
for (s, expect) in wrong_sources {
384+
let path = parse_json_path(s.as_bytes());
385+
match path {
386+
Ok(_) => println!(),
387+
Err(_) => assert_eq!(Error::InvalidToken, expect),
388+
}
389+
}
375390
}
376391

377392
#[test]

src/query/datavalues/src/columns/column.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,6 @@ pub trait Column: Send + Sync {
149149
DFTryFrom::try_from(value)
150150
}
151151

152-
fn to_values(&self) -> Vec<DataValue> {
153-
(0..self.len()).map(|i| self.get(i)).collect()
154-
}
155-
156152
/// Visit each row value of Column
157153
fn for_each<F>(&self, f: F)
158154
where

src/query/functions/src/aggregates/aggregate_approx_count_distinct.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,7 @@ impl AggregateFunction for AggregateApproxCountDistinctFunction {
120120
fn accumulate_row(&self, place: StateAddr, columns: &[ColumnRef], _row: usize) -> Result<()> {
121121
let state = place.get::<AggregateApproxCountDistinctState>();
122122
for column in columns {
123-
column.to_values().iter().for_each(|value| {
124-
state.hll.push(value);
125-
});
123+
(0..column.len()).for_each(|i| state.hll.push(&column.get(i)));
126124
}
127125
Ok(())
128126
}

src/query/pipeline/sinks/src/processors/sinks/subquery_receive_sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl Sink for SubqueryReceiveSink {
7373
fn consume(&mut self, data_block: DataBlock) -> Result<()> {
7474
for column_index in 0..data_block.num_columns() {
7575
let column = data_block.column(column_index);
76-
let mut values = column.to_values();
76+
let mut values = (0..column.len()).map(|i| column.get(i)).collect();
7777
self.input_columns[column_index].append(&mut values)
7878
}
7979

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,10 @@ impl CompactSegmentTestFixture {
663663
let mut stats_acc = StatisticsAccumulator::default();
664664
for block in blocks {
665665
let block = block?;
666-
let col_stats = gen_columns_statistics(&block)?;
666+
let col_stats = gen_columns_statistics(&block, None)?;
667667

668-
let mut block_statistics = BlockStatistics::from(&block, "".to_owned(), None)?;
668+
let mut block_statistics =
669+
BlockStatistics::from(&block, "".to_owned(), None, None)?;
669670
let block_meta = block_writer.write(block, col_stats, None).await?;
670671
block_statistics.block_file_location = block_meta.location.0.clone();
671672

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,50 @@ use crate::storages::fuse::table_test_fixture::TestFixture;
4646

4747
#[test]
4848
fn test_ft_stats_block_stats() -> common_exception::Result<()> {
49-
let schema = DataSchemaRefExt::create(vec![DataField::new("a", i32::to_data_type())]);
50-
let block = DataBlock::create(schema, vec![Series::from_data(vec![1, 2, 3])]);
51-
let r = gen_columns_statistics(&block)?;
52-
assert_eq!(1, r.len());
49+
let schema = DataSchemaRefExt::create(vec![
50+
DataField::new("a", i32::to_data_type()),
51+
DataField::new("b", Vu8::to_data_type()),
52+
]);
53+
let block = DataBlock::create(schema, vec![
54+
Series::from_data(vec![1, 2, 3]),
55+
Series::from_data(vec!["aa", "aa", "bb"]),
56+
]);
57+
let r = gen_columns_statistics(&block, None)?;
58+
assert_eq!(2, r.len());
59+
let col_stats = r.get(&0).unwrap();
60+
assert_eq!(col_stats.min, DataValue::Int64(1));
61+
assert_eq!(col_stats.max, DataValue::Int64(3));
62+
assert_eq!(col_stats.distinct_of_values, Some(3));
63+
let col_stats = r.get(&1).unwrap();
64+
assert_eq!(col_stats.min, DataValue::String(b"aa".to_vec()));
65+
assert_eq!(col_stats.max, DataValue::String(b"bb".to_vec()));
66+
assert_eq!(col_stats.distinct_of_values, Some(2));
67+
Ok(())
68+
}
69+
70+
#[test]
71+
fn test_ft_stats_block_stats_with_column_distinct_count() -> common_exception::Result<()> {
72+
let schema = DataSchemaRefExt::create(vec![
73+
DataField::new("a", i32::to_data_type()),
74+
DataField::new("b", Vu8::to_data_type()),
75+
]);
76+
let block = DataBlock::create(schema, vec![
77+
Series::from_data(vec![1, 2, 3]),
78+
Series::from_data(vec!["aa", "aa", "bb"]),
79+
]);
80+
let mut column_distinct_count = HashMap::new();
81+
column_distinct_count.insert(0, 3);
82+
column_distinct_count.insert(1, 2);
83+
let r = gen_columns_statistics(&block, Some(column_distinct_count))?;
84+
assert_eq!(2, r.len());
5385
let col_stats = r.get(&0).unwrap();
5486
assert_eq!(col_stats.min, DataValue::Int64(1));
5587
assert_eq!(col_stats.max, DataValue::Int64(3));
88+
assert_eq!(col_stats.distinct_of_values, Some(3));
89+
let col_stats = r.get(&1).unwrap();
90+
assert_eq!(col_stats.min, DataValue::String(b"aa".to_vec()));
91+
assert_eq!(col_stats.max, DataValue::String(b"bb".to_vec()));
92+
assert_eq!(col_stats.distinct_of_values, Some(2));
5693
Ok(())
5794
}
5895

@@ -69,7 +106,7 @@ fn test_ft_tuple_stats_block_stats() -> common_exception::Result<()> {
69106
let column = StructColumn::from_data(inner_columns, tuple_data_type).arc();
70107

71108
let block = DataBlock::create(schema, vec![column]);
72-
let r = gen_columns_statistics(&block)?;
109+
let r = gen_columns_statistics(&block, None)?;
73110
assert_eq!(2, r.len());
74111
let col0_stats = r.get(&0).unwrap();
75112
assert_eq!(col0_stats.min, DataValue::Int64(1));
@@ -89,7 +126,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
89126
let blocks = TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with);
90127
let col_stats = blocks
91128
.iter()
92-
.map(|b| gen_columns_statistics(&b.clone().unwrap()))
129+
.map(|b| gen_columns_statistics(&b.clone().unwrap(), None))
93130
.collect::<common_exception::Result<Vec<_>>>()?;
94131
let r = reducers::reduce_block_statistics(&col_stats, None);
95132
assert!(r.is_ok());
@@ -158,8 +195,9 @@ async fn test_accumulator() -> common_exception::Result<()> {
158195

159196
for item in blocks {
160197
let block = item?;
161-
let col_stats = gen_columns_statistics(&block)?;
162-
let block_statistics = BlockStatistics::from(&block, "does_not_matter".to_owned(), None)?;
198+
let col_stats = gen_columns_statistics(&block, None)?;
199+
let block_statistics =
200+
BlockStatistics::from(&block, "does_not_matter".to_owned(), None, None)?;
163201
let block_writer = BlockWriter::new(&operator, &loc_generator);
164202
let block_meta = block_writer.write(block, col_stats, None).await?;
165203
stats_acc.add_with_block_meta(block_meta, block_statistics)?;
@@ -344,7 +382,7 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval() -> common_exce
344382
let max_expr = max_col.get(0);
345383

346384
// generate the statistics of column
347-
let stats_of_columns = gen_columns_statistics(&block).unwrap();
385+
let stats_of_columns = gen_columns_statistics(&block, None).unwrap();
348386

349387
// check if the max value (untrimmed) is in degenerated condition:
350388
// - the length of the string value is greater than or equal to STRING_PREFIX_LEN

src/query/service/tests/it/storages/statistics/column_statistics.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_datavalues::ColumnRef;
1919
use common_datavalues::DataField;
2020
use common_datavalues::DataSchemaRefExt;
2121
use common_datavalues::DataTypeImpl;
22+
use common_datavalues::DataValue;
2223
use common_datavalues::Series;
2324
use common_datavalues::SeriesFrom;
2425
use common_datavalues::StructColumn;
@@ -88,29 +89,31 @@ fn test_column_traverse() -> Result<()> {
8889
let cols = traverse::traverse_columns_dfs(sample_block.columns())?;
8990

9091
assert_eq!(5, cols.len());
91-
(0..5).for_each(|i| assert_eq!(cols[i], sample_cols[i], "checking col {}", i));
92+
(0..5).for_each(|i| assert_eq!(cols[i].1, sample_cols[i], "checking col {}", i));
9293

9394
Ok(())
9495
}
9596

9697
#[test]
9798
fn test_column_statistic() -> Result<()> {
9899
let (sample_block, sample_cols) = gen_sample_block();
99-
let col_stats = gen_columns_statistics(&sample_block)?;
100+
let col_stats = gen_columns_statistics(&sample_block, None)?;
100101

101102
assert_eq!(5, col_stats.len());
102103

103104
(0..5).for_each(|i| {
104105
let stats = col_stats.get(&(i as u32)).unwrap();
106+
let column = &sample_cols[i];
107+
let values: Vec<DataValue> = (0..column.len()).map(|i| column.get(i)).collect();
105108
assert_eq!(
106109
&stats.min,
107-
sample_cols[i].to_values().iter().min().unwrap(),
110+
values.iter().min().unwrap(),
108111
"checking min of col {}",
109112
i
110113
);
111114
assert_eq!(
112115
&stats.max,
113-
sample_cols[i].to_values().iter().max().unwrap(),
116+
values.iter().max().unwrap(),
114117
"checking max of col {}",
115118
i
116119
);

0 commit comments

Comments
 (0)