Skip to content

Commit 38cd7a7

Browse files
committed
Merge branch 'main' of https://github.com/datafuselabs/databend into add_runtime_bloom_filter_for_merge_into
2 parents e9010e3 + 880e236 commit 38cd7a7

File tree

6 files changed

+157
-19
lines changed

6 files changed

+157
-19
lines changed

src/common/metrics/src/metrics/session.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,31 @@
1313
// limitations under the License.
1414

1515
use std::sync::LazyLock;
16+
use std::time::Duration;
1617

1718
use crate::register_counter;
1819
use crate::register_gauge;
20+
use crate::register_histogram_in_milliseconds;
1921
use crate::Counter;
2022
use crate::Gauge;
23+
use crate::Histogram;
2124

2225
pub static SESSION_CONNECT_NUMBERS: LazyLock<Counter> =
2326
LazyLock::new(|| register_counter("session_connect_numbers"));
2427
pub static SESSION_CLOSE_NUMBERS: LazyLock<Counter> =
2528
LazyLock::new(|| register_counter("session_close_numbers"));
2629
pub static SESSION_ACTIVE_CONNECTIONS: LazyLock<Gauge> =
2730
LazyLock::new(|| register_gauge("session_connections"));
31+
pub static SESSION_QUQUED_QUERIES: LazyLock<Gauge> =
32+
LazyLock::new(|| register_gauge("session_queued_queries"));
33+
pub static SESSION_QUEUE_ABORT_COUNT: LazyLock<Counter> =
34+
LazyLock::new(|| register_counter("session_queue_abort_count"));
35+
pub static SESSION_QUEUE_ACQUIRE_ERROR_COUNT: LazyLock<Counter> =
36+
LazyLock::new(|| register_counter("session_queue_acquire_error_count"));
37+
pub static SESSION_QUEUE_ACQUIRE_TIMEOUT_COUNT: LazyLock<Counter> =
38+
LazyLock::new(|| register_counter("session_queue_acquire_timeout_count"));
39+
pub static SESSION_QUEUE_ACQUIRE_DURATION_MS: LazyLock<Histogram> =
40+
LazyLock::new(|| register_histogram_in_milliseconds("session_queue_acquire_duration_ms"));
2841

2942
pub fn incr_session_connect_numbers() {
3043
SESSION_CONNECT_NUMBERS.inc();
@@ -37,3 +50,23 @@ pub fn incr_session_close_numbers() {
3750
pub fn set_session_active_connections(num: usize) {
3851
SESSION_ACTIVE_CONNECTIONS.set(num as i64);
3952
}
53+
54+
pub fn set_session_queued_queries(num: usize) {
55+
SESSION_QUQUED_QUERIES.set(num as i64);
56+
}
57+
58+
pub fn incr_session_queue_abort_count() {
59+
SESSION_QUEUE_ABORT_COUNT.inc();
60+
}
61+
62+
pub fn incr_session_queue_acquire_error_count() {
63+
SESSION_QUEUE_ACQUIRE_ERROR_COUNT.inc();
64+
}
65+
66+
pub fn incr_session_queue_acquire_timeout_count() {
67+
SESSION_QUEUE_ACQUIRE_TIMEOUT_COUNT.inc();
68+
}
69+
70+
pub fn record_session_queue_acquire_duration_ms(duration: Duration) {
71+
SESSION_QUEUE_ACQUIRE_DURATION_MS.observe(duration.as_millis() as f64);
72+
}

src/query/ast/src/parser/statement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1986,7 +1986,7 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
19861986
},
19871987
);
19881988

1989-
let begin = value(Statement::Begin, rule! { BEGIN });
1989+
let begin = value(Statement::Begin, rule! { BEGIN ~ (TRANSACTION)? });
19901990
let commit = value(Statement::Commit, rule! { COMMIT });
19911991
let abort = value(Statement::Abort, rule! { ABORT | ROLLBACK });
19921992

