Skip to content

Commit 11afe9d

Browse files
authored
feat: enhance system.temporary_tables (#18339)
* feat: enhance system.temporary_tables - Enhance system.temporary_tables with additional columns (created_on, updated_on, compressed_data_bytes, index_data_bytes, num_rows, num_segments, num_blocks) - Set distribution level to Cluster for proper data collection across cluster nodes - Add node ID column to identify cluster node hosting temporary tables - Add client_session_id function * add logic test * shrink lock scope * fix lint error * update ut test data
1 parent b9e4e50 commit 11afe9d

File tree

9 files changed

+246
-77
lines changed

9 files changed

+246
-77
lines changed

src/query/service/src/catalogs/default/session_catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ impl Catalog for SessionCatalog {
437437
.await?
438438
{
439439
ClientSessionManager::instance()
440-
.remove_temp_tbl_mgr(req.temp_prefix, self.temp_tbl_mgr.clone());
440+
.remove_temp_tbl_mgr(req.temp_prefix, &self.temp_tbl_mgr);
441441
return Ok(reply);
442442
}
443443
self.inner.drop_table_by_id(req).await

src/query/service/src/servers/http/v1/session/client_session_manager.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,11 +408,15 @@ impl ClientSessionManager {
408408
}
409409
}
410410

411-
pub fn remove_temp_tbl_mgr(&self, prefix: String, temp_tbl_mgr: TempTblMgrRef) {
411+
pub fn remove_temp_tbl_mgr(&self, prefix: String, temp_tbl_mgr: &TempTblMgrRef) {
412412
let mut guard = self.session_state.lock();
413-
let is_empty = temp_tbl_mgr.lock().is_empty();
413+
let is_empty = { temp_tbl_mgr.lock().is_empty() };
414414
if is_empty {
415-
guard.remove(&prefix);
415+
let removed_session_state = guard.remove(&prefix);
416+
// Defensive check that drop session state contains no temp tables
417+
if let Some(removed_session_state) = removed_session_state {
418+
assert!({ removed_session_state.temp_tbl_mgr.lock().is_empty() });
419+
}
416420
// all temp table dropped by user, data should have been removed when executing drop.
417421
info!("[TEMP TABLE] session={prefix} removed from ClientSessionManager");
418422
}

src/query/service/src/servers/mysql/mysql_session.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,12 @@ impl MySQLConnection {
101101
.drop_client_session_id(&session_id, &user)
102102
.await
103103
.ok();
104-
drop_all_temp_tables(&session_id, session.temp_tbl_mgr(), "mysql").await
104+
drop_all_temp_tables(
105+
&format!("{user}/{session_id}"),
106+
session.temp_tbl_mgr(),
107+
"mysql",
108+
)
109+
.await
105110
});
106111
let _ = futures::executor::block_on(join_handle);
107112
});

src/query/service/src/sessions/session_mgr.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,6 @@ impl SessionManager {
404404
// Since there are chances that we are the last one that holding the reference, and the
405405
// destruction of session need to acquire the write lock of `active_sessions`, which leads
406406
// to dead lock.
407-
//
408-
// Although online expression can also do this, to make this clearer, we wrap it in a block
409407

410408
let active_sessions_guard = self.active_sessions.read();
411409
active_sessions_guard.values().cloned().collect::<Vec<_>>()
@@ -431,4 +429,37 @@ impl SessionManager {
431429

432430
false
433431
}
432+
pub fn get_all_temp_tables(
433+
&self,
434+
) -> Result<
435+
Vec<(
436+
String,
437+
SessionType,
438+
databend_common_meta_app::schema::TableInfo,
439+
)>,
440+
> {
441+
let mut all_temp_tables = Vec::new();
442+
let sessions = self.active_sessions_snapshot();
443+
for session in sessions {
444+
if let Some(session) = session.upgrade() {
445+
let temp_tables = {
446+
let mgr_ref = session.temp_tbl_mgr();
447+
let vals = mgr_ref.lock().list_tables();
448+
vals
449+
}?;
450+
for table in temp_tables {
451+
all_temp_tables.push((
452+
format!(
453+
"{}/{}",
454+
session.get_current_user()?.name.clone(),
455+
session.id.clone()
456+
),
457+
session.typ.read().clone(),
458+
table,
459+
));
460+
}
461+
}
462+
}
463+
Ok(all_temp_tables)
464+
}
434465
}

