Skip to content

Commit a8e526e

Browse files
committed
fix review comment
1 parent de7bcdf commit a8e526e

File tree

11 files changed

+216
-66
lines changed

11 files changed

+216
-66
lines changed

docs/doc/30-reference/20-functions/111-system-functions/fuse_block.md

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Returns the block information of a snapshot of table.
77
## Syntax
88

99
```sql
10-
FUSE_BLOCK('<database_name>', '<table_name>','<snapshot_id>')
10+
FUSE_BLOCK('<database_name>', '<table_name>'[, '<snapshot_id>'])
1111
```
1212

1313
## Examples
@@ -17,17 +17,7 @@ CREATE TABLE mytable(c int);
1717
INSERT INTO mytable values(1);
1818
INSERT INTO mytable values(2);
1919

20-
-- Obtain a snapshot ID
21-
SELECT snapshot_id FROM FUSE_SNAPSHOT('default', 'mytable') limit 1;
22-
23-
---
24-
+----------------------------------+
25-
| snapshot_id |
26-
+----------------------------------+
27-
| 51e84b56458f44269b05a059b364a659 |
28-
+----------------------------------+
29-
30-
SELECT * FROM FUSE_BLOCK('default', 'mytable', '51e84b56458f44269b05a059b364a659');
20+
SELECT * FROM FUSE_BLOCK('default', 'mytable');
3121

3222
---
3323
+----------------------------------+----------------------------+----------------------------------------------------+------------+----------------------------------------------------+-------------------+

src/query/service/src/procedures/systems/fuse_block.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,17 @@ impl OneBlockProcedure for FuseBlockProcedure {
4141
}
4242

4343
fn features(&self) -> ProcedureFeatures {
44-
ProcedureFeatures::default().num_arguments(3)
44+
ProcedureFeatures::default().variadic_arguments(2, 3)
4545
}
4646

4747
async fn all_data(&self, ctx: Arc<QueryContext>, args: Vec<String>) -> Result<DataBlock> {
4848
let database_name = args[0].clone();
4949
let table_name = args[1].clone();
50-
let snapshot_id = args[2].clone();
50+
let snapshot_id = if args.len() > 2 {
51+
Some(args[2].clone())
52+
} else {
53+
None
54+
};
5155
let tenant_id = ctx.get_tenant();
5256
let tbl = ctx
5357
.get_catalog(&ctx.get_current_catalog())?

src/query/service/tests/it/interpreters/interpreter_call.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,13 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
123123
assert_eq!(executor.name(), "CallInterpreter");
124124
let res = executor.execute().await;
125125
assert_eq!(res.is_err(), true);
126-
let expect = "Code: 1028, displayText = Function `FUSE_BLOCK` expect to have 3 arguments, but got 0.";
126+
let expect = "Code: 1028, displayText = Function `FUSE_BLOCK` expect to have [2, 3] arguments, but got 0.";
127127
assert_eq!(expect, res.err().unwrap().to_string());
128128
}
129129

