Skip to content

Commit fa49f1c

Browse files
authored
Merge pull request #7535 from zhyass/feature_fix
feat(storage): add clustering_history system table
2 parents dbaf2e5 + ea3f0a8 commit fa49f1c

File tree

11 files changed

+350
-45
lines changed

11 files changed

+350
-45
lines changed

src/query/service/src/databases/system/system_database.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ impl SystemDatabase {
5252
sys_db_meta.next_table_id(),
5353
config.query.max_query_log_size as i32,
5454
)),
55+
Arc::new(system::ClusteringHistoryTable::create(
56+
sys_db_meta.next_table_id(),
57+
config.query.max_query_log_size as i32,
58+
)),
5559
system::EnginesTable::create(sys_db_meta.next_table_id()),
5660
system::RolesTable::create(sys_db_meta.next_table_id()),
5761
system::StagesTable::create(sys_db_meta.next_table_id()),
src/query/service/src/interpreters/interpreter_clustering_history.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
use std::time::SystemTime;
17+
use std::time::UNIX_EPOCH;
18+
19+
use common_catalog::catalog::CATALOG_DEFAULT;
20+
use common_datablocks::DataBlock;
21+
use common_datavalues::prelude::Series;
22+
use common_datavalues::prelude::SeriesFrom;
23+
use common_exception::Result;
24+
25+
use crate::sessions::QueryContext;
26+
use crate::sessions::TableContext;
27+
use crate::storages::system::ClusteringHistoryTable;
28+
29+
pub struct InterpreterClusteringHistory {
30+
ctx: Arc<QueryContext>,
31+
}
32+
33+
impl InterpreterClusteringHistory {
34+
pub fn create(ctx: Arc<QueryContext>) -> Self {
35+
InterpreterClusteringHistory { ctx }
36+
}
37+
38+
pub async fn write_log(
39+
&self,
40+
start: SystemTime,
41+
db_name: &str,
42+
table_name: &str,
43+
) -> Result<()> {
44+
let start_time = start
45+
.duration_since(UNIX_EPOCH)
46+
.expect("Time went backwards")
47+
.as_micros() as i64;
48+
let end_time = SystemTime::now()
49+
.duration_since(UNIX_EPOCH)
50+
.expect("Time went backwards")
51+
.as_micros() as i64;
52+
let reclustered_bytes = self.ctx.get_scan_progress_value().bytes as u64;
53+
let reclustered_rows = self.ctx.get_scan_progress_value().rows as u64;
54+
55+
let table = self
56+
.ctx
57+
.get_table(CATALOG_DEFAULT, "system", "clustering_history")
58+
.await?;
59+
let schema = table.get_table_info().meta.schema.clone();
60+
61+
let block = DataBlock::create(schema.clone(), vec![
62+
Series::from_data(vec![start_time]),
63+
Series::from_data(vec![end_time]),
64+
Series::from_data(vec![db_name]),
65+
Series::from_data(vec![table_name]),
66+
Series::from_data(vec![reclustered_bytes]),
67+
Series::from_data(vec![reclustered_rows]),
68+
]);
69+
let blocks = vec![Ok(block)];
70+
let input_stream = futures::stream::iter::<Vec<Result<DataBlock>>>(blocks);
71+
72+
let clustering_history_table: &ClusteringHistoryTable =
73+
table.as_any().downcast_ref().unwrap();
74+
clustering_history_table
75+
.append_data(self.ctx.clone(), Box::pin(input_stream))
76+
.await?;
77+
Ok(())
78+
}
79+
}