src/query/service/src/table_functions/temporary_tables_table.rs

Lines changed: 102 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,20 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_catalog::session_type::SessionType;
18+
use databend_common_catalog::table::DistributionLevel;
1719
use databend_common_catalog::table::Table;
1820
use databend_common_catalog::table_context::TableContext;
1921
use databend_common_exception::Result;
2022
use databend_common_expression::types::BooleanType;
23+
use databend_common_expression::types::DataType;
2124
use databend_common_expression::types::NumberDataType;
2225
use databend_common_expression::types::StringType;
26+
use databend_common_expression::types::TimestampType;
2327
use databend_common_expression::types::UInt64Type;
2428
use databend_common_expression::DataBlock;
2529
use databend_common_expression::FromData;
30+
use databend_common_expression::Scalar;
2631
use databend_common_expression::TableDataType;
2732
use databend_common_expression::TableField;
2833
use databend_common_expression::TableSchemaRefExt;
@@ -33,6 +38,7 @@ use databend_common_storages_system::SyncOneBlockSystemTable;
3338
use databend_common_storages_system::SyncSystemTable;
3439

3540
use crate::servers::http::v1::ClientSessionManager;
41+
use crate::sessions::SessionManager;
3642

3743
pub struct TemporaryTablesTable {
3844
table_info: TableInfo,
@@ -41,26 +47,62 @@ pub struct TemporaryTablesTable {
4147
impl SyncSystemTable for TemporaryTablesTable {
4248
const NAME: &'static str = "system.temporary_tables";
4349

50+
const DISTRIBUTION_LEVEL: DistributionLevel = DistributionLevel::Cluster;
51+
4452
fn get_table_info(&self) -> &TableInfo {
4553
&self.table_info
4654
}
4755

4856
fn get_full_data(&self, ctx: Arc<dyn TableContext>) -> Result<DataBlock> {
57+
let node_id = ctx.get_cluster().local_id.clone();
58+
4959
let mut dbs = Vec::new();
5060
let mut names = Vec::new();
5161
let mut table_ids = Vec::new();
5262
let mut engines = Vec::new();
5363
let mut users = Vec::new();
5464
let mut session_ids = Vec::new();
65+
let mut session_types = Vec::new();
5566
let mut is_current_sessions = Vec::new();
67+
let mut created_ons = Vec::new();
68+
let mut updated_ons = Vec::new();
69+
let mut compressed_data_bytes_col = Vec::new();
70+
let mut index_bytes_col = Vec::new();
71+
let mut number_of_rows_col = Vec::new();
72+
let mut number_of_blocks_col = Vec::new();
73+
let mut number_of_segments_col = Vec::new();
74+
75+
let mysql_temp_tables = {
76+
let session_manager = SessionManager::instance();
77+
session_manager
78+
.get_all_temp_tables()?
79+
.into_iter()
80+
.filter(|(_, typ, _)| typ == &SessionType::MySQL)
81+
.collect::<Vec<_>>()
82+
};
83+
84+
let http_client_temp_tables = {
85+
let client_session_manager = ClientSessionManager::instance();
86+
client_session_manager
87+
.get_all_temp_tables()?
88+
.into_iter()
89+
.map(|(session_key, table)| (session_key, SessionType::HTTPQuery, table))
90+
.collect::<Vec<_>>()
91+
};
92+
93+
let all_temp_tables = mysql_temp_tables
94+
.into_iter()
95+
.chain(http_client_temp_tables)
96+
.collect::<Vec<_>>();
5697

57-
let client_session_manager = ClientSessionManager::instance();
58-
let all_temp_tables = client_session_manager.get_all_temp_tables()?;
98+
let current_session_type = ctx.get_session_type();
5999

60-
let current_session_id = ctx.get_current_client_session_id();
61-
log::info!("current_session_id: {:?}", current_session_id);
100+
// currently http clients have their own client side session ids, besides server side session ids
101+
let current_client_session_id = ctx.get_current_client_session_id();
102+
let current_session_id = ctx.get_current_session_id();
103+
log::info!("current_client_session_id: {:?}", current_client_session_id);
62104

63-
for (session_key, table) in all_temp_tables {
105+
for (session_key, typ, table) in all_temp_tables {
64106
log::info!("session_key: {:?}", session_key);
65107
let desc = table.desc;
66108
let db_name = desc
@@ -77,28 +119,55 @@ impl SyncSystemTable for TemporaryTablesTable {
77119

78120
let user = session_key.split('/').next().unwrap().to_string();
79121
let session_id = session_key.split('/').nth(1).unwrap().to_string();
80-
let is_current_session = current_session_id
81-
.as_ref()
82-
.map(|id| id == &session_id)
83-
.unwrap_or(false);
122+
let is_current_session = {
123+
if current_session_type == SessionType::HTTPQuery {
124+
current_client_session_id
125+
.as_ref()
126+
.map(|id| id == &session_id)
127+
.unwrap_or(false)
128+
} else {
129+
current_session_id == session_id
130+
}
131+
};
132+
133+
let meta = table.meta;
84134
dbs.push(db_name.to_string());
85135
names.push(table.name);
86136
table_ids.push(table.ident.table_id);
87-
engines.push(table.meta.engine);
137+
engines.push(meta.engine);
88138
users.push(user);
89139
session_ids.push(session_id);
140+
session_types.push(typ.to_string());
90141
is_current_sessions.push(is_current_session);
142+
created_ons.push(meta.created_on.timestamp_micros());
143+
updated_ons.push(meta.updated_on.timestamp_micros());
144+
compressed_data_bytes_col.push(meta.statistics.compressed_data_bytes);
145+
index_bytes_col.push(meta.statistics.index_data_bytes);
146+
number_of_rows_col.push(meta.statistics.number_of_rows);
147+
number_of_segments_col.push(meta.statistics.number_of_segments);
148+
number_of_blocks_col.push(meta.statistics.number_of_blocks);
91149
}
92150

93-
Ok(DataBlock::new_from_columns(vec![
151+
let mut block = DataBlock::new_from_columns(vec![
94152
StringType::from_data(dbs),
95153
StringType::from_data(names),
96154
UInt64Type::from_data(table_ids),
97155
StringType::from_data(engines),
98156
StringType::from_data(users),
99157
StringType::from_data(session_ids),
158+
StringType::from_data(session_types),
100159
BooleanType::from_data(is_current_sessions),
101-
]))
160+
TimestampType::from_data(created_ons),
161+
TimestampType::from_data(updated_ons),
162+
UInt64Type::from_data(compressed_data_bytes_col),
163+
UInt64Type::from_data(index_bytes_col),
164+
UInt64Type::from_data(number_of_rows_col),
165+
UInt64Type::from_opt_data(number_of_segments_col),
166+
UInt64Type::from_opt_data(number_of_blocks_col),
167+
]);
168+
169+
block.add_const_column(Scalar::String(node_id), DataType::String);
170+
Ok(block)
102171
}
103172
}
104173

@@ -111,7 +180,28 @@ impl TemporaryTablesTable {
111180
TableField::new("engine", TableDataType::String),
112181
TableField::new("user", TableDataType::String),
113182
TableField::new("session_id", TableDataType::String),
183+
TableField::new("session_type", TableDataType::String),
114184
TableField::new("is_current_session", TableDataType::Boolean),
185+
TableField::new("created_on", TableDataType::Timestamp),
186+
TableField::new("updated_on", TableDataType::Timestamp),
187+
TableField::new(
188+
"compressed_data_bytes",
189+
TableDataType::Number(NumberDataType::UInt64),
190+
),
191+
TableField::new(
192+
"index_data_bytes",
193+
TableDataType::Number(NumberDataType::UInt64),
194+
),
195+
TableField::new("num_rows", TableDataType::Number(NumberDataType::UInt64)),
196+
TableField::new(
197+
"num_segments",
198+
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
199+
),
200+
TableField::new(
201+
"num_blocks",
202+
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
203+
),
204+
TableField::new("node", TableDataType::String),
115205
]);
116206
let table_info = TableInfo {
117207
ident: TableIdent::new(table_id, 0),

0 commit comments

Comments
 (0)