src/query/ast/src/parser/token.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,8 @@ pub enum TokenKind {
12131213
UNTIL,
12141214
#[token("BEGIN", ignore(ascii_case))]
12151215
BEGIN,
1216+
#[token("TRANSACTION", ignore(ascii_case))]
1217+
TRANSACTION,
12161218
#[token("COMMIT", ignore(ascii_case))]
12171219
COMMIT,
12181220
#[token("ABORT", ignore(ascii_case))]
@@ -1541,6 +1543,7 @@ impl TokenKind {
15411543
// | TokenKind::TABLESAMPLE
15421544
| TokenKind::THEN
15431545
| TokenKind::TRAILING
1546+
| TokenKind::TRANSACTION
15441547
| TokenKind::TRUE
15451548
// | TokenKind::UNIQUE
15461549
//| TokenKind::USER

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::fmt::Display;
1617
use std::future::Future;
1718
use std::hash::Hash;
1819
use std::pin::Pin;
@@ -30,6 +31,11 @@ use databend_common_catalog::table_context::TableContext;
3031
use databend_common_exception::ErrorCode;
3132
use databend_common_exception::Result;
3233
use databend_common_meta_app::principal::UserInfo;
34+
use databend_common_metrics::session::incr_session_queue_abort_count;
35+
use databend_common_metrics::session::incr_session_queue_acquire_error_count;
36+
use databend_common_metrics::session::incr_session_queue_acquire_timeout_count;
37+
use databend_common_metrics::session::record_session_queue_acquire_duration_ms;
38+
use databend_common_metrics::session::set_session_queued_queries;
3339
use log::info;
3440
use parking_lot::Mutex;
3541
use pin_project_lite::pin_project;
@@ -41,7 +47,7 @@ use tokio::time::error::Elapsed;
4147
use crate::sessions::QueryContext;
4248

4349
pub trait QueueData: Send + Sync + 'static {
44-
type Key: Send + Sync + Eq + Hash + Clone + 'static;
50+
type Key: Send + Sync + Eq + Hash + Display + Clone + 'static;
4551

4652
fn get_key(&self) -> Self::Key;
4753

@@ -91,12 +97,14 @@ impl<Data: QueueData> QueueManager<Data> {
9197
pub fn remove(&self, key: Data::Key) -> bool {
9298
let mut queue = self.queue.lock();
9399
if let Some(inner) = queue.remove(&key) {
100+
set_session_queued_queries(queue.len());
94101
inner.waker.wake();
95102
inner.is_abort.store(true, Ordering::SeqCst);
96-
return true;
103+
true
104+
} else {
105+
set_session_queued_queries(queue.len());
106+
false
97107
}
98-
99-
false
100108
}
101109

102110
pub async fn acquire(self: &Arc<Self>, data: Data) -> Result<AcquireQueueGuard> {
@@ -106,20 +114,43 @@ impl<Data: QueueData> QueueManager<Data> {
106114
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
107115
self.clone(),
108116
);
117+
let start_time = SystemTime::now();
109118

110-
future.await
119+
match future.await {
120+
Ok(v) => {
121+
record_session_queue_acquire_duration_ms(start_time.elapsed().unwrap_or_default());
122+
Ok(v)
123+
}
124+
Err(e) => {
125+
match e.code() {
126+
ErrorCode::ABORTED_QUERY => {
127+
incr_session_queue_abort_count();
128+
}
129+
ErrorCode::TIMEOUT => {
130+
incr_session_queue_acquire_timeout_count();
131+
}
132+
_ => {
133+
incr_session_queue_acquire_error_count();
134+
}
135+
}
136+
Err(e)
137+
}
138+
}
111139
}
112140

113141
pub(crate) fn add_entity(&self, inner: Inner<Data>) -> Data::Key {
114142
let key = inner.data.get_key();
115143
let mut queue = self.queue.lock();
116144
queue.insert(key.clone(), inner);
145+
set_session_queued_queries(queue.len());
117146
key
118147
}
119148

120149
pub(crate) fn remove_entity(&self, key: &Data::Key) -> Option<Arc<Data>> {
121150
let mut queue = self.queue.lock();
122-
queue.remove(key).map(|inner| inner.data.clone())
151+
let data = queue.remove(key).map(|inner| inner.data.clone());
152+
set_session_queued_queries(queue.len());
153+
data
123154
}
124155
}
125156

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -283,17 +283,14 @@ pub fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SE
283283
enable_distributed_query,
284284
)?;
285285

286+
if opt_ctx.enable_join_reorder {
287+
s_expr =
288+
RecursiveOptimizer::new([RuleID::CommuteJoin].as_slice(), &opt_ctx).run(&s_expr)?;
289+
}
290+
286291
// Cascades optimizer may fail due to timeout, fallback to heuristic optimizer in this case.
287292
s_expr = match cascades.optimize(s_expr.clone()) {
288293
Ok(mut s_expr) => {
289-
let rules = if opt_ctx.enable_join_reorder {
290-
[RuleID::EliminateEvalScalar, RuleID::CommuteJoin].as_slice()
291-
} else {
292-
[RuleID::EliminateEvalScalar].as_slice()
293-
};
294-
295-
s_expr = RecursiveOptimizer::new(rules, &opt_ctx).run(&s_expr)?;
296-
297294
// Push down sort and limit
298295
// TODO(leiysky): do this optimization in cascades optimizer
299296
if enable_distributed_query {
@@ -308,10 +305,6 @@ pub fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SE
308305
"CascadesOptimizer failed, fallback to heuristic optimizer: {}",
309306
e
310307
);
311-
312-
s_expr =
313-
RecursiveOptimizer::new(&[RuleID::EliminateEvalScalar], &opt_ctx).run(&s_expr)?;
314-
315308
if enable_distributed_query {
316309
s_expr = optimize_distributed_query(opt_ctx.table_ctx.clone(), &s_expr)?;
317310
}
@@ -320,6 +313,9 @@ pub fn optimize_query(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Result<SE
320313
}
321314
};
322315

316+
s_expr =
317+
RecursiveOptimizer::new([RuleID::EliminateEvalScalar].as_slice(), &opt_ctx).run(&s_expr)?;
318+
323319
Ok(s_expr)
324320
}
325321

tests/sqllogictests/suites/base/14_transaction/14_0003_merge_into.test

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,80 @@ SELECT employee_id, salary FROM salaries order by employee_id;
6969
4 55000.00
7070

7171

72+
statement ok
73+
drop database test_txn_merge_into;
74+
75+
76+
statement ok
77+
create or replace database test_txn_merge_into;
78+
79+
statement ok
80+
use test_txn_merge_into;
81+
82+
statement ok
83+
set enable_experimental_merge_into = 1;
84+
85+
statement ok
86+
CREATE TABLE employees (
87+
employee_id INT,
88+
employee_name VARCHAR(255),
89+
department VARCHAR(255)
90+
);
91+
92+
statement ok
93+
CREATE TABLE salaries (
94+
employee_id INT,
95+
salary DECIMAL(10, 2)
96+
);
97+
98+
statement ok
99+
BEGIN TRANSACTION;
100+
101+
statement ok
102+
INSERT INTO employees VALUES
103+
(1, 'Alice', 'HR'),
104+
(2, 'Bob', 'IT'),
105+
(3, 'Charlie', 'Finance'),
106+
(4, 'David', 'HR');
107+
108+
statement ok
109+
INSERT INTO salaries VALUES
110+
(1, 50000.00),
111+
(2, 60000.00);
112+
113+
statement ok
114+
MERGE INTO salaries
115+
USING (SELECT * FROM employees) AS employees
116+
ON salaries.employee_id = employees.employee_id
117+
WHEN MATCHED AND employees.department = 'HR' THEN
118+
UPDATE SET
119+
salaries.salary = salaries.salary + 1000.00
120+
WHEN MATCHED THEN
121+
UPDATE SET
122+
salaries.salary = salaries.salary + 500.00
123+
WHEN NOT MATCHED THEN
124+
INSERT (employee_id, salary)
125+
VALUES (employees.employee_id, 55000.00);
126+
127+
query IF
128+
SELECT employee_id, salary FROM salaries order by employee_id;
129+
----
130+
1 51000.00
131+
2 60500.00
132+
3 55000.00
133+
4 55000.00
134+
135+
statement ok
136+
COMMIT;
137+
138+
query IF
139+
SELECT employee_id, salary FROM salaries order by employee_id;
140+
----
141+
1 51000.00
142+
2 60500.00
143+
3 55000.00
144+
4 55000.00
145+
146+
72147
statement ok
73148
drop database test_txn_merge_into;

0 commit comments

Comments (0)