Skip to content

Commit 13c648c

Browse files
committed
add fuse_block table function
1 parent bb42c39 commit 13c648c

File tree

9 files changed

+415
-0
lines changed

9 files changed

+415
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_datablocks::DataBlock;
18+
use common_datavalues::DataSchema;
19+
use common_exception::Result;
20+
21+
use crate::procedures::OneBlockProcedure;
22+
use crate::procedures::Procedure;
23+
use crate::procedures::ProcedureFeatures;
24+
use crate::sessions::QueryContext;
25+
use crate::sessions::TableContext;
26+
use crate::storages::fuse::table_functions::FuseBlock;
27+
use crate::storages::fuse::FuseTable;
28+
29+
pub struct FuseBlockProcedure {}
30+
31+
impl FuseBlockProcedure {
32+
pub fn try_create() -> Result<Box<dyn Procedure>> {
33+
Ok(FuseBlockProcedure {}.into_procedure())
34+
}
35+
}
36+
37+
#[async_trait::async_trait]
38+
impl OneBlockProcedure for FuseBlockProcedure {
39+
fn name(&self) -> &str {
40+
"FUSE_BLOCK"
41+
}
42+
43+
fn features(&self) -> ProcedureFeatures {
44+
ProcedureFeatures::default().num_arguments(3)
45+
}
46+
47+
async fn all_data(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
48+
let database_name = args[0].clone();
49+
let table_name = args[1].clone();
50+
let snapshot_id = args[2].clone();
51+
let tenant_id = ctx.get_tenant();
52+
let tbl = ctx
53+
.get_catalog(&ctx.get_current_catalog())?
54+
.get_table(
55+
tenant_id.as_str(),
56+
database_name.as_str(),
57+
table_name.as_str(),
58+
)
59+
.await?;
60+
61+
let tbl = FuseTable::try_from_table(tbl.as_ref())?;
62+
63+
Ok(FuseBlock::new(ctx, tbl, snapshot_id).get_blocks().await?)
64+
}
65+
66+
fn schema(&self) -> Arc<DataSchema> {
67+
FuseBlock::schema()
68+
}
69+
}

src/query/service/src/procedures/systems/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
// limitations under the License.
1414

1515
mod clustering_information;
16+
mod fuse_block;
1617
mod fuse_segment;
1718
mod fuse_snapshot;
1819
mod search_tables;
1920
mod system;
2021

2122
pub use clustering_information::ClusteringInformationProcedure;
23+
pub use fuse_block::FuseBlockProcedure;
2224
pub use fuse_segment::FuseSegmentProcedure;
2325
pub use fuse_snapshot::FuseSnapshotProcedure;
2426
pub use search_tables::SearchTablesProcedure;

