Skip to content

Commit e0f2485

Browse files
authored
Merge pull request #9108 from dantengsky/fix-orphan-snapshots
fix: collect orphan snapshots
2 parents dd0d2ef + 88700eb commit e0f2485

File tree

22 files changed

+858
-381
lines changed

22 files changed

+858
-381
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ pub trait Table: Sync + Send {
194194
Ok(())
195195
}
196196

197-
async fn optimize(&self, ctx: Arc<dyn TableContext>, keep_last_snapshot: bool) -> Result<()> {
197+
async fn purge(&self, ctx: Arc<dyn TableContext>, keep_last_snapshot: bool) -> Result<()> {
198198
let (_, _) = (ctx, keep_last_snapshot);
199199

200200
Ok(())

src/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl Interpreter for OptimizeTableInterpreter {
113113
}
114114

115115
if do_purge {
116-
table.optimize(self.ctx.clone(), true).await?;
116+
table.purge(self.ctx.clone(), true).await?;
117117
}
118118

119119
Ok(PipelineBuildResult::create())
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
use common_ast::ast::Engine;
17+
use common_base::base::tokio;
18+
use common_sql::plans::AlterTableClusterKeyPlan;
19+
use common_sql::plans::CreateTablePlanV2;
20+
use common_sql::plans::DropTableClusterKeyPlan;
21+
use common_storages_fuse::io::MetaReaders;
22+
use common_storages_fuse::FuseTable;
23+
use common_storages_table_meta::table::OPT_KEY_DATABASE_ID;
24+
use common_storages_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
25+
use databend_query::interpreters::AlterTableClusterKeyInterpreter;
26+
use databend_query::interpreters::CreateTableInterpreterV2;
27+
use databend_query::interpreters::DropTableClusterKeyInterpreter;
28+
use databend_query::interpreters::Interpreter;
29+
30+
use crate::storages::fuse::table_test_fixture::TestFixture;
31+
32+
#[tokio::test]
33+
async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
34+
let fixture = TestFixture::new().await;
35+
let ctx = fixture.ctx();
36+
37+
let create_table_plan = CreateTablePlanV2 {
38+
if_not_exists: false,
39+
tenant: fixture.default_tenant(),
40+
catalog: fixture.default_catalog_name(),
41+
database: fixture.default_db_name(),
42+
table: fixture.default_table_name(),
43+
schema: TestFixture::default_schema(),
44+
engine: Engine::Fuse,
45+
storage_params: None,
46+
options: [
47+
// database id is required for FUSE
48+
(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()),
49+
]
50+
.into(),
51+
field_default_exprs: vec![],
52+
field_comments: vec![],
53+
as_select: None,
54+
cluster_key: None,
55+
};
56+
57+
// create test table
58+
let interpreter = CreateTableInterpreterV2::try_create(ctx.clone(), create_table_plan)?;
59+
interpreter.execute(ctx.clone()).await?;
60+
61+
// add cluster key
62+
let alter_table_cluster_key_plan = AlterTableClusterKeyPlan {
63+
tenant: fixture.default_tenant(),
64+
catalog: fixture.default_catalog_name(),
65+
database: fixture.default_db_name(),
66+
table: fixture.default_table_name(),
67+
cluster_keys: vec!["id".to_string()],
68+
};
69+
let interpreter =
70+
AlterTableClusterKeyInterpreter::try_create(ctx.clone(), alter_table_cluster_key_plan)?;
71+
interpreter.execute(ctx.clone()).await?;
72+
73+
let table = fixture.latest_default_table().await?;
74+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
75+
let table_info = table.get_table_info();
76+
assert_eq!(table_info.meta.cluster_keys, vec!["(id)".to_string()]);
77+
assert_eq!(table_info.meta.default_cluster_key_id, Some(0));
78+
79+
let snapshot_loc = table
80+
.get_table_info()
81+
.options()
82+
.get(OPT_KEY_SNAPSHOT_LOCATION)
83+
.unwrap();
84+
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
85+
let snapshot = reader.read(snapshot_loc.as_str(), None, 1).await?;
86+
let expected = Some((0, "(id)".to_string()));
87+
assert_eq!(snapshot.cluster_key_meta, expected);
88+
89+
// drop cluster key
90+
let drop_table_cluster_key_plan = DropTableClusterKeyPlan {
91+
tenant: fixture.default_tenant(),
92+
catalog: fixture.default_catalog_name(),
93+
database: fixture.default_db_name(),
94+
table: fixture.default_table_name(),
95+
};
96+
let interpreter =
97+
DropTableClusterKeyInterpreter::try_create(ctx.clone(), drop_table_cluster_key_plan)?;
98+
interpreter.execute(ctx.clone()).await?;
99+
100+
let table = fixture.latest_default_table().await?;
101+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
102+
let table_info = table.get_table_info();
103+
assert_eq!(table_info.meta.default_cluster_key, None);
104+
assert_eq!(table_info.meta.default_cluster_key_id, None);
105+
106+
let snapshot_loc = table
107+
.get_table_info()
108+
.options()
109+
.get(OPT_KEY_SNAPSHOT_LOCATION)
110+
.unwrap();
111+
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
112+
let snapshot = reader.read(snapshot_loc.as_str(), None, 1).await?;
113+
let expected = None;
114+
assert_eq!(snapshot.cluster_key_meta, expected);
115+
116+
Ok(())
117+
}

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,6 @@ async fn test_abort_on_error() -> Result<()> {
232232
)) {
233233
let entry = entry.unwrap();
234234
if entry.file_type().is_file() {
235-
eprintln!("{:?}", entry);
236235
ss_count += 1;
237236
}
238237
}

0 commit comments

Comments
 (0)