src/query/service/src/interpreters/interpreter_query_log.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -185,29 +185,29 @@ impl InterpreterQueryLog {
185185
Series::from_data(vec![event.query_id.as_str()]),
186186
Series::from_data(vec![event.query_kind.as_str()]),
187187
Series::from_data(vec![event.query_text.as_str()]),
188-
Series::from_data(vec![event.event_date as i32]),
189-
Series::from_data(vec![event.event_time as i64]),
188+
Series::from_data(vec![event.event_date]),
189+
Series::from_data(vec![event.event_time]),
190190
// Schema.
191191
Series::from_data(vec![event.current_database.as_str()]),
192192
Series::from_data(vec![event.databases.as_str()]),
193193
Series::from_data(vec![event.tables.as_str()]),
194194
Series::from_data(vec![event.columns.as_str()]),
195195
Series::from_data(vec![event.projections.as_str()]),
196196
// Stats.
197-
Series::from_data(vec![event.written_rows as u64]),
198-
Series::from_data(vec![event.written_bytes as u64]),
199-
Series::from_data(vec![event.written_io_bytes as u64]),
200-
Series::from_data(vec![event.written_io_bytes_cost_ms as u64]),
201-
Series::from_data(vec![event.scan_rows as u64]),
202-
Series::from_data(vec![event.scan_bytes as u64]),
203-
Series::from_data(vec![event.scan_io_bytes as u64]),
204-
Series::from_data(vec![event.scan_io_bytes_cost_ms as u64]),
205-
Series::from_data(vec![event.scan_partitions as u64]),
206-
Series::from_data(vec![event.total_partitions as u64]),
207-
Series::from_data(vec![event.result_rows as u64]),
208-
Series::from_data(vec![event.result_bytes as u64]),
197+
Series::from_data(vec![event.written_rows]),
198+
Series::from_data(vec![event.written_bytes]),
199+
Series::from_data(vec![event.written_io_bytes]),
200+
Series::from_data(vec![event.written_io_bytes_cost_ms]),
201+
Series::from_data(vec![event.scan_rows]),
202+
Series::from_data(vec![event.scan_bytes]),
203+
Series::from_data(vec![event.scan_io_bytes]),
204+
Series::from_data(vec![event.scan_io_bytes_cost_ms]),
205+
Series::from_data(vec![event.scan_partitions]),
206+
Series::from_data(vec![event.total_partitions]),
207+
Series::from_data(vec![event.result_rows]),
208+
Series::from_data(vec![event.result_bytes]),
209209
Series::from_data(vec![event.cpu_usage]),
210-
Series::from_data(vec![event.memory_usage as u64]),
210+
Series::from_data(vec![event.memory_usage]),
211211
// Client.
212212
Series::from_data(vec![event.client_info.as_str()]),
213213
Series::from_data(vec![event.client_address.as_str()]),

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
// limitations under the License.
1414

1515
use std::sync::Arc;
16+
use std::time::SystemTime;
1617

1718
use common_exception::Result;
1819
use common_planners::ReclusterTablePlan;
1920
use common_streams::DataBlockStream;
2021
use common_streams::SendableDataBlockStream;
2122

2223
use crate::interpreters::Interpreter;
24+
use crate::interpreters::InterpreterClusteringHistory;
2325
use crate::pipelines::executor::ExecutorSettings;
2426
use crate::pipelines::executor::PipelineCompleteExecutor;
2527
use crate::pipelines::Pipeline;
@@ -48,6 +50,7 @@ impl Interpreter for ReclusterTableInterpreter {
4850
let ctx = self.ctx.clone();
4951
let settings = ctx.get_settings();
5052
let tenant = ctx.get_tenant();
53+
let start = SystemTime::now();
5154
loop {
5255
let table = self
5356
.ctx
@@ -92,6 +95,10 @@ impl Interpreter for ReclusterTableInterpreter {
9295
}
9396
}
9497

98+
InterpreterClusteringHistory::create(ctx.clone())
99+
.write_log(start, &plan.database, &plan.table)
100+
.await?;
101+
95102
Ok(Box::pin(DataBlockStream::create(
96103
self.plan.schema(),
97104
None,

src/query/service/src/interpreters/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod interpreter;
2020
mod interpreter_call;
2121
mod interpreter_cluster_key_alter;
2222
mod interpreter_cluster_key_drop;
23+
mod interpreter_clustering_history;
2324
mod interpreter_common;
2425
mod interpreter_copy_v2;
2526
mod interpreter_database_create;
@@ -106,6 +107,7 @@ pub use interpreter::InterpreterPtr;
106107
pub use interpreter_call::CallInterpreter;
107108
pub use interpreter_cluster_key_alter::AlterTableClusterKeyInterpreter;
108109
pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
110+
pub use interpreter_clustering_history::InterpreterClusteringHistory;
109111
pub use interpreter_common::append2table;
110112
pub use interpreter_common::commit2table;
111113
pub use interpreter_common::list_files_from_dal;

src/query/service/tests/it/interpreters/interpreter_table_recluster.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,22 @@ async fn test_alter_recluster_interpreter() -> Result<()> {
115115
common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
116116
}
117117

118+
// clustering_history.
119+
{
120+
let query = "select count(*) from system.clustering_history";
121+
let (plan, _, _) = planner.plan_sql(query).await?;
122+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
123+
let stream = executor.execute().await?;
124+
let result = stream.try_collect::<Vec<_>>().await?;
125+
let expected = vec![
126+
"+----------+",
127+
"| count(*) |",
128+
"+----------+",
129+
"| 1 |",
130+
"+----------+",
131+
];
132+
common_datablocks::assert_blocks_sorted_eq(expected, result.as_slice());
133+
}
134+
118135
Ok(())
119136
}

src/query/service/tests/it/storages/system/databases_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_query::storages::ToReadDataSourcePlan;
2020
use futures::TryStreamExt;
2121

2222
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
23-
async fn test_tables_table() -> Result<()> {
23+
async fn test_databases_table() -> Result<()> {
2424
let (_guard, ctx) = crate::tests::create_query_context().await?;
2525
let table = DatabasesTable::create(1);
2626
let source_plan = table.read_plan(ctx.clone(), None).await?;

0 commit comments

Comments (0)