Skip to content

Commit dbaf2e5

Browse files
authored
Merge pull request #7547 from Xuanwo/remove-meta-index
refactor: Always list from OpenDAL instead of meta
2 parents: 70cef24 + ed55dce · commit dbaf2e5

File tree

14 files changed: +17 additions, -460 deletions

src/meta/types/src/user_stage.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -258,7 +258,7 @@ impl UserStageInfo {
258258
}
259259
}
260260

261-
#[derive(Default, Clone)]
261+
#[derive(Default, Debug, Clone)]
262262
pub struct StageFile {
263263
pub path: String,
264264
pub size: u64,

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 9 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,10 @@ use common_exception::ErrorCode;
2424
use common_exception::Result;
2525
use common_meta_types::GrantObject;
2626
use common_meta_types::StageFile;
27-
use common_meta_types::StageType;
2827
use common_meta_types::UserStageInfo;
2928
use futures::TryStreamExt;
3029
use regex::Regex;
30+
use tracing::debug;
3131
use tracing::warn;
3232

3333
use crate::pipelines::executor::ExecutorSettings;
@@ -138,10 +138,7 @@ pub async fn list_files(
138138
path: &str,
139139
pattern: &str,
140140
) -> Result<Vec<StageFile>> {
141-
match stage.stage_type {
142-
StageType::Internal => list_files_from_meta_api(ctx, stage, path, pattern).await,
143-
StageType::External => list_files_from_dal(ctx, stage, path, pattern).await,
144-
}
141+
list_files_from_dal(ctx, stage, path, pattern).await
145142
}
146143

147144
/// List files from DAL in recursive way.
@@ -155,8 +152,9 @@ pub async fn list_files_from_dal(
155152
path: &str,
156153
pattern: &str,
157154
) -> Result<Vec<StageFile>> {
158-
let rename_me_qry_ctx: Arc<dyn TableContext> = ctx.clone();
159-
let op = StageSourceHelper::get_op(&rename_me_qry_ctx, stage).await?;
155+
let table_ctx: Arc<dyn TableContext> = ctx.clone();
156+
let op = StageSourceHelper::get_op(&table_ctx, stage).await?;
157+
let prefix = stage.get_prefix();
160158
let mut files = Vec::new();
161159

162160
// - If the path itself is a dir, return directly.
@@ -165,7 +163,7 @@ pub async fn list_files_from_dal(
165163
let dir_path = match op.object(path).metadata().await {
166164
Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
167165
Ok(meta) if !meta.mode().is_dir() => {
168-
files.push((path.to_string(), meta));
166+
files.push((path.trim_start_matches(&prefix).to_string(), meta));
169167

170168
Some(format!("{path}/"))
171169
}
@@ -181,7 +179,7 @@ pub async fn list_files_from_dal(
181179
let mut ds = op.batch().walk_top_down(&dir)?;
182180
while let Some(de) = ds.try_next().await? {
183181
if de.mode().is_file() {
184-
let path = de.path().to_string();
182+
let path = de.path().trim_start_matches(&prefix[1..]).to_string();
185183
let meta = de.metadata().await?;
186184
files.push((path, meta));
187185
}
@@ -222,66 +220,7 @@ pub async fn list_files_from_dal(
222220
creator: None,
223221
})
224222
.collect::<Vec<StageFile>>();
225-
Ok(matched_files)
226-
}
227-
228-
pub async fn list_files_from_meta_api(
229-
ctx: &Arc<QueryContext>,
230-
stage: &UserStageInfo,
231-
path: &str,
232-
pattern: &str,
233-
) -> Result<Vec<StageFile>> {
234-
let tenant = ctx.get_tenant();
235-
let user_mgr = ctx.get_user_manager();
236-
let prefix = stage.get_prefix();
237-
238-
if stage.number_of_files == 0 {
239-
// try to sync files from dal
240-
if let Ok(files) = list_files_from_dal(ctx, stage, &prefix, "").await {
241-
for file in files.iter() {
242-
let mut file = file.clone();
243-
// In internal stage, files with `/stage/<stage_name>/` prefix will be ignored.
244-
// TODO: prefix of internal stage should be as root path.
245-
file.path = file
246-
.path
247-
.trim_start_matches(&prefix.trim_start_matches('/'))
248-
.to_string();
249-
let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await;
250-
}
251-
}
252-
}
253-
254-
let regex = if !pattern.is_empty() {
255-
Some(Regex::new(pattern).map_err(|e| {
256-
ErrorCode::SyntaxException(format!(
257-
"Pattern format invalid, got:{}, error:{:?}",
258-
pattern, e
259-
))
260-
})?)
261-
} else {
262-
None
263-
};
264223

265-
let files = user_mgr
266-
.list_files(&tenant, &stage.stage_name)
267-
.await?
268-
.iter()
269-
.filter(|file| {
270-
let name = format!("{}{}", prefix, file.path);
271-
if path.ends_with('/') {
272-
name.starts_with(path)
273-
} else {
274-
name.starts_with(&format!("{path}/")) || name == path
275-
}
276-
})
277-
.filter(|file| {
278-
if let Some(regex) = &regex {
279-
regex.is_match(&file.path)
280-
} else {
281-
true
282-
}
283-
})
284-
.cloned()
285-
.collect::<Vec<_>>();
286-
Ok(files)
224+
debug!("listed files: {:?}", matched_files);
225+
Ok(matched_files)
287226
}

src/query/service/src/interpreters/interpreter_user_stage_remove.rs

Lines changed: 5 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -15,11 +15,9 @@
1515
use std::sync::Arc;
1616

1717
use common_exception::Result;
18-
use common_meta_types::StageType;
1918
use common_planners::RemoveUserStagePlan;
2019
use common_streams::DataBlockStream;
2120
use common_streams::SendableDataBlockStream;
22-
use tracing::debug;
2321

2422
use crate::interpreters::interpreter_common::list_files;
2523
use crate::interpreters::Interpreter;
@@ -48,23 +46,14 @@ impl Interpreter for RemoveUserStageInterpreter {
4846
#[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
4947
async fn execute(&self) -> Result<SendableDataBlockStream> {
5048
let plan = self.plan.clone();
51-
let user_mgr = self.ctx.get_user_manager();
52-
let tenant = self.ctx.get_tenant();
49+
let prefix = plan.stage.get_prefix();
5350

5451
let files = list_files(&self.ctx, &plan.stage, &plan.path, &plan.pattern).await?;
55-
let files = files.iter().map(|f| f.path.clone()).collect::<Vec<_>>();
56-
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
57-
let op = StageSourceHelper::get_op(&rename_me, &self.plan.stage).await?;
58-
if plan.stage.stage_type == StageType::Internal {
59-
user_mgr
60-
.remove_files(&tenant, &plan.stage.stage_name, files.clone())
61-
.await?;
62-
}
52+
let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
53+
let op = StageSourceHelper::get_op(&table_ctx, &self.plan.stage).await?;
6354

64-
for name in files.into_iter() {
65-
let obj = format!("{}/{}", plan.stage.get_prefix(), name);
66-
debug!("Removing object: {}", obj);
67-
let _ = op.object(&obj).delete().await;
55+
for name in files.iter().map(|f| f.path.as_str()) {
56+
op.object(&format!("{prefix}{name}")).delete().await?;
6857
}
6958

7059
Ok(Box::pin(DataBlockStream::create(

src/query/service/src/interpreters/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,6 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
109109
pub use interpreter_common::append2table;
110110
pub use interpreter_common::commit2table;
111111
pub use interpreter_common::list_files_from_dal;
112-
pub use interpreter_common::list_files_from_meta_api;
113112
pub use interpreter_database_create::CreateDatabaseInterpreter;
114113
pub use interpreter_database_drop::DropDatabaseInterpreter;
115114
pub use interpreter_database_rename::RenameDatabaseInterpreter;

src/query/service/src/procedures/systems/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,12 +16,10 @@ mod clustering_information;
1616
mod fuse_segment;
1717
mod fuse_snapshot;
1818
mod search_tables;
19-
mod sync_stage;
2019
mod system;
2120

2221
pub use clustering_information::ClusteringInformationProcedure;
2322
pub use fuse_segment::FuseSegmentProcedure;
2423
pub use fuse_snapshot::FuseSnapshotProcedure;
2524
pub use search_tables::SearchTablesProcedure;
26-
pub use sync_stage::SyncStageFileProcedure;
2725
pub use system::SystemProcedure;

src/query/service/src/procedures/systems/sync_stage.rs

Lines changed: 0 additions & 79 deletions
This file was deleted.

src/query/service/src/procedures/systems/system.rs

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use super::SyncStageFileProcedure;
1615
use crate::procedures::systems::ClusteringInformationProcedure;
1716
use crate::procedures::systems::FuseSegmentProcedure;
1817
use crate::procedures::systems::FuseSnapshotProcedure;
@@ -39,9 +38,5 @@ impl SystemProcedure {
3938
"system$search_tables",
4039
Box::new(SearchTablesProcedure::try_create),
4140
);
42-
factory.register(
43-
"system$sync_stage_file",
44-
Box::new(SyncStageFileProcedure::try_create),
45-
)
4641
}
4742
}

src/query/service/src/servers/http/v1/stage.rs

Lines changed: 0 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -14,11 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use chrono::DateTime;
18-
use chrono::TimeZone;
19-
use common_datavalues::chrono::Utc;
20-
use common_meta_types::StageFile;
21-
use common_meta_types::StageType;
2217
use poem::error::InternalServerError;
2318
use poem::error::Result as PoemResult;
2419
use poem::http::StatusCode;
@@ -85,12 +80,6 @@ pub async fn upload_to_stage(
8580

8681
let prefix = stage.get_prefix();
8782

88-
let tenant = context.get_tenant();
89-
let user = context
90-
.get_current_user()
91-
.map_err(InternalServerError)?
92-
.identity();
93-
9483
let mut files = vec![];
9584
while let Ok(Some(field)) = multipart.next_field().await {
9685
let name = match field.file_name() {
@@ -108,26 +97,6 @@ pub async fn upload_to_stage(
10897
.await
10998
.map_err(InternalServerError)?;
11099

111-
if stage.stage_type == StageType::Internal {
112-
let meta = op
113-
.object(&obj)
114-
.metadata()
115-
.await
116-
.map_err(InternalServerError)?;
117-
let file = StageFile {
118-
path: file_path,
119-
size: meta.content_length(),
120-
md5: meta.content_md5().map(str::to_string),
121-
last_modified: meta.last_modified().map_or(DateTime::default(), |t| {
122-
Utc.timestamp(t.unix_timestamp(), 0)
123-
}),
124-
creator: Some(user.clone()),
125-
};
126-
user_mgr
127-
.add_file(&tenant, &stage.stage_name, file)
128-
.await
129-
.map_err(InternalServerError)?;
130-
}
131100
files.push(name.clone());
132101
}
133102

src/query/service/src/storages/stage/stage_source.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,8 @@ impl StageSourceHelper {
116116
))
117117
}
118118

119+
/// TODO: we should support constructing the operator with the
120+
/// correct root.
119121
pub async fn get_op(ctx: &Arc<dyn TableContext>, stage: &UserStageInfo) -> Result<Operator> {
120122
if stage.stage_type == StageType::Internal {
121123
ctx.get_storage_operator()

0 commit comments

Comments
 (0)