Skip to content

Commit b6e7948

Browse files
committed
refactor(sessions): remove useless code async
1 parent 46e67cd commit b6e7948

File tree

5 files changed

+28
-36
lines changed

5 files changed

+28
-36
lines changed

src/query/catalog/src/table_context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,5 @@ pub trait TableContext: Send + Sync {
103103
fn get_cluster(&self) -> Arc<Cluster>;
104104
async fn get_table(&self, catalog: &str, database: &str, table: &str)
105105
-> Result<Arc<dyn Table>>;
106-
async fn get_processes_info(&self) -> Vec<ProcessInfo>;
106+
fn get_processes_info(&self) -> Vec<ProcessInfo>;
107107
}

src/query/service/src/interpreters/interpreter_kill.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl KillInterpreter {
3535
}
3636

3737
async fn execute_kill(&self, session_id: &String) -> Result<PipelineBuildResult> {
38-
match self.ctx.get_session_by_id(session_id).await {
38+
match self.ctx.get_session_by_id(session_id) {
3939
None => Err(ErrorCode::UnknownSession(format!(
4040
"Not found session id {}",
4141
session_id
@@ -73,16 +73,13 @@ impl Interpreter for KillInterpreter {
7373
// otherwise use the session_id.
7474
// More info Link to: https://github.com/datafuselabs/databend/discussions/5405.
7575
match id.parse::<u32>() {
76-
Ok(mysql_conn_id) => {
77-
let session_id = self.ctx.get_id_by_mysql_conn_id(&Some(mysql_conn_id)).await;
78-
match session_id {
79-
Some(get) => self.execute_kill(&get).await,
80-
None => Err(ErrorCode::UnknownSession(format!(
81-
"MySQL connection id {} not found session id",
82-
mysql_conn_id
83-
))),
84-
}
85-
}
76+
Ok(mysql_conn_id) => match self.ctx.get_id_by_mysql_conn_id(&Some(mysql_conn_id)) {
77+
Some(get) => self.execute_kill(&get).await,
78+
None => Err(ErrorCode::UnknownSession(format!(
79+
"MySQL connection id {} not found session id",
80+
mysql_conn_id
81+
))),
82+
},
8683
Err(_) => self.execute_kill(id).await,
8784
}
8885
}

src/query/service/src/sessions/query_ctx.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,23 +171,18 @@ impl QueryContext {
171171
}
172172

173173
// Get one session by session id.
174-
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
175-
SessionManager::instance().get_session_by_id(id).await
174+
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
175+
SessionManager::instance().get_session_by_id(id)
176176
}
177177

178178
// Get session id by mysql connection id.
179-
pub async fn get_id_by_mysql_conn_id(
180-
self: &Arc<Self>,
181-
conn_id: &Option<u32>,
182-
) -> Option<String> {
183-
SessionManager::instance()
184-
.get_id_by_mysql_conn_id(conn_id)
185-
.await
179+
pub fn get_id_by_mysql_conn_id(self: &Arc<Self>, conn_id: &Option<u32>) -> Option<String> {
180+
SessionManager::instance().get_id_by_mysql_conn_id(conn_id)
186181
}
187182

188183
// Get all the processes list info.
189-
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
190-
SessionManager::instance().processes_info().await
184+
pub fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
185+
SessionManager::instance().processes_info()
191186
}
192187

193188
/// Get the client socket address.
@@ -392,8 +387,8 @@ impl TableContext for QueryContext {
392387
}
393388

394389
// Get all the processes list info.
395-
async fn get_processes_info(&self) -> Vec<ProcessInfo> {
396-
SessionManager::instance().processes_info().await
390+
fn get_processes_info(&self) -> Vec<ProcessInfo> {
391+
SessionManager::instance().processes_info()
397392
}
398393
}
399394

src/query/service/src/sessions/session_mgr.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,12 @@ impl SessionManager {
147147
}
148148
}
149149

150-
pub async fn get_session_by_id(&self, id: &str) -> Option<Arc<Session>> {
150+
pub fn get_session_by_id(&self, id: &str) -> Option<Arc<Session>> {
151151
let sessions = self.active_sessions.read();
152152
sessions.get(id).and_then(|weak_ptr| weak_ptr.upgrade())
153153
}
154154

155-
pub async fn get_id_by_mysql_conn_id(&self, mysql_conn_id: &Option<u32>) -> Option<String> {
155+
pub fn get_id_by_mysql_conn_id(&self, mysql_conn_id: &Option<u32>) -> Option<String> {
156156
let sessions = self.mysql_conn_map.read();
157157
sessions.get(mysql_conn_id).cloned()
158158
}
@@ -194,7 +194,7 @@ impl SessionManager {
194194
let mut signal = Box::pin(signal.next());
195195

196196
for _index in 0..timeout_secs {
197-
if SessionManager::destroy_idle_sessions(&active_sessions).await {
197+
if SessionManager::destroy_idle_sessions(&active_sessions) {
198198
return;
199199
}
200200

@@ -216,7 +216,7 @@ impl SessionManager {
216216
}
217217
}
218218

219-
pub async fn processes_info(&self) -> Vec<ProcessInfo> {
219+
pub fn processes_info(&self) -> Vec<ProcessInfo> {
220220
let sessions = self.active_sessions.read();
221221

222222
let mut processes_info = Vec::with_capacity(sessions.len());
@@ -229,7 +229,7 @@ impl SessionManager {
229229
processes_info
230230
}
231231

232-
async fn destroy_idle_sessions(sessions: &Arc<RwLock<HashMap<String, Weak<Session>>>>) -> bool {
232+
fn destroy_idle_sessions(sessions: &Arc<RwLock<HashMap<String, Weak<Session>>>>) -> bool {
233233
// Read lock does not support reentrant
234234
// https://github.com/Amanieu/parking_lot::/blob/lock_api-0.4.4/lock_api/src/rwlock.rs#L422
235235
let mut active_sessions_read_guard = sessions.write();

src/query/storages/preludes/src/system/processes_table.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,24 @@ use common_meta_app::schema::TableMeta;
2626
use common_meta_types::UserInfo;
2727

2828
use crate::sessions::TableContext;
29-
use crate::storages::system::table::AsyncOneBlockSystemTable;
30-
use crate::storages::system::table::AsyncSystemTable;
3129
use crate::storages::Table;
30+
use crate::system::SyncOneBlockSystemTable;
31+
use crate::system::SyncSystemTable;
3232

3333
pub struct ProcessesTable {
3434
table_info: TableInfo,
3535
}
3636

3737
#[async_trait::async_trait]
38-
impl AsyncSystemTable for ProcessesTable {
38+
impl SyncSystemTable for ProcessesTable {
3939
const NAME: &'static str = "system.processes";
4040

4141
fn get_table_info(&self) -> &TableInfo {
4242
&self.table_info
4343
}
4444

45-
async fn get_full_data(&self, ctx: Arc<dyn TableContext>) -> Result<DataBlock> {
46-
let processes_info = ctx.get_processes_info().await;
45+
fn get_full_data(&self, ctx: Arc<dyn TableContext>) -> Result<DataBlock> {
46+
let processes_info = ctx.get_processes_info();
4747

4848
let mut processes_id = Vec::with_capacity(processes_info.len());
4949
let mut processes_type = Vec::with_capacity(processes_info.len());
@@ -129,7 +129,7 @@ impl ProcessesTable {
129129
},
130130
};
131131

132-
AsyncOneBlockSystemTable::create(ProcessesTable { table_info })
132+
SyncOneBlockSystemTable::create(ProcessesTable { table_info })
133133
}
134134

135135
fn process_host(client_address: &Option<SocketAddr>) -> Option<Vec<u8>> {

0 commit comments

Comments
 (0)