
Commit 4bc46b3

Merge branch 'main' into issue_9057
2 parents: b03895a + 0c92718

File tree

16 files changed: +188 -53 lines changed

Makefile

Lines changed: 4 additions & 0 deletions
@@ -62,6 +62,10 @@ build-native:
 unit-test:
 	ulimit -n 10000;ulimit -s 16384; RUST_LOG="ERROR" bash ./scripts/ci/ci-run-unit-tests.sh
 
+miri:
+	cargo miri setup
+	MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --no-default-features
+
 embedded-meta-test: build
 	rm -rf ./_meta_embedded*
 	bash ./scripts/ci/ci-run-tests-embedded-meta.sh
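With this target in place, the Miri pass can presumably be run locally as "make miri" once the toolchain change below pulls in the miri component. The -Zmiri-disable-isolation flag lets the tests reach host resources (clock, environment, files) that Miri normally isolates, and --no-default-features presumably trims the build to the features that work under Miri.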

scripts/setup/rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 [toolchain]
 channel = "nightly-2022-11-14"
-components = ["rustfmt", "clippy", "rust-src"]
+components = ["rustfmt", "clippy", "rust-src", "miri"]

src/common/jsonb/src/functions.rs

Lines changed: 29 additions & 0 deletions
@@ -816,6 +816,7 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
                 }
                 let s = std::str::from_utf8(&path[prev_idx..idx - 2])?;
                 let json_path = JsonPath::String(Cow::Borrowed(s));
+
                 json_paths.push(json_path);
             } else {
                 prev_idx = idx - 1;
@@ -836,6 +837,34 @@ pub fn parse_json_path(path: &[u8]) -> Result<Vec<JsonPath>, Error> {
                    return Err(Error::InvalidToken);
                }
            }
+       } else if c == b'"' {
+           prev_idx = idx;
+           loop {
+               let c = read_char(path, &mut idx)?;
+               if c == b'\\' {
+                   idx += 1;
+                   let c = read_char(path, &mut idx)?;
+                   if c == b'"' {
+                       idx += 1;
+                   }
+               } else if c != b'"' {
+                   idx += 1;
+               } else {
+                   // Try to read to check if has extra strings, string value can only have one.
+                   let c = read_char(path, &mut idx);
+                   match c {
+                       Ok(_) => return Err(Error::InvalidToken),
+                       Err(_) => break,
+                   }
+               }
+           }
+           let s = std::str::from_utf8(&path[prev_idx..idx - 1])?;
+           let json_path = JsonPath::String(Cow::Borrowed(s));
+           if json_paths.is_empty() {
+               json_paths.push(json_path);
+           } else {
+               return Err(Error::InvalidToken);
+           }
        } else {
            if c == b':' || c == b'.' {
                if idx == 1 {
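A minimal usage sketch of the new quoted-path branch (not part of the diff): it uses only the public items imported by the tests below, and the rejection case is inferred from the loop above, which returns Error::InvalidToken when anything follows the closing quote.

use std::borrow::Cow;

use common_jsonb::parse_json_path;
use common_jsonb::JsonPath;

fn quoted_path_sketch() {
    // A path that is a single double-quoted string now parses to one JsonPath::String.
    let paths = parse_json_path(r#""k1""#.as_bytes()).unwrap();
    let expected = vec![JsonPath::String(Cow::from("k1"))];
    assert_eq!(&paths[..], &expected[..]);

    // Anything after the closing quote is rejected; a quoted path holds exactly one string.
    assert!(parse_json_path(r#""k1""k2""#.as_bytes()).is_err());
}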

src/common/jsonb/tests/it/functions.rs

Lines changed: 15 additions & 0 deletions
@@ -32,6 +32,7 @@ use common_jsonb::object_keys;
 use common_jsonb::parse_json_path;
 use common_jsonb::parse_value;
 use common_jsonb::to_string;
+use common_jsonb::Error;
 use common_jsonb::JsonPath;
 use common_jsonb::Number;
 use common_jsonb::Object;
@@ -361,6 +362,8 @@ fn test_parse_json_path() {
             JsonPath::String(Cow::from("k2")),
             JsonPath::String(Cow::from("k3")),
         ]),
+        ("\"k1\"", vec![JsonPath::String(Cow::from("k1"))]),
+        ("\"k1k2\"", vec![JsonPath::String(Cow::from("k1k2"))]),
         (r#"k1["k2"][1]"#, vec![
             JsonPath::String(Cow::from("k1")),
             JsonPath::String(Cow::from("k2")),
@@ -372,6 +375,18 @@ fn test_parse_json_path() {
         let path = parse_json_path(s.as_bytes()).unwrap();
         assert_eq!(&path[..], &expect[..]);
     }
+
+    let wrong_sources = vec![
+        (r#"\"\"\\k1\"\""#, Error::InvalidToken),
+        (r#"\\k1\\'"#, Error::InvalidToken),
+    ];
+    for (s, expect) in wrong_sources {
+        let path = parse_json_path(s.as_bytes());
+        match path {
+            Ok(_) => println!(),
+            Err(_) => assert_eq!(Error::InvalidToken, expect),
+        }
+    }
 }
 
 #[test]

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 3 additions & 2 deletions
@@ -663,9 +663,10 @@ impl CompactSegmentTestFixture {
         let mut stats_acc = StatisticsAccumulator::default();
         for block in blocks {
             let block = block?;
-            let col_stats = gen_columns_statistics(&block)?;
+            let col_stats = gen_columns_statistics(&block, None)?;
 
-            let mut block_statistics = BlockStatistics::from(&block, "".to_owned(), None)?;
+            let mut block_statistics =
+                BlockStatistics::from(&block, "".to_owned(), None, None)?;
             let block_meta = block_writer.write(block, col_stats, None).await?;
             block_statistics.block_file_location = block_meta.location.0.clone();

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 47 additions & 9 deletions
@@ -46,13 +46,50 @@ use crate::storages::fuse::table_test_fixture::TestFixture;
 
 #[test]
 fn test_ft_stats_block_stats() -> common_exception::Result<()> {
-    let schema = DataSchemaRefExt::create(vec![DataField::new("a", i32::to_data_type())]);
-    let block = DataBlock::create(schema, vec![Series::from_data(vec![1, 2, 3])]);
-    let r = gen_columns_statistics(&block)?;
-    assert_eq!(1, r.len());
+    let schema = DataSchemaRefExt::create(vec![
+        DataField::new("a", i32::to_data_type()),
+        DataField::new("b", Vu8::to_data_type()),
+    ]);
+    let block = DataBlock::create(schema, vec![
+        Series::from_data(vec![1, 2, 3]),
+        Series::from_data(vec!["aa", "aa", "bb"]),
+    ]);
+    let r = gen_columns_statistics(&block, None)?;
+    assert_eq!(2, r.len());
+    let col_stats = r.get(&0).unwrap();
+    assert_eq!(col_stats.min, DataValue::Int64(1));
+    assert_eq!(col_stats.max, DataValue::Int64(3));
+    assert_eq!(col_stats.distinct_of_values, Some(3));
+    let col_stats = r.get(&1).unwrap();
+    assert_eq!(col_stats.min, DataValue::String(b"aa".to_vec()));
+    assert_eq!(col_stats.max, DataValue::String(b"bb".to_vec()));
+    assert_eq!(col_stats.distinct_of_values, Some(2));
+    Ok(())
+}
+
+#[test]
+fn test_ft_stats_block_stats_with_column_distinct_count() -> common_exception::Result<()> {
+    let schema = DataSchemaRefExt::create(vec![
+        DataField::new("a", i32::to_data_type()),
+        DataField::new("b", Vu8::to_data_type()),
+    ]);
+    let block = DataBlock::create(schema, vec![
+        Series::from_data(vec![1, 2, 3]),
+        Series::from_data(vec!["aa", "aa", "bb"]),
+    ]);
+    let mut column_distinct_count = HashMap::new();
+    column_distinct_count.insert(0, 3);
+    column_distinct_count.insert(1, 2);
+    let r = gen_columns_statistics(&block, Some(column_distinct_count))?;
+    assert_eq!(2, r.len());
     let col_stats = r.get(&0).unwrap();
     assert_eq!(col_stats.min, DataValue::Int64(1));
     assert_eq!(col_stats.max, DataValue::Int64(3));
+    assert_eq!(col_stats.distinct_of_values, Some(3));
+    let col_stats = r.get(&1).unwrap();
+    assert_eq!(col_stats.min, DataValue::String(b"aa".to_vec()));
+    assert_eq!(col_stats.max, DataValue::String(b"bb".to_vec()));
+    assert_eq!(col_stats.distinct_of_values, Some(2));
     Ok(())
 }
 
@@ -69,7 +106,7 @@ fn test_ft_tuple_stats_block_stats() -> common_exception::Result<()> {
     let column = StructColumn::from_data(inner_columns, tuple_data_type).arc();
 
     let block = DataBlock::create(schema, vec![column]);
-    let r = gen_columns_statistics(&block)?;
+    let r = gen_columns_statistics(&block, None)?;
     assert_eq!(2, r.len());
     let col0_stats = r.get(&0).unwrap();
     assert_eq!(col0_stats.min, DataValue::Int64(1));
@@ -89,7 +126,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
     let blocks = TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with);
     let col_stats = blocks
         .iter()
-        .map(|b| gen_columns_statistics(&b.clone().unwrap()))
+        .map(|b| gen_columns_statistics(&b.clone().unwrap(), None))
         .collect::<common_exception::Result<Vec<_>>>()?;
     let r = reducers::reduce_block_statistics(&col_stats, None);
     assert!(r.is_ok());
@@ -158,8 +195,9 @@ async fn test_accumulator() -> common_exception::Result<()> {
 
     for item in blocks {
         let block = item?;
-        let col_stats = gen_columns_statistics(&block)?;
-        let block_statistics = BlockStatistics::from(&block, "does_not_matter".to_owned(), None)?;
+        let col_stats = gen_columns_statistics(&block, None)?;
+        let block_statistics =
+            BlockStatistics::from(&block, "does_not_matter".to_owned(), None, None)?;
         let block_writer = BlockWriter::new(&operator, &loc_generator);
         let block_meta = block_writer.write(block, col_stats, None).await?;
         stats_acc.add_with_block_meta(block_meta, block_statistics)?;
@@ -344,7 +382,7 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval() -> common_exce
     let max_expr = max_col.get(0);
 
     // generate the statistics of column
-    let stats_of_columns = gen_columns_statistics(&block).unwrap();
+    let stats_of_columns = gen_columns_statistics(&block, None).unwrap();
 
     // check if the max value (untrimmed) is in degenerated condition:
     // - the length of string value is larger or equal than STRING_PREFIX_LEN
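Read together, the two new tests above pin down the second argument of gen_columns_statistics: None keeps the old behaviour of deriving distinct_of_values from the block itself, while Some(map), with map a HashMap<usize, usize> keyed by column index, supplies distinct counts that were presumably computed elsewhere (hard-coded here; taken from the bloom index build in fuse_sink.rs below). The other call sites touched by this commit simply pass None.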

src/query/service/tests/it/storages/statistics/column_statistics.rs

Lines changed: 2 additions & 2 deletions
@@ -89,15 +89,15 @@ fn test_column_traverse() -> Result<()> {
     let cols = traverse::traverse_columns_dfs(sample_block.columns())?;
 
     assert_eq!(5, cols.len());
-    (0..5).for_each(|i| assert_eq!(cols[i], sample_cols[i], "checking col {}", i));
+    (0..5).for_each(|i| assert_eq!(cols[i].1, sample_cols[i], "checking col {}", i));
 
     Ok(())
 }
 
 #[test]
 fn test_column_statistic() -> Result<()> {
     let (sample_block, sample_cols) = gen_sample_block();
-    let col_stats = gen_columns_statistics(&sample_block)?;
+    let col_stats = gen_columns_statistics(&sample_block, None)?;
 
     assert_eq!(5, col_stats.len());
 
src/query/storages/fuse/fuse-result/src/result_table_sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl Processor for ResultTableSink {
176176
match std::mem::replace(&mut self.state, State::None) {
177177
State::NeedSerialize(block) => {
178178
let location = self.locations.gen_block_location();
179-
let block_statistics = BlockStatistics::from(&block, location.clone(), None)?;
179+
let block_statistics = BlockStatistics::from(&block, location.clone(), None, None)?;
180180

181181
let mut data = Vec::with_capacity(100 * 1024 * 1024);
182182
let schema = block.schema().clone();

src/query/storages/fuse/fuse-result/src/writer.rs

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ impl ResultTableWriter {
     pub async fn append_block(&mut self, block: DataBlock) -> Result<PartInfoPtr> {
         let location = self.locations.gen_block_location();
         let mut data = Vec::with_capacity(100 * 1024 * 1024);
-        let block_statistics = BlockStatistics::from(&block, location.clone(), None)?;
+        let block_statistics = BlockStatistics::from(&block, location.clone(), None, None)?;
         let schema = block.schema().clone();
         let (size, meta_data) = serialize_data_blocks(vec![block], &schema, &mut data)?;

src/query/storages/fuse/fuse/src/operations/fuse_sink.rs

Lines changed: 37 additions & 21 deletions
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -51,6 +52,33 @@ struct BloomIndexState {
     location: Location,
 }
 
+impl BloomIndexState {
+    pub fn try_create(
+        block: &DataBlock,
+        location: Location,
+    ) -> Result<(Self, HashMap<usize, usize>)> {
+        // write index
+        let bloom_index = BlockFilter::try_create(&[block])?;
+        let index_block = bloom_index.filter_block;
+        let mut data = Vec::with_capacity(100 * 1024);
+        let index_block_schema = &bloom_index.filter_schema;
+        let (size, _) = serialize_data_blocks_with_compression(
+            vec![index_block],
+            index_block_schema,
+            &mut data,
+            CompressionOptions::Uncompressed,
+        )?;
+        Ok((
+            Self {
+                data,
+                size,
+                location,
+            },
+            bloom_index.column_distinct_count,
+        ))
+    }
+}
+
 enum State {
     None,
     NeedSerialize(DataBlock),
@@ -168,28 +196,16 @@ impl Processor for FuseTableSink {
 
                 let (block_location, block_id) = self.meta_locations.gen_block_location();
 
-                let bloom_index_state = {
-                    // write index
-                    let bloom_index = BlockFilter::try_create(&[&block])?;
-                    let index_block = bloom_index.filter_block;
-                    let location = self.meta_locations.block_bloom_index_location(&block_id);
-                    let mut data = Vec::with_capacity(100 * 1024);
-                    let index_block_schema = &bloom_index.filter_schema;
-                    let (size, _) = serialize_data_blocks_with_compression(
-                        vec![index_block],
-                        index_block_schema,
-                        &mut data,
-                        CompressionOptions::Uncompressed,
-                    )?;
-                    BloomIndexState {
-                        data,
-                        size,
-                        location,
-                    }
-                };
+                let location = self.meta_locations.block_bloom_index_location(&block_id);
+                let (bloom_index_state, column_distinct_count) =
+                    BloomIndexState::try_create(&block, location)?;
 
-                let block_statistics =
-                    BlockStatistics::from(&block, block_location.0, cluster_stats)?;
+                let block_statistics = BlockStatistics::from(
+                    &block,
+                    block_location.0,
+                    cluster_stats,
+                    Some(column_distinct_count),
+                )?;
                 // we need a configuration of block size threshold here
                 let mut data = Vec::with_capacity(100 * 1024 * 1024);
                 let schema = block.schema().clone();
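The upshot of this refactor: the inline bloom-index construction moves into BloomIndexState::try_create, which also hands back bloom_index.column_distinct_count. Since the filter build apparently already tracks how many distinct values each indexed column contains, that exact count is forwarded to BlockStatistics::from as Some(column_distinct_count) instead of being re-estimated when the block statistics are generated, which appears to be the point of the change.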
