Skip to content

Commit 5500a72

Browse files
committed
Merge branch 'main' of https://github.com/datafuselabs/databend into substr
2 parents 9963404 + 14ca4e7 commit 5500a72

File tree

7 files changed

+86
-78
lines changed

7 files changed

+86
-78
lines changed

src/query/functions-v2/src/scalars/string.rs

Lines changed: 51 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use common_expression::types::StringType;
2727
use common_expression::vectorize_with_builder_1_arg;
2828
use common_expression::vectorize_with_builder_2_arg;
2929
use common_expression::vectorize_with_builder_3_arg;
30+
use common_expression::vectorize_with_builder_4_arg;
3031
use common_expression::FunctionProperty;
3132
use common_expression::FunctionRegistry;
3233
use common_expression::Value;
@@ -142,78 +143,83 @@ pub fn register(registry: &mut FunctionRegistry) {
142143
),
143144
);
144145

145-
registry.register_4_arg::<StringType, NumberType<i64>, NumberType<i64>, StringType, StringType, _, _>(
146-
"insert",
147-
FunctionProperty::default(),
148-
|_, _, _, _| None,
149-
|srcstr, pos, len, substr| {
150-
let mut values: Vec<u8> = vec![];
151-
152-
let sl = srcstr.len() as i64;
153-
if pos < 1 || pos > sl {
154-
values.extend_from_slice(srcstr);
146+
registry.register_passthrough_nullable_4_arg::<StringType, NumberType<i64>, NumberType<i64>, StringType, StringType, _, _>(
147+
"insert",
148+
FunctionProperty::default(),
149+
|_, _, _, _| None,
150+
vectorize_with_builder_4_arg::<StringType, NumberType<i64>, NumberType<i64>, StringType, StringType>(
151+
|srcstr, pos, len, substr, output| {
152+
let pos = pos as usize;
153+
let len = len as usize;
154+
if pos < 1 || pos > srcstr.len() {
155+
output.put_slice(srcstr);
155156
} else {
156-
let p = pos as usize - 1;
157-
values.extend_from_slice(&srcstr[0..p]);
158-
values.extend_from_slice(substr);
159-
if len >= 0 && pos + len < sl {
160-
let l = len as usize;
161-
values.extend_from_slice(&srcstr[p + l..]);
157+
let pos = pos - 1;
158+
output.put_slice(&srcstr[0..pos]);
159+
output.put_slice(substr);
160+
if pos + len < srcstr.len() {
161+
output.put_slice(&srcstr[(pos + len)..]);
162162
}
163163
}
164-
values
165-
}
166-
);
164+
output.commit_row();
165+
Ok(())
166+
}),
167+
);
167168

168-
registry.register_3_arg::<StringType, NumberType<u64>, StringType, StringType, _, _>(
169+
registry.register_passthrough_nullable_3_arg::<StringType, NumberType<u64>, StringType, StringType, _, _>(
169170
"rpad",
170171
FunctionProperty::default(),
171172
|_, _, _| None,
172-
|str: &[u8], l: u64, pad: &[u8]| {
173-
let mut buff: Vec<u8> = vec![];
174-
if l != 0 {
175-
if l > str.len() as u64 {
176-
buff.extend_from_slice(str);
177-
while buff.len() < l as usize {
178-
if buff.len() + pad.len() <= l as usize {
179-
buff.extend_from_slice(pad);
180-
} else {
181-
buff.extend_from_slice(&pad[0..l as usize - buff.len()])
182-
}
173+
vectorize_with_builder_3_arg::<StringType, NumberType<u64>, StringType, StringType>(
174+
|s: &[u8], pad_len: u64, pad: &[u8], output| {
175+
let pad_len = pad_len as usize;
176+
if pad_len <= s.len() {
177+
output.put_slice(&s[..pad_len])
178+
} else {
179+
output.put_slice(s);
180+
let mut remain_pad_len = pad_len - s.len();
181+
while remain_pad_len > 0 {
182+
if remain_pad_len < pad.len() {
183+
output.put_slice(&pad[..remain_pad_len]);
184+
remain_pad_len = 0;
185+
} else {
186+
output.put_slice(pad);
187+
remain_pad_len -= pad.len();
183188
}
184-
} else {
185-
buff.extend_from_slice(&str[0..l as usize]);
186189
}
187190
}
188-
buff
189-
},
191+
output.commit_row();
192+
Ok(())
193+
}),
190194
);
191195

192-
registry.register_3_arg::<StringType, StringType, StringType, StringType, _, _>(
196+
registry.register_passthrough_nullable_3_arg::<StringType, StringType, StringType, StringType, _, _>(
193197
"replace",
194198
FunctionProperty::default(),
195199
|_, _, _| None,
196-
|str, from, to| {
197-
let mut buf: Vec<u8> = vec![];
200+
vectorize_with_builder_3_arg::<StringType, StringType, StringType, StringType>(
201+
|str, from, to, output| {
198202
if from.is_empty() || from == to {
199-
buf.extend_from_slice(str);
200-
return buf;
203+
output.put_slice(str);
204+
output.commit_row();
205+
return Ok(());
201206
}
202207
let mut skip = 0;
203208
for (p, w) in str.windows(from.len()).enumerate() {
204209
if w == from {
205-
buf.extend_from_slice(to);
210+
output.put_slice(to);
206211
skip = from.len();
207212
} else if p + w.len() == str.len() {
208-
buf.extend_from_slice(w);
213+
output.put_slice(w);
209214
} else if skip > 1 {
210215
skip -= 1;
211216
} else {
212-
buf.extend_from_slice(&w[0..1]);
217+
output.put_slice(&w[0..1]);
213218
}
214219
}
215-
buf
216-
},
220+
output.commit_row();
221+
Ok(())
222+
}),
217223
);
218224

219225
registry.register_2_arg::<StringType, StringType, NumberType<i8>, _, _>(

src/query/functions-v2/tests/it/scalars/testdata/string.txt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2862,19 +2862,19 @@ evaluation:
28622862
| Domain | {"cc"..="test"} | {1..=4} | {1..=5} | {"12"..="zc"} | Unknown |
28632863
| Row 0 | "hi" | 1 | 3 | "xx" | "xx" |
28642864
| Row 1 | "test" | 4 | 5 | "zc" | "teszc" |
2865-
| Row 2 | "cc" | 1 | 1 | "12" | "12" |
2865+
| Row 2 | "cc" | 1 | 1 | "12" | "12c" |
28662866
| Row 3 | "q" | 1 | 1 | "56" | "56" |
28672867
+--------+-----------------+---------+---------+---------------+---------+
28682868
evaluation (internal):
2869-
+--------+----------------------------------------------------------------------------+
2870-
| Column | Data |
2871-
+--------+----------------------------------------------------------------------------+
2872-
| a | StringColumn { data: 0x686974657374636371, offsets: [0, 2, 6, 8, 9] } |
2873-
| b | Int32([1, 4, 1, 1]) |
2874-
| c | Int32([3, 5, 1, 1]) |
2875-
| d | StringColumn { data: 0x78787a6331323536, offsets: [0, 2, 4, 6, 8] } |
2876-
| Output | StringColumn { data: 0x78787465737a6331323536, offsets: [0, 2, 7, 9, 11] } |
2877-
+--------+----------------------------------------------------------------------------+
2869+
+--------+-------------------------------------------------------------------------------+
2870+
| Column | Data |
2871+
+--------+-------------------------------------------------------------------------------+
2872+
| a | StringColumn { data: 0x686974657374636371, offsets: [0, 2, 6, 8, 9] } |
2873+
| b | Int32([1, 4, 1, 1]) |
2874+
| c | Int32([3, 5, 1, 1]) |
2875+
| d | StringColumn { data: 0x78787a6331323536, offsets: [0, 2, 4, 6, 8] } |
2876+
| Output | StringColumn { data: 0x78787465737a633132633536, offsets: [0, 2, 7, 10, 12] } |
2877+
+--------+-------------------------------------------------------------------------------+
28782878

28792879

28802880
ast : insert(x, y, z, u)
@@ -2892,15 +2892,15 @@ evaluation:
28922892
| Row 3 | "q" | 1 | 1 | "56" | "56" |
28932893
+--------+--------------------------+------------------+------------------+------------------------+-------------+
28942894
evaluation (internal):
2895-
+--------+-------------------------------------------------------------------------------------------------------------------------------+
2896-
| Column | Data |
2897-
+--------+-------------------------------------------------------------------------------------------------------------------------------+
2898-
| x | NullableColumn { column: StringColumn { data: 0x686974657374636371, offsets: [0, 2, 6, 8, 9] }, validity: [0b____1110] } |
2899-
| y | NullableColumn { column: Int32([1, 4, 1, 1]), validity: [0b____1011] } |
2900-
| z | NullableColumn { column: Int32([3, 5, 1, 1]), validity: [0b____1101] } |
2901-
| u | NullableColumn { column: StringColumn { data: 0x78787a6331323536, offsets: [0, 2, 4, 6, 8] }, validity: [0b____1110] } |
2902-
| Output | NullableColumn { column: StringColumn { data: 0x78787465737a6331323536, offsets: [0, 2, 7, 9, 11] }, validity: [0b____1000] } |
2903-
+--------+-------------------------------------------------------------------------------------------------------------------------------+
2895+
+--------+----------------------------------------------------------------------------------------------------------------------------------+
2896+
| Column | Data |
2897+
+--------+----------------------------------------------------------------------------------------------------------------------------------+
2898+
| x | NullableColumn { column: StringColumn { data: 0x686974657374636371, offsets: [0, 2, 6, 8, 9] }, validity: [0b____1110] } |
2899+
| y | NullableColumn { column: Int32([1, 4, 1, 1]), validity: [0b____1011] } |
2900+
| z | NullableColumn { column: Int32([3, 5, 1, 1]), validity: [0b____1101] } |
2901+
| u | NullableColumn { column: StringColumn { data: 0x78787a6331323536, offsets: [0, 2, 4, 6, 8] }, validity: [0b____1110] } |
2902+
| Output | NullableColumn { column: StringColumn { data: 0x78787465737a633132633536, offsets: [0, 2, 7, 10, 12] }, validity: [0b____1000] } |
2903+
+--------+----------------------------------------------------------------------------------------------------------------------------------+
29042904

29052905

29062906
ast : space(0)

src/query/service/src/api/rpc/exchange/statistics_receiver.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl StatisticsReceiver {
7676
recv = Box::pin(flight_exchange.recv());
7777
}
7878
Err(cause) => {
79-
ctx.get_current_session().force_kill_query();
79+
ctx.get_current_session().force_kill_query(cause.clone());
8080
return Err(cause);
8181
}
8282
};
@@ -90,14 +90,13 @@ impl StatisticsReceiver {
9090
notified = middle;
9191
match Self::recv_data(&ctx, res) {
9292
Ok(true) => {
93-
ctx.get_current_session().force_kill_query();
9493
return Ok(());
9594
}
9695
Ok(false) => {
9796
recv = Box::pin(flight_exchange.recv());
9897
}
9998
Err(cause) => {
100-
ctx.get_current_session().force_kill_query();
99+
ctx.get_current_session().force_kill_query(cause.clone());
101100
return Err(cause);
102101
}
103102
};
@@ -106,7 +105,7 @@ impl StatisticsReceiver {
106105
}
107106

108107
if let Err(cause) = Self::fetch(&ctx, &flight_exchange, recv).await {
109-
ctx.get_current_session().force_kill_query();
108+
ctx.get_current_session().force_kill_query(cause.clone());
110109
return Err(cause);
111110
}
112111

src/query/service/src/interpreters/interpreter_kill.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ impl KillInterpreter {
4848
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
4949
}
5050
Some(kill_session) => {
51-
kill_session.force_kill_query();
51+
kill_session.force_kill_query(ErrorCode::AbortedQuery(
52+
"Aborted query, because the server is shutting down or the query was killed",
53+
));
5254
let schema = Arc::new(DataSchema::empty());
5355
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
5456
}

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ impl Executor {
197197
Running(r) => {
198198
// release session
199199
if kill {
200-
r.session.force_kill_query();
200+
r.session.force_kill_query(ErrorCode::AbortedQuery(
201+
"Aborted query, because the server is shutting down or the query was killed",
202+
));
201203
}
202204
if let Err(e) = &reason {
203205
r.ctx.set_error(e.clone());

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,8 @@ impl QueryContextShared {
124124
self.query_need_abort.clone()
125125
}
126126

127-
pub fn kill(&self) {
128-
self.set_error(ErrorCode::AbortedQuery(
129-
"Aborted query, because the server is shutting down or the query was killed",
130-
));
131-
127+
pub fn kill(&self, cause: ErrorCode) {
128+
self.set_error(cause);
132129
self.query_need_abort.store(true, Ordering::Release);
133130
let mut sources_abort_handle = self.sources_abort_handle.write();
134131

src/query/service/src/sessions/session.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,17 @@ impl Session {
106106
}
107107

108108
pub fn force_kill_session(self: &Arc<Self>) {
109-
self.force_kill_query();
109+
self.force_kill_query(ErrorCode::AbortedQuery(
110+
"Aborted query, because the server is shutting down or the query was killed",
111+
));
110112
self.kill(/* shutdown io stream */);
111113
}
112114

113-
pub fn force_kill_query(self: &Arc<Self>) {
115+
pub fn force_kill_query(self: &Arc<Self>, cause: ErrorCode) {
114116
let session_ctx = self.session_ctx.clone();
115117

116118
if let Some(context_shared) = session_ctx.get_query_context_shared() {
117-
context_shared.kill(/* shutdown executing query */);
119+
context_shared.kill(cause);
118120
}
119121
}
120122

0 commit comments

Comments
 (0)