Skip to content

Commit 3c2993e

Browse files
authored
Merge pull request #7631 from zhyass/feature_fix
feat(storage): get all parquet file list for fuse engine
2 parents 4d92219 + ce01285 commit 3c2993e

File tree

17 files changed

+688
-6
lines changed

17 files changed

+688
-6
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
title: FUSE_BLOCK
3+
---
4+
5+
Returns the block information of a snapshot of table.
6+
7+
## Syntax
8+
9+
```sql
10+
FUSE_BLOCK('<database_name>', '<table_name>'[, '<snapshot_id>'])
11+
```
12+
13+
## Examples
14+
15+
```sql
16+
CREATE TABLE mytable(c int);
17+
INSERT INTO mytable values(1);
18+
INSERT INTO mytable values(2);
19+
20+
SELECT * FROM FUSE_BLOCK('default', 'mytable');
21+
22+
---
23+
+----------------------------------+----------------------------+----------------------------------------------------+------------+----------------------------------------------------+-------------------+
24+
| snapshot_id | timestamp | block_location | block_size | bloom_filter_location | bloom_filter_size |
25+
+----------------------------------+----------------------------+----------------------------------------------------+------------+----------------------------------------------------+-------------------+
26+
| 51e84b56458f44269b05a059b364a659 | 2022-09-15 07:14:14.137268 | 1/7/_b/39a6dbbfd9b44ad5a8ec8ab264c93cf5_v0.parquet | 4 | 1/7/_i/39a6dbbfd9b44ad5a8ec8ab264c93cf5_v1.parquet | 221 |
27+
| 51e84b56458f44269b05a059b364a659 | 2022-09-15 07:14:14.137268 | 1/7/_b/d0ee9688c4d24d6da86acd8b0d6f4fad_v0.parquet | 4 | 1/7/_i/d0ee9688c4d24d6da86acd8b0d6f4fad_v1.parquet | 219 |
28+
+----------------------------------+----------------------------+----------------------------------------------------+------------+----------------------------------------------------+-------------------+
29+
```
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_datablocks::DataBlock;
18+
use common_datavalues::DataSchema;
19+
use common_exception::Result;
20+
21+
use crate::procedures::OneBlockProcedure;
22+
use crate::procedures::Procedure;
23+
use crate::procedures::ProcedureFeatures;
24+
use crate::sessions::QueryContext;
25+
use crate::sessions::TableContext;
26+
use crate::storages::fuse::table_functions::FuseBlock;
27+
use crate::storages::fuse::FuseTable;
28+
29+
pub struct FuseBlockProcedure {}
30+
31+
impl FuseBlockProcedure {
32+
pub fn try_create() -> Result<Box<dyn Procedure>> {
33+
Ok(FuseBlockProcedure {}.into_procedure())
34+
}
35+
}
36+
37+
#[async_trait::async_trait]
38+
impl OneBlockProcedure for FuseBlockProcedure {
39+
fn name(&self) -> &str {
40+
"FUSE_BLOCK"
41+
}
42+
43+
fn features(&self) -> ProcedureFeatures {
44+
ProcedureFeatures::default().variadic_arguments(2, 3)
45+
}
46+
47+
async fn all_data(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
48+
let database_name = args[0].clone();
49+
let table_name = args[1].clone();
50+
let snapshot_id = if args.len() > 2 {
51+
Some(args[2].clone())
52+
} else {
53+
None
54+
};
55+
let tenant_id = ctx.get_tenant();
56+
let tbl = ctx
57+
.get_catalog(&ctx.get_current_catalog())?
58+
.get_table(
59+
tenant_id.as_str(),
60+
database_name.as_str(),
61+
table_name.as_str(),
62+
)
63+
.await?;
64+
65+
let tbl = FuseTable::try_from_table(tbl.as_ref())?;
66+
67+
Ok(FuseBlock::new(ctx, tbl, snapshot_id).get_blocks().await?)
68+
}
69+
70+
fn schema(&self) -> Arc<DataSchema> {
71+
FuseBlock::schema()
72+
}
73+
}

src/query/service/src/procedures/systems/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
// limitations under the License.
1414

1515
mod clustering_information;
16+
mod fuse_block;
1617
mod fuse_segment;
1718
mod fuse_snapshot;
1819
mod search_tables;
1920
mod system;
2021

2122
pub use clustering_information::ClusteringInformationProcedure;
23+
pub use fuse_block::FuseBlockProcedure;
2224
pub use fuse_segment::FuseSegmentProcedure;
2325
pub use fuse_snapshot::FuseSnapshotProcedure;
2426
pub use search_tables::SearchTablesProcedure;

