Skip to content

Commit c8b06a5

Browse files
authored
Merge branch 'main' into sieve
2 parents 09a10e6 + aa7b8fd commit c8b06a5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+613
-405
lines changed

src/query/ee/src/test_kits/context.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ use common_exception::Result;
1717
use common_license::license::LicenseInfo;
1818
use common_meta_app::storage::StorageFsConfig;
1919
use common_meta_app::storage::StorageParams;
20-
use databend_query::test_kits::ConfigBuilder;
21-
use databend_query::test_kits::Setup;
20+
use databend_query::test_kits::*;
2221
use jwt_simple::algorithms::ECDSAP256KeyPairLike;
2322
use jwt_simple::prelude::Claims;
2423
use jwt_simple::prelude::Duration;

src/query/ee/tests/it/aggregating_index/index_refresh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use common_sql::plans::Plan;
3434
use common_sql::Planner;
3535
use databend_query::interpreters::InterpreterFactory;
3636
use databend_query::sessions::QueryContext;
37-
use databend_query::test_kits::TestFixture;
37+
use databend_query::test_kits::*;
3838
use enterprise_query::test_kits::context::EESetup;
3939
use futures_util::TryStreamExt;
4040

src/query/ee/tests/it/aggregating_index/index_scan.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ use common_sql::plans::RelOperator;
2828
use common_sql::Planner;
2929
use databend_query::interpreters::InterpreterFactory;
3030
use databend_query::sessions::QueryContext;
31-
use databend_query::test_kits::fixture::expects_ok;
32-
use databend_query::test_kits::TestFixture;
31+
use databend_query::test_kits::*;
3332
use enterprise_query::test_kits::context::EESetup;
3433
use futures_util::TryStreamExt;
3534

src/query/ee/tests/it/license/license_mgr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use common_base::base::tokio;
1616
use common_license::license::Feature;
1717
use common_license::license::LicenseInfo;
1818
use common_license::license_manager::LicenseManager;
19-
use databend_query::test_kits::TestFixture;
19+
use databend_query::test_kits::*;
2020
use enterprise_query::license::RealLicenseManager;
2121
use jwt_simple::algorithms::ES256KeyPair;
2222
use jwt_simple::claims::Claims;

src/query/ee/tests/it/storages/fuse/operations/computed_columns.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414

1515
use common_base::base::tokio;
1616
use common_exception::Result;
17-
use databend_query::test_kits::fixture::expects_ok;
18-
use databend_query::test_kits::fixture::TestFixture;
17+
use databend_query::test_kits::*;
1918
use enterprise_query::test_kits::context::EESetup;
2019
use futures::TryStreamExt;
2120

src/query/ee/tests/it/storages/fuse/operations/vacuum.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414

1515
use common_base::base::tokio;
1616
use common_exception::Result;
17-
use databend_query::test_kits::fixture::append_sample_data;
18-
use databend_query::test_kits::fixture::check_data_dir;
19-
use databend_query::test_kits::fixture::TestFixture;
17+
use databend_query::test_kits::*;
2018
use enterprise_query::storages::fuse::do_vacuum_drop_tables;
2119

2220
#[tokio::test(flavor = "multi_thread")]