130130
// UnknownTable
131131
{
132-
let query = "call system$fuse_block(default, test, xxxx)";
132+
let query = "call system$fuse_block(default, test)";
133133
let (plan, _, _) = planner.plan_sql(query).await?;
134134
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
135135
assert_eq!(executor.name(), "CallInterpreter");
@@ -143,7 +143,7 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
143143

144144
// BadArguments
145145
{
146-
let query = "call system$fuse_block(system, tables, xxxx)";
146+
let query = "call system$fuse_block(system, tables)";
147147
let (plan, _, _) = planner.plan_sql(query).await?;
148148
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
149149
assert_eq!(executor.name(), "CallInterpreter");
@@ -167,7 +167,7 @@ async fn test_call_fuse_block_interpreter() -> Result<()> {
167167

168168
// fuse_block
169169
{
170-
let query = "call system$fuse_block(default, a, xxxx)";
170+
let query = "call system$fuse_block(default, a)";
171171
let (plan, _, _) = planner.plan_sql(query).await?;
172172
let executor = InterpreterFactoryV2::get(ctx.clone(), &plan)?;
173173
let _ = executor.execute().await?;

src/query/service/tests/it/storages/fuse/table_functions/clustering_information_table.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_query::interpreters::CreateTableInterpreterV2;
2222
use databend_query::interpreters::Interpreter;
2323
use tokio_stream::StreamExt;
2424

25-
use crate::storages::fuse::table_test_fixture::TestFixture;
2625
use crate::storages::fuse::table_test_fixture::*;
2726

2827
#[tokio::test]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_base::base::tokio;
16+
use common_datablocks::DataBlock;
17+
use common_exception::ErrorCode;
18+
use common_exception::Result;
19+
use databend_query::interpreters::CreateTableInterpreterV2;
20+
use databend_query::interpreters::Interpreter;
21+
use tokio_stream::StreamExt;
22+
23+
use crate::storages::fuse::table_test_fixture::*;
24+
25+
#[tokio::test]
26+
async fn test_fuse_block_table() -> Result<()> {
27+
let fixture = TestFixture::new().await;
28+
let db = fixture.default_db_name();
29+
let tbl = fixture.default_table_name();
30+
let ctx = fixture.ctx();
31+
32+
// test db & table
33+
let create_table_plan = fixture.default_crate_table_plan();
34+
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
35+
interpreter.execute().await?;
36+
37+
{
38+
let expected = vec![
39+
"+-------+",
40+
"| count |",
41+
"+-------+",
42+
"| 0 |",
43+
"+-------+",
44+
];
45+
let qry = format!(
46+
"select count(1) as count from fuse_block('{}', '{}')",
47+
db, tbl
48+
);
49+
50+
expects_ok(
51+
"count_should_be_0",
52+
execute_query(ctx.clone(), qry.as_str()).await,
53+
expected,
54+
)
55+
.await?;
56+
}
57+
58+
{
59+
let qry = format!("insert into {}.{} values(1, (2, 3)),(2, (4, 6))", db, tbl);
60+
execute_query(ctx.clone(), qry.as_str()).await?;
61+
let qry = format!("insert into {}.{} values(7, (8, 9))", db, tbl);
62+
execute_query(ctx.clone(), qry.as_str()).await?;
63+
let expected = vec![
64+
"+-------+",
65+
"| count |",
66+
"+-------+",
67+
"| 2 |",
68+
"+-------+",
69+
];
70+
71+
let qry = format!(
72+
"select count(1) as count from fuse_block('{}', '{}')",
73+
db, tbl
74+
);
75+
76+
expects_ok(
77+
"count_should_be_2",
78+
execute_query(ctx.clone(), qry.as_str()).await,
79+
expected,
80+
)
81+
.await?;
82+
}
83+
84+
{
85+
// incompatible table engine
86+
let qry = format!("create table {}.in_mem (a int) engine =Memory", db);
87+
execute_query(ctx.clone(), qry.as_str()).await?;
88+
89+
let qry = format!("select * from fuse_block('{}', '{}')", db, "in_mem");
90+
let output_stream = execute_query(ctx.clone(), qry.as_str()).await?;
91+
expects_err(
92+
"unsupported_table_engine",
93+
ErrorCode::logical_error_code(),
94+
output_stream.collect::<Result<Vec<DataBlock>>>().await,
95+
);
96+
}
97+
98+
Ok(())
99+
}

src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use databend_query::interpreters::CreateTableInterpreterV2;
2222
use databend_query::interpreters::Interpreter;
2323
use tokio_stream::StreamExt;
2424

25-
use crate::storages::fuse::table_test_fixture::TestFixture;
2625
use crate::storages::fuse::table_test_fixture::*;
2726

2827
#[tokio::test]
@@ -182,7 +181,7 @@ async fn test_fuse_snapshot_table_read() -> Result<()> {
182181
let qry = format!("select * from fuse_snapshot('{}', '{}')", db, "in_mem");
183182
let output_stream = execute_query(ctx.clone(), qry.as_str()).await?;
184183
expects_err(
185-
"check_row_and_block_count_after_append",
184+
"unsupported_table_engine",
186185
ErrorCode::logical_error_code(),
187186
output_stream.collect::<Result<Vec<DataBlock>>>().await,
188187
);

src/query/service/tests/it/storages/fuse/table_functions/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
mod clustering_information_table;
16+
mod fuse_block_table;
1617
mod fuse_snapshot_table;

src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block.rs

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use common_datablocks::DataBlock;
1818
use common_datavalues::prelude::*;
1919
use common_exception::Result;
20+
use common_fuse_meta::meta::TableSnapshot;
2021
use futures_util::TryStreamExt;
2122

2223
use crate::io::MetaReaders;
@@ -27,11 +28,15 @@ use crate::FuseTable;
2728
pub struct FuseBlock<'a> {
2829
pub ctx: Arc<dyn TableContext>,
2930
pub table: &'a FuseTable,
30-
pub snapshot_id: String,
31+
pub snapshot_id: Option<String>,
3132
}
3233

3334
impl<'a> FuseBlock<'a> {
34-
pub fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable, snapshot_id: String) -> Self {
35+
pub fn new(
36+
ctx: Arc<dyn TableContext>,
37+
table: &'a FuseTable,
38+
snapshot_id: Option<String>,
39+
) -> Self {
3540
Self {
3641
ctx,
3742
table,
@@ -43,6 +48,10 @@ impl<'a> FuseBlock<'a> {
4348
let tbl = self.table;
4449
let maybe_snapshot = tbl.read_table_snapshot(self.ctx.clone()).await?;
4550
if let Some(snapshot) = maybe_snapshot {
51+
if self.snapshot_id.is_none() {
52+
return self.to_block(snapshot).await;
53+
}
54+
4655
// prepare the stream of snapshot
4756
let snapshot_version = tbl.snapshot_format_version();
4857
let snapshot_location = tbl
@@ -57,46 +66,49 @@ impl<'a> FuseBlock<'a> {
5766

5867
// find the element by snapshot_id in stream
5968
while let Some(snapshot) = snapshot_stream.try_next().await? {
60-
if snapshot.snapshot_id.simple().to_string() == self.snapshot_id {
61-
let len = snapshot.summary.block_count as usize;
62-
let snapshot_id = vec![self.snapshot_id.clone().into_bytes()];
63-
let timestamp =
64-
vec![snapshot.timestamp.map(|dt| (dt.timestamp_micros()) as i64)];
65-
let mut block_location: Vec<Vec<u8>> = Vec::with_capacity(len);
66-
let mut block_size: Vec<u64> = Vec::with_capacity(len);
67-
let mut bloom_filter_location: Vec<Option<Vec<u8>>> = Vec::with_capacity(len);
68-
let mut bloom_filter_size: Vec<u64> = Vec::with_capacity(len);
69-
70-
let reader = MetaReaders::segment_info_reader(self.ctx.as_ref());
71-
for (x, ver) in &snapshot.segments {
72-
let segment = reader.read(x, None, *ver).await?;
73-
segment.blocks.clone().into_iter().for_each(|block| {
74-
block_location.push(block.location.0.into_bytes());
75-
block_size.push(block.block_size);
76-
bloom_filter_location.push(
77-
block
78-
.bloom_filter_index_location
79-
.map(|(s, _)| s.into_bytes()),
80-
);
81-
bloom_filter_size.push(block.bloom_filter_index_size);
82-
});
83-
}
84-
85-
return Ok(DataBlock::create(FuseBlock::schema(), vec![
86-
Arc::new(ConstColumn::new(Series::from_data(snapshot_id), len)),
87-
Arc::new(ConstColumn::new(Series::from_data(timestamp), len)),
88-
Series::from_data(block_location),
89-
Series::from_data(block_size),
90-
Series::from_data(bloom_filter_location),
91-
Series::from_data(bloom_filter_size),
92-
]));
69+
if snapshot.snapshot_id.simple().to_string() == self.snapshot_id.clone().unwrap() {
70+
return self.to_block(snapshot).await;
9371
}
9472
}
9573
}
9674

9775
Ok(DataBlock::empty_with_schema(Self::schema()))
9876
}
9977

78+
async fn to_block(&self, snapshot: Arc<TableSnapshot>) -> Result<DataBlock> {
79+
let len = snapshot.summary.block_count as usize;
80+
let snapshot_id = vec![snapshot.snapshot_id.simple().to_string().into_bytes()];
81+
let timestamp = vec![snapshot.timestamp.map(|dt| (dt.timestamp_micros()) as i64)];
82+
let mut block_location: Vec<Vec<u8>> = Vec::with_capacity(len);
83+
let mut block_size: Vec<u64> = Vec::with_capacity(len);
84+
let mut bloom_filter_location: Vec<Option<Vec<u8>>> = Vec::with_capacity(len);
85+
let mut bloom_filter_size: Vec<u64> = Vec::with_capacity(len);
86+
87+
let reader = MetaReaders::segment_info_reader(self.ctx.as_ref());
88+
for (x, ver) in &snapshot.segments {
89+
let segment = reader.read(x, None, *ver).await?;
90+
segment.blocks.clone().into_iter().for_each(|block| {
91+
block_location.push(block.location.0.into_bytes());
92+
block_size.push(block.block_size);
93+
bloom_filter_location.push(
94+
block
95+
.bloom_filter_index_location
96+
.map(|(s, _)| s.into_bytes()),
97+
);
98+
bloom_filter_size.push(block.bloom_filter_index_size);
99+
});
100+
}
101+
102+
Ok(DataBlock::create(FuseBlock::schema(), vec![
103+
Arc::new(ConstColumn::new(Series::from_data(snapshot_id), len)),
104+
Arc::new(ConstColumn::new(Series::from_data(timestamp), len)),
105+
Series::from_data(block_location),
106+
Series::from_data(block_size),
107+
Series::from_data(bloom_filter_location),
108+
Series::from_data(bloom_filter_size),
109+
]))
110+
}
111+
100112
pub fn schema() -> Arc<DataSchema> {
101113
DataSchemaRefExt::create(vec![
102114
DataField::new("snapshot_id", Vu8::to_data_type()),

0 commit comments

Comments
 (0)