src/query/service/src/procedures/systems/system.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use crate::procedures::systems::ClusteringInformationProcedure;
16+
use crate::procedures::systems::FuseBlockProcedure;
1617
use crate::procedures::systems::FuseSegmentProcedure;
1718
use crate::procedures::systems::FuseSnapshotProcedure;
1819
use crate::procedures::systems::SearchTablesProcedure;
@@ -34,6 +35,10 @@ impl SystemProcedure {
3435
"system$fuse_segment",
3536
Box::new(FuseSegmentProcedure::try_create),
3637
);
38+
factory.register(
39+
"system$fuse_block",
40+
Box::new(FuseBlockProcedure::try_create),
41+
);
3742
factory.register(
3843
"system$search_tables",
3944
Box::new(SearchTablesProcedure::try_create),

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use parking_lot::RwLock;
2424
use crate::catalogs::SYS_TBL_FUC_ID_END;
2525
use crate::catalogs::SYS_TBL_FUNC_ID_BEGIN;
2626
use crate::storages::fuse::table_functions::ClusteringInformationTable;
27+
use crate::storages::fuse::table_functions::FuseBlockTable;
2728
use crate::storages::fuse::table_functions::FuseSegmentTable;
2829
use crate::storages::fuse::table_functions::FuseSnapshotTable;
2930
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
@@ -105,6 +106,10 @@ impl TableFunctionFactory {
105106
"fuse_segment".to_string(),
106107
(next_id(), Arc::new(FuseSegmentTable::create)),
107108
);
109+
creators.insert(
110+
"fuse_block".to_string(),
111+
(next_id(), Arc::new(FuseBlockTable::create)),
112+
);
108113

109114
creators.insert(
110115
"clustering_information".to_string(),

src/query/service/tests/it/interpreters/interpreter_call.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,72 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> {
110110
Ok(())
111111
}
112112

113+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
114+
async fn test_call_fuse_block_interpreter() -> Result<()> {
115+
let (_guard, ctx) = crate::tests::create_query_context().await?;
116+
let mut planner = Planner::new(ctx.clone());
117+
118+
// NumberArgumentsNotMatch
119+
{
120+
let query = "call system$fuse_block()";
121+
let (plan, _, _) = planner.plan_sql(query).await?;
122+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
123+
assert_eq!(executor.name(), "CallInterpreter");
124+
let res = executor.execute().await;
125+
assert_eq!(res.is_err(), true);
126+
let expect = "Code: 1028, displayText = Function `FUSE_BLOCK` expect to have [2, 3] arguments, but got 0.";
127+
assert_eq!(expect, res.err().unwrap().to_string());
128+
}
129+
130+
// UnknownTable
131+
{
132+
let query = "call system$fuse_block(default, test)";
133+
let (plan, _, _) = planner.plan_sql(query).await?;
134+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
135+
assert_eq!(executor.name(), "CallInterpreter");
136+
let res = executor.execute().await;
137+
assert_eq!(res.is_err(), true);
138+
assert_eq!(
139+
res.err().unwrap().code(),
140+
ErrorCode::UnknownTable("").code()
141+
);
142+
}
143+
144+
// BadArguments
145+
{
146+
let query = "call system$fuse_block(system, tables)";
147+
let (plan, _, _) = planner.plan_sql(query).await?;
148+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
149+
assert_eq!(executor.name(), "CallInterpreter");
150+
let res = executor.execute().await;
151+
assert_eq!(res.is_err(), true);
152+
let expect =
153+
"Code: 1015, displayText = expects table of engine FUSE, but got SystemTables.";
154+
assert_eq!(expect, res.err().unwrap().to_string());
155+
}
156+
157+
// Create table
158+
{
159+
let query = "\
160+
CREATE TABLE default.a(a bigint)\
161+
";
162+
163+
let (plan, _, _) = planner.plan_sql(query).await?;
164+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
165+
let _ = executor.execute().await?;
166+
}
167+
168+
// fuse_block
169+
{
170+
let query = "call system$fuse_block(default, a)";
171+
let (plan, _, _) = planner.plan_sql(query).await?;
172+
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
173+
let _ = executor.execute().await?;
174+
}
175+
176+
Ok(())
177+
}
178+
113179
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
114180
async fn test_call_clustering_information_interpreter() -> Result<()> {
115181
let (_guard, ctx) = crate::tests::create_query_context().await?;
@@ -188,7 +254,7 @@ async fn test_call_clustering_information_interpreter() -> Result<()> {
188254
let _ = executor.execute().await?;
189255
}
190256

191-
// FuseHistory
257+
// clustering_information
192258
{
193259
let query = "call system$clustering_information(default, b)";
194260
let (plan, _, _) = planner.plan_sql(query).await?;

src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_query::interpreters::CreateTableInterpreterV2;
2222
use databend_query::interpreters::Interpreter;
2323
use tokio_stream::StreamExt;
2424

25-
use crate::storages::fuse::table_test_fixture::TestFixture;
2625
use crate::storages::fuse::table_test_fixture::*;
2726

2827
#[tokio::test]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_base::base::tokio;
16+
use common_datablocks::DataBlock;
17+
use common_exception::ErrorCode;
18+
use common_exception::Result;
19+
use databend_query::interpreters::CreateTableInterpreterV2;
20+
use databend_query::interpreters::Interpreter;
21+
use tokio_stream::StreamExt;
22+
23+
use crate::storages::fuse::table_test_fixture::*;
24+
25+
#[tokio::test]
26+
async fn test_fuse_block_table() -> Result<()> {
27+
let fixture = TestFixture::new().await;
28+
let db = fixture.default_db_name();
29+
let tbl = fixture.default_table_name();
30+
let ctx = fixture.ctx();
31+
32+
// test db & table
33+
let create_table_plan = fixture.default_crate_table_plan();
34+
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
35+
interpreter.execute().await?;
36+
37+
{
38+
let expected = vec![
39+
"+-------+",
40+
"| count |",
41+
"+-------+",
42+
"| 0 |",
43+
"+-------+",
44+
];
45+
let qry = format!(
46+
"select count(1) as count from fuse_block('{}', '{}')",
47+
db, tbl
48+
);
49+
50+
expects_ok(
51+
"count_should_be_0",
52+
execute_query(ctx.clone(), qry.as_str()).await,
53+
expected,
54+
)
55+
.await?;
56+
}
57+
58+
{
59+
let qry = format!("insert into {}.{} values(1, (2, 3)),(2, (4, 6))", db, tbl);
60+
execute_query(ctx.clone(), qry.as_str()).await?;
61+
let qry = format!("insert into {}.{} values(7, (8, 9))", db, tbl);
62+
execute_query(ctx.clone(), qry.as_str()).await?;
63+
let expected = vec![
64+
"+-------+",
65+
"| count |",
66+
"+-------+",
67+
"| 2 |",
68+
"+-------+",
69+
];
70+
71+
let qry = format!(
72+
"select count(1) as count from fuse_block('{}', '{}')",
73+
db, tbl
74+
);
75+
76+
expects_ok(
77+
"count_should_be_2",
78+
execute_query(ctx.clone(), qry.as_str()).await,
79+
expected,
80+
)
81+
.await?;
82+
}
83+
84+
{
85+
// incompatible table engine
86+
let qry = format!("create table {}.in_mem (a int) engine =Memory", db);
87+
execute_query(ctx.clone(), qry.as_str()).await?;
88+
89+
let qry = format!("select * from fuse_block('{}', '{}')", db, "in_mem");
90+
let output_stream = execute_query(ctx.clone(), qry.as_str()).await?;
91+
expects_err(
92+
"unsupported_table_engine",
93+
ErrorCode::logical_error_code(),
94+
output_stream.collect::<Result<Vec<DataBlock>>>().await,
95+
);
96+
}
97+
98+
Ok(())
99+
}

src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_query::interpreters::CreateTableInterpreterV2;
2222
use databend_query::interpreters::Interpreter;
2323
use tokio_stream::StreamExt;
2424

25-
use crate::storages::fuse::table_test_fixture::TestFixture;
2625
use crate::storages::fuse::table_test_fixture::*;
2726

2827
#[tokio::test]
@@ -182,7 +181,7 @@ async fn test_fuse_snapshot_table_read() -> Result<()> {
182181
let qry = format!("select * from fuse_snapshot('{}', '{}')", db, "in_mem");
183182
let output_stream = execute_query(ctx.clone(), qry.as_str()).await?;
184183
expects_err(
185-
"check_row_and_block_count_after_append",
184+
"unsupported_table_engine",
186185
ErrorCode::logical_error_code(),
187186
output_stream.collect::<Result<Vec<DataBlock>>>().await,
188187
);

src/query/service/tests/it/storages/fuse/table_functions/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
mod clustering_information_table;
16+
mod fuse_block_table;
1617
mod fuse_snapshot_table;

0 commit comments

Comments
 (0)