src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ use common_exception::Result;
1717
use common_storages_fuse::io::MetaReaders;
1818
use common_storages_fuse::io::TableMetaLocationGenerator;
1919
use common_storages_fuse::FuseTable;
20-
use databend_query::test_kits::fixture::append_variant_sample_data;
21-
use databend_query::test_kits::fixture::TestFixture;
20+
use databend_query::test_kits::*;
2221
use enterprise_query::storages::fuse::operations::virtual_columns::do_refresh_virtual_column;
2322
use storages_common_cache::LoadParams;
2423

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::str;
16+
17+
use common_config::GlobalConfig;
18+
use common_exception::Result;
19+
use common_expression::block_debug::assert_blocks_sorted_eq_with_name;
20+
use common_expression::DataBlock;
21+
use common_expression::SendableDataBlockStream;
22+
use common_meta_app::storage::StorageParams;
23+
use common_storages_fuse::FuseTable;
24+
use common_storages_fuse::FUSE_TBL_BLOCK_PREFIX;
25+
use common_storages_fuse::FUSE_TBL_LAST_SNAPSHOT_HINT;
26+
use common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
27+
use common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
28+
use common_storages_fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
29+
use common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
30+
use futures::TryStreamExt;
31+
use walkdir::WalkDir;
32+
33+
use crate::sessions::TableContext;
34+
use crate::test_kits::TestFixture;
35+
36+
pub fn expects_err<T>(case_name: &str, err_code: u16, res: Result<T>) {
37+
if let Err(err) = res {
38+
assert_eq!(
39+
err.code(),
40+
err_code,
41+
"case name {}, unexpected error: {}",
42+
case_name,
43+
err
44+
);
45+
} else {
46+
panic!(
47+
"case name {}, expecting err code {}, but got ok",
48+
case_name, err_code,
49+
);
50+
}
51+
}
52+
53+
pub async fn expects_ok(
54+
case_name: impl AsRef<str>,
55+
res: Result<SendableDataBlockStream>,
56+
expected: Vec<&str>,
57+
) -> Result<()> {
58+
match res {
59+
Ok(stream) => {
60+
let blocks: Vec<DataBlock> = stream.try_collect().await?;
61+
assert_blocks_sorted_eq_with_name(case_name.as_ref(), expected, &blocks)
62+
}
63+
Err(err) => {
64+
panic!(
65+
"case name {}, expecting Ok, but got err {}",
66+
case_name.as_ref(),
67+
err,
68+
)
69+
}
70+
};
71+
Ok(())
72+
}
73+
74+
pub async fn check_data_dir(
75+
fixture: &TestFixture,
76+
case_name: &str,
77+
snapshot_count: u32,
78+
table_statistic_count: u32,
79+
segment_count: u32,
80+
block_count: u32,
81+
index_count: u32,
82+
check_last_snapshot: Option<()>,
83+
check_table_statistic_file: Option<()>,
84+
) -> Result<()> {
85+
let data_path = match &GlobalConfig::instance().storage.params {
86+
StorageParams::Fs(v) => v.root.clone(),
87+
_ => panic!("storage type is not fs"),
88+
};
89+
let root = data_path.as_str();
90+
let mut ss_count = 0;
91+
let mut ts_count = 0;
92+
let mut sg_count = 0;
93+
let mut b_count = 0;
94+
let mut i_count = 0;
95+
let mut last_snapshot_loc = "".to_string();
96+
let mut table_statistic_files = vec![];
97+
let prefix_snapshot = FUSE_TBL_SNAPSHOT_PREFIX;
98+
let prefix_snapshot_statistics = FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
99+
let prefix_segment = FUSE_TBL_SEGMENT_PREFIX;
100+
let prefix_block = FUSE_TBL_BLOCK_PREFIX;
101+
let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
102+
let prefix_last_snapshot_hint = FUSE_TBL_LAST_SNAPSHOT_HINT;
103+
for entry in WalkDir::new(root) {
104+
let entry = entry.unwrap();
105+
if entry.file_type().is_file() {
106+
let (_, entry_path) = entry.path().to_str().unwrap().split_at(root.len());
107+
// trim the leading prefix, e.g. "/db_id/table_id/"
108+
let path = entry_path.split('/').skip(3).collect::<Vec<_>>();
109+
let path = path[0];
110+
if path.starts_with(prefix_snapshot) {
111+
ss_count += 1;
112+
} else if path.starts_with(prefix_segment) {
113+
sg_count += 1;
114+
} else if path.starts_with(prefix_block) {
115+
b_count += 1;
116+
} else if path.starts_with(prefix_index) {
117+
i_count += 1;
118+
} else if path.starts_with(prefix_snapshot_statistics) {
119+
ts_count += 1;
120+
table_statistic_files.push(entry_path.to_string());
121+
} else if path.starts_with(prefix_last_snapshot_hint) && check_last_snapshot.is_some() {
122+
let content = fixture
123+
.default_ctx
124+
.get_data_operator()?
125+
.operator()
126+
.read(entry_path)
127+
.await?;
128+
last_snapshot_loc = str::from_utf8(&content)?.to_string();
129+
}
130+
}
131+
}
132+
133+
assert_eq!(
134+
ss_count, snapshot_count,
135+
"case [{}], check snapshot count",
136+
case_name
137+
);
138+
assert_eq!(
139+
ts_count, table_statistic_count,
140+
"case [{}], check snapshot statistics count",
141+
case_name
142+
);
143+
assert_eq!(
144+
sg_count, segment_count,
145+
"case [{}], check segment count",
146+
case_name
147+
);
148+
149+
assert_eq!(
150+
b_count, block_count,
151+
"case [{}], check block count",
152+
case_name
153+
);
154+
155+
assert_eq!(
156+
i_count, index_count,
157+
"case [{}], check index count",
158+
case_name
159+
);
160+
161+
if check_last_snapshot.is_some() {
162+
let table = fixture.latest_default_table().await?;
163+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
164+
let snapshot_loc = fuse_table.snapshot_loc().await?;
165+
let snapshot_loc = snapshot_loc.unwrap();
166+
assert!(last_snapshot_loc.contains(&snapshot_loc));
167+
assert_eq!(
168+
last_snapshot_loc.find(&snapshot_loc),
169+
Some(last_snapshot_loc.len() - snapshot_loc.len())
170+
);
171+
}
172+
173+
if check_table_statistic_file.is_some() {
174+
let table = fixture.latest_default_table().await?;
175+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
176+
let snapshot_opt = fuse_table.read_table_snapshot().await?;
177+
assert!(snapshot_opt.is_some());
178+
let snapshot = snapshot_opt.unwrap();
179+
let ts_location_opt = snapshot.table_statistics_location.clone();
180+
assert!(ts_location_opt.is_some());
181+
let ts_location = ts_location_opt.unwrap();
182+
println!(
183+
"ts_location_opt: {:?}, table_statistic_files: {:?}",
184+
ts_location, table_statistic_files
185+
);
186+
assert!(
187+
table_statistic_files
188+
.iter()
189+
.any(|e| e.contains(&ts_location))
190+
);
191+
}
192+
193+
Ok(())
194+
}
195+
196+
pub async fn history_should_have_item(
197+
fixture: &TestFixture,
198+
case_name: &str,
199+
item_cnt: u32,
200+
) -> Result<()> {
201+
// check history
202+
let db = fixture.default_db_name();
203+
let tbl = fixture.default_table_name();
204+
let count_str = format!("| {} |", item_cnt);
205+
let expected = vec![
206+
"+----------+",
207+
"| Column 0 |",
208+
"+----------+",
209+
count_str.as_str(),
210+
"+----------+",
211+
];
212+
213+
let qry = format!(
214+
"select count(*) as count from fuse_snapshot('{}', '{}')",
215+
db, tbl
216+
);
217+
218+
expects_ok(
219+
format!("{}: count_of_history_item_should_be_1", case_name),
220+
fixture.execute_query(qry.as_str()).await,
221+
expected,
222+
)
223+
.await
224+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_config::DATABEND_COMMIT_VERSION;
18+
use common_meta_types::NodeInfo;
19+
20+
/// A descriptor for a cluster.
21+
#[derive(Clone)]
22+
pub struct ClusterDescriptor {
23+
pub(crate) local_node_id: String,
24+
pub(crate) cluster_nodes_list: Vec<Arc<NodeInfo>>,
25+
}
26+
27+
impl ClusterDescriptor {
28+
pub fn new() -> ClusterDescriptor {
29+
ClusterDescriptor {
30+
local_node_id: String::from(""),
31+
cluster_nodes_list: vec![],
32+
}
33+
}
34+
35+
pub fn with_node(self, id: impl Into<String>, addr: impl Into<String>) -> ClusterDescriptor {
36+
let mut new_nodes = self.cluster_nodes_list.clone();
37+
new_nodes.push(Arc::new(NodeInfo::create(
38+
id.into(),
39+
0,
40+
addr.into(),
41+
DATABEND_COMMIT_VERSION.to_string(),
42+
)));
43+
ClusterDescriptor {
44+
cluster_nodes_list: new_nodes,
45+
local_node_id: self.local_node_id,
46+
}
47+
}
48+
49+
pub fn with_local_id(self, id: impl Into<String>) -> ClusterDescriptor {
50+
ClusterDescriptor {
51+
local_node_id: id.into(),
52+
cluster_nodes_list: self.cluster_nodes_list,
53+
}
54+
}
55+
}
56+
57+
impl Default for ClusterDescriptor {
58+
fn default() -> Self {
59+
Self::new()
60+
}
61+
}

0 commit comments

Comments
 (0)