Skip to content

Commit 95a5c93

Browse files
authored
Merge pull request #9214 from b41sh/feat-fast-parse-values
feat(query): fast parse insert values
2 parents 97cf0eb + 987cc1b commit 95a5c93

File tree

10 files changed

+544
-22
lines changed

10 files changed

+544
-22
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/io/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ ordered-float = "3.1.0"
2727
serde = { workspace = true }
2828

2929
[dev-dependencies]
30+
aho-corasick = { version = "0.7.20" }
3031
rand = "0.8.5"

src/common/io/src/cursor_ext/cursor_read_string_ext.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::VecDeque;
1516
use std::io::BufRead;
1617
use std::io::Cursor;
1718
use std::io::ErrorKind;
@@ -23,6 +24,11 @@ use crate::cursor_ext::cursor_read_bytes_ext::ReadBytesExt;
2324
/// String-reading helpers for buffered readers.
pub trait BufferReadStringExt {
    /// Reads text delimited by the `quota` byte into `buf`.
    // NOTE(review): implementation is outside this view; escape handling is
    // presumably similar to `fast_read_quoted_text` — confirm.
    fn read_quoted_text(&mut self, buf: &mut Vec<u8>, quota: u8) -> Result<()>;

    /// Reads escaped string text into `buf`.
    // NOTE(review): implementation is outside this view — confirm the exact
    // termination and unescaping rules before relying on them.
    fn read_escaped_string_text(&mut self, buf: &mut Vec<u8>) -> Result<()>;

    /// Reads a single-quoted string literal into `buf`, using `positions`
    /// (the pre-computed offsets of every `'` and `\` in the input) to jump
    /// straight to candidate end positions instead of scanning byte by byte.
    ///
    /// Entries are popped from the front of `positions` as they are used.
    fn fast_read_quoted_text(
        &mut self,
        buf: &mut Vec<u8>,
        positions: &mut VecDeque<usize>,
    ) -> Result<()>;
}
2733

2834
impl<T> BufferReadStringExt for Cursor<T>
@@ -111,6 +117,86 @@ where T: AsRef<[u8]>
111117
}
112118
Ok(())
113119
}
120+
121+
// `positions` stores the positions of all `'` and `\` that are pre-generated
122+
// by the `Aho-Corasick` algorithm, which can use SIMD instructions to
123+
// accelerate the search process.
124+
// Using these positions, we can directly jump to the end of the text,
125+
// instead of inefficient step-by-step iterate over the buffer.
126+
fn fast_read_quoted_text(
127+
&mut self,
128+
buf: &mut Vec<u8>,
129+
positions: &mut VecDeque<usize>,
130+
) -> Result<()> {
131+
self.must_ignore_byte(b'\'')?;
132+
let mut start = self.position() as usize;
133+
check_pos(start - 1, positions)?;
134+
135+
// Get next possible end position.
136+
while let Some(pos) = positions.pop_front() {
137+
let len = pos - start;
138+
buf.extend_from_slice(&self.remaining_slice()[..len]);
139+
self.consume(len);
140+
141+
if self.ignore_byte(b'\'') {
142+
return Ok(());
143+
} else if self.ignore_byte(b'\\') {
144+
let b = self.remaining_slice();
145+
if b.is_empty() {
146+
return Err(std::io::Error::new(
147+
ErrorKind::InvalidData,
148+
"Expected to have terminated string literal after escaped char '\' ."
149+
.to_string(),
150+
));
151+
}
152+
let c = b[0];
153+
self.ignore_byte(c);
154+
155+
match c {
156+
b'n' => buf.push(b'\n'),
157+
b't' => buf.push(b'\t'),
158+
b'r' => buf.push(b'\r'),
159+
b'0' => buf.push(b'\0'),
160+
b'\'' => {
161+
check_pos(pos + 1, positions)?;
162+
buf.push(b'\'');
163+
}
164+
b'\\' => {
165+
check_pos(pos + 1, positions)?;
166+
buf.push(b'\\');
167+
}
168+
b'\"' => buf.push(b'\"'),
169+
_ => {
170+
buf.push(b'\\');
171+
buf.push(c);
172+
}
173+
}
174+
} else {
175+
break;
176+
}
177+
start = self.position() as usize;
178+
}
179+
Err(std::io::Error::new(
180+
ErrorKind::InvalidData,
181+
format!(
182+
"Expected to have terminated string literal after quota \', while consumed buf: {:?}",
183+
buf
184+
),
185+
))
186+
}
187+
}
188+
189+
// Verifies that the next pre-calculated position equals `curr_pos`.
//
// The front entry of `positions` is always removed, whether or not it
// matches. An empty queue or a mismatch yields `ErrorKind::InvalidData`.
fn check_pos(curr_pos: usize, positions: &mut VecDeque<usize>) -> Result<()> {
    match positions.pop_front() {
        Some(expected) if expected == curr_pos => Ok(()),
        _ => Err(std::io::Error::new(
            ErrorKind::InvalidData,
            "Expected to have quotes in string literal.".to_string(),
        )),
    }
}
115201

116202
fn unescape(c: u8) -> u8 {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::VecDeque;
16+
use std::io::Cursor;
17+
18+
use aho_corasick::AhoCorasick;
19+
use common_exception::Result;
20+
use common_io::cursor_ext::*;
21+
22+
// Checks that the Aho-Corasick matcher reports the byte offset of every
// `'` and `\` in the input, in order.
#[test]
fn test_positions() -> Result<()> {
    let cases = vec![
        (r#"abc"#.to_string(), vec![]),
        (r#"'abcdefg'"#.to_string(), vec![0, 8]),
        (r#"'abc\d'e'"#.to_string(), vec![0, 4, 6, 8]),
        (r#"'abc','d\'ef','g\\\'hi'"#.to_string(), vec![
            0, 4, 6, 8, 9, 12, 14, 16, 17, 18, 19, 22,
        ]),
    ];

    let patterns = &["'", "\\"];
    let ac = AhoCorasick::new(patterns);
    for (data, expect) in cases {
        // Collect the start offset of each match straight into the queue.
        let positions: VecDeque<usize> = ac.find_iter(&data).map(|mat| mat.start()).collect();
        assert_eq!(positions, expect)
    }
    Ok(())
}
45+
46+
// Reads three comma-separated quoted literals with `fast_read_quoted_text`
// and checks the unescaped bytes of each one.
#[test]
fn test_fast_read_text() -> Result<()> {
    let data = r#"'abc','d\'ef','g\\\'hi'"#.to_string();
    let patterns = &["'", "\\"];
    let ac = AhoCorasick::new(patterns);
    // Offsets of every quote and backslash, shared across all three reads.
    let mut positions: VecDeque<usize> = ac.find_iter(&data).map(|mat| mat.start()).collect();

    let mut reader = Cursor::new(data.as_bytes());
    let expected: Vec<Vec<u8>> = vec![b"abc".to_vec(), b"d'ef".to_vec(), b"g\\'hi".to_vec()];

    let mut res = Vec::with_capacity(expected.len());
    for idx in 0..expected.len() {
        // Literals after the first are preceded by a separating comma.
        if idx != 0 {
            assert!(reader.ignore_byte(b','));
        }
        let mut text = Vec::new();
        reader.fast_read_quoted_text(&mut text, &mut positions)?;
        res.push(text);
    }
    assert_eq!(res, expected);
    Ok(())
}

src/common/io/tests/it/cursor_ext/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
mod fast_read_text_ext;
1516
mod read_bytes_ext;
1617
mod read_datetime_ext;
1718
mod read_number_ext;

0 commit comments

Comments
 (0)