src/query/service/src/procedures/systems/system.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use crate::procedures::systems::ClusteringInformationProcedure;
16+
use crate::procedures::systems::FuseBlockProcedure;
1617
use crate::procedures::systems::FuseSegmentProcedure;
1718
use crate::procedures::systems::FuseSnapshotProcedure;
1819
use crate::procedures::systems::SearchTablesProcedure;
@@ -34,6 +35,10 @@ impl SystemProcedure {
3435
"system$fuse_segment",
3536
Box::new(FuseSegmentProcedure::try_create),
3637
);
38+
factory.register(
39+
"system$fuse_block",
40+
Box::new(FuseBlockProcedure::try_create),
41+
);
3742
factory.register(
3843
"system$search_tables",
3944
Box::new(SearchTablesProcedure::try_create),

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use parking_lot::RwLock;
2424
use crate::catalogs::SYS_TBL_FUC_ID_END;
2525
use crate::catalogs::SYS_TBL_FUNC_ID_BEGIN;
2626
use crate::storages::fuse::table_functions::ClusteringInformationTable;
27+
use crate::storages::fuse::table_functions::FuseBlockTable;
2728
use crate::storages::fuse::table_functions::FuseSegmentTable;
2829
use crate::storages::fuse::table_functions::FuseSnapshotTable;
2930
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
@@ -105,6 +106,10 @@ impl TableFunctionFactory {
105106
"fuse_segment".to_string(),
106107
(next_id(), Arc::new(FuseSegmentTable::create)),
107108
);
109+
creators.insert(
110+
"fuse_block".to_string(),
111+
(next_id(), Arc::new(FuseBlockTable::create)),
112+
);
108113

109114
creators.insert(
110115
"clustering_information".to_string(),
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_datablocks::DataBlock;
18+
use common_datavalues::prelude::*;
19+
use common_exception::Result;
20+
use futures_util::TryStreamExt;
21+
22+
use crate::io::MetaReaders;
23+
use crate::io::SnapshotHistoryReader;
24+
use crate::sessions::TableContext;
25+
use crate::FuseTable;
26+
27+
pub struct FuseBlock<'a> {
28+
pub ctx: Arc<dyn TableContext>,
29+
pub table: &'a FuseTable,
30+
pub snapshot_id: String,
31+
}
32+
33+
impl<'a> FuseBlock<'a> {
34+
pub fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable, snapshot_id: String) -> Self {
35+
Self {
36+
ctx,
37+
table,
38+
snapshot_id,
39+
}
40+
}
41+
42+
pub async fn get_blocks(&self) -> Result<DataBlock> {
43+
let tbl = self.table;
44+
let maybe_snapshot = tbl.read_table_snapshot(self.ctx.clone()).await?;
45+
if let Some(snapshot) = maybe_snapshot {
46+
// prepare the stream of snapshot
47+
let snapshot_version = tbl.snapshot_format_version();
48+
let snapshot_location = tbl
49+
.meta_location_generator
50+
.snapshot_location_from_uuid(&snapshot.snapshot_id, snapshot_version)?;
51+
let reader = MetaReaders::table_snapshot_reader(self.ctx.clone());
52+
let mut snapshot_stream = reader.snapshot_history(
53+
snapshot_location,
54+
snapshot_version,
55+
tbl.meta_location_generator().clone(),
56+
);
57+
58+
// find the element by snapshot_id in stream
59+
while let Some(snapshot) = snapshot_stream.try_next().await? {
60+
if snapshot.snapshot_id.simple().to_string() == self.snapshot_id {
61+
let len = snapshot.summary.block_count as usize;
62+
let snapshot_id = vec![self.snapshot_id.clone().into_bytes()];
63+
let timestamp =
64+
vec![snapshot.timestamp.map(|dt| (dt.timestamp_micros()) as i64)];
65+
let mut block_location: Vec<Vec<u8>> = Vec::with_capacity(len);
66+
let mut block_size: Vec<u64> = Vec::with_capacity(len);
67+
let mut bloom_filter_location: Vec<Option<Vec<u8>>> = Vec::with_capacity(len);
68+
let mut bloom_filter_size: Vec<u64> = Vec::with_capacity(len);
69+
70+
let reader = MetaReaders::segment_info_reader(self.ctx.as_ref());
71+
for (x, ver) in &snapshot.segments {
72+
let segment = reader.read(x, None, *ver).await?;
73+
segment.blocks.clone().into_iter().for_each(|block| {
74+
block_location.push(block.location.0.into_bytes());
75+
block_size.push(block.block_size);
76+
bloom_filter_location.push(
77+
block
78+
.bloom_filter_index_location
79+
.map(|(s, _)| s.into_bytes()),
80+
);
81+
bloom_filter_size.push(block.bloom_filter_index_size);
82+
});
83+
}
84+
85+
return Ok(DataBlock::create(FuseBlock::schema(), vec![
86+
Arc::new(ConstColumn::new(Series::from_data(snapshot_id), len)),
87+
Arc::new(ConstColumn::new(Series::from_data(timestamp), len)),
88+
Series::from_data(block_location),
89+
Series::from_data(block_size),
90+
Series::from_data(bloom_filter_location),
91+
Series::from_data(bloom_filter_size),
92+
]));
93+
}
94+
}
95+
}
96+
97+
Ok(DataBlock::empty_with_schema(Self::schema()))
98+
}
99+
100+
pub fn schema() -> Arc<DataSchema> {
101+
DataSchemaRefExt::create(vec![
102+
DataField::new("snapshot_id", Vu8::to_data_type()),
103+
DataField::new_nullable("timestamp", TimestampType::new_impl(6)),
104+
DataField::new("block_location", Vu8::to_data_type()),
105+
DataField::new("block_size", u64::to_data_type()),
106+
DataField::new_nullable("bloom_filter_location", Vu8::to_data_type()),
107+
DataField::new("bloom_filter_size", u64::to_data_type()),
108+
])
109+
}
110+
}

0 commit comments

Comments
 (0)