Skip to content

Commit 5c0babe

Browse files
committed
refactor: Always list from OpenDAL instead of meta
Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent af3d36f commit 5c0babe

File tree

10 files changed

+2
-407
lines changed

10 files changed

+2
-407
lines changed

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use common_exception::ErrorCode;
2424
use common_exception::Result;
2525
use common_meta_types::GrantObject;
2626
use common_meta_types::StageFile;
27-
use common_meta_types::StageType;
2827
use common_meta_types::UserStageInfo;
2928
use futures::TryStreamExt;
3029
use regex::Regex;
@@ -138,10 +137,7 @@ pub async fn list_files(
138137
path: &str,
139138
pattern: &str,
140139
) -> Result<Vec<StageFile>> {
141-
match stage.stage_type {
142-
StageType::Internal => list_files_from_meta_api(ctx, stage, path, pattern).await,
143-
StageType::External => list_files_from_dal(ctx, stage, path, pattern).await,
144-
}
140+
list_files_from_dal(ctx, stage, path, pattern).await
145141
}
146142

147143
/// List files from DAL in recursive way.
@@ -224,64 +220,3 @@ pub async fn list_files_from_dal(
224220
.collect::<Vec<StageFile>>();
225221
Ok(matched_files)
226222
}
227-
228-
pub async fn list_files_from_meta_api(
229-
ctx: &Arc<QueryContext>,
230-
stage: &UserStageInfo,
231-
path: &str,
232-
pattern: &str,
233-
) -> Result<Vec<StageFile>> {
234-
let tenant = ctx.get_tenant();
235-
let user_mgr = ctx.get_user_manager();
236-
let prefix = stage.get_prefix();
237-
238-
if stage.number_of_files == 0 {
239-
// try to sync files from dal
240-
if let Ok(files) = list_files_from_dal(ctx, stage, &prefix, "").await {
241-
for file in files.iter() {
242-
let mut file = file.clone();
243-
// In internal stage, files with `/stage/<stage_name>/` prefix will be ignored.
244-
// TODO: prefix of internal stage should be as root path.
245-
file.path = file
246-
.path
247-
.trim_start_matches(&prefix.trim_start_matches('/'))
248-
.to_string();
249-
let _ = user_mgr.add_file(&tenant, &stage.stage_name, file).await;
250-
}
251-
}
252-
}
253-
254-
let regex = if !pattern.is_empty() {
255-
Some(Regex::new(pattern).map_err(|e| {
256-
ErrorCode::SyntaxException(format!(
257-
"Pattern format invalid, got:{}, error:{:?}",
258-
pattern, e
259-
))
260-
})?)
261-
} else {
262-
None
263-
};
264-
265-
let files = user_mgr
266-
.list_files(&tenant, &stage.stage_name)
267-
.await?
268-
.iter()
269-
.filter(|file| {
270-
let name = format!("{}{}", prefix, file.path);
271-
if path.ends_with('/') {
272-
name.starts_with(path)
273-
} else {
274-
name.starts_with(&format!("{path}/")) || name == path
275-
}
276-
})
277-
.filter(|file| {
278-
if let Some(regex) = &regex {
279-
regex.is_match(&file.path)
280-
} else {
281-
true
282-
}
283-
})
284-
.cloned()
285-
.collect::<Vec<_>>();
286-
Ok(files)
287-
}

src/query/service/src/interpreters/interpreter_user_stage_remove.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use std::sync::Arc;
1616

1717
use common_exception::Result;
18-
use common_meta_types::StageType;
1918
use common_planners::RemoveUserStagePlan;
2019
use common_streams::DataBlockStream;
2120
use common_streams::SendableDataBlockStream;
@@ -48,20 +47,12 @@ impl Interpreter for RemoveUserStageInterpreter {
4847
#[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
4948
async fn execute(&self) -> Result<SendableDataBlockStream> {
5049
let plan = self.plan.clone();
51-
let user_mgr = self.ctx.get_user_manager();
52-
let tenant = self.ctx.get_tenant();
5350

5451
let files = list_files(&self.ctx, &plan.stage, &plan.path, &plan.pattern).await?;
55-
let files = files.iter().map(|f| f.path.clone()).collect::<Vec<_>>();
5652
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
5753
let op = StageSourceHelper::get_op(&rename_me, &self.plan.stage).await?;
58-
if plan.stage.stage_type == StageType::Internal {
59-
user_mgr
60-
.remove_files(&tenant, &plan.stage.stage_name, files.clone())
61-
.await?;
62-
}
6354

64-
for name in files.into_iter() {
55+
for name in files.iter().map(|f| f.path.as_str()) {
6556
let obj = format!("{}/{}", plan.stage.get_prefix(), name);
6657
debug!("Removing object: {}", obj);
6758
let _ = op.object(&obj).delete().await;

src/query/service/src/interpreters/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ pub use interpreter_cluster_key_drop::DropTableClusterKeyInterpreter;
109109
pub use interpreter_common::append2table;
110110
pub use interpreter_common::commit2table;
111111
pub use interpreter_common::list_files_from_dal;
112-
pub use interpreter_common::list_files_from_meta_api;
113112
pub use interpreter_database_create::CreateDatabaseInterpreter;
114113
pub use interpreter_database_drop::DropDatabaseInterpreter;
115114
pub use interpreter_database_rename::RenameDatabaseInterpreter;

src/query/service/src/procedures/systems/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@ mod clustering_information;
1616
mod fuse_segment;
1717
mod fuse_snapshot;
1818
mod search_tables;
19-
mod sync_stage;
2019
mod system;
2120

2221
pub use clustering_information::ClusteringInformationProcedure;
2322
pub use fuse_segment::FuseSegmentProcedure;
2423
pub use fuse_snapshot::FuseSnapshotProcedure;
2524
pub use search_tables::SearchTablesProcedure;
26-
pub use sync_stage::SyncStageFileProcedure;
2725
pub use system::SystemProcedure;

src/query/service/src/procedures/systems/sync_stage.rs

Lines changed: 0 additions & 79 deletions
This file was deleted.

src/query/service/src/procedures/systems/system.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use super::SyncStageFileProcedure;
1615
use crate::procedures::systems::ClusteringInformationProcedure;
1716
use crate::procedures::systems::FuseSegmentProcedure;
1817
use crate::procedures::systems::FuseSnapshotProcedure;
@@ -39,9 +38,5 @@ impl SystemProcedure {
3938
"system$search_tables",
4039
Box::new(SearchTablesProcedure::try_create),
4140
);
42-
factory.register(
43-
"system$sync_stage_file",
44-
Box::new(SyncStageFileProcedure::try_create),
45-
)
4641
}
4742
}

src/query/service/src/servers/http/v1/stage.rs

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414

1515
use std::sync::Arc;
1616

17-
use chrono::DateTime;
18-
use chrono::TimeZone;
19-
use common_datavalues::chrono::Utc;
20-
use common_meta_types::StageFile;
21-
use common_meta_types::StageType;
2217
use poem::error::InternalServerError;
2318
use poem::error::Result as PoemResult;
2419
use poem::http::StatusCode;
@@ -85,12 +80,6 @@ pub async fn upload_to_stage(
8580

8681
let prefix = stage.get_prefix();
8782

88-
let tenant = context.get_tenant();
89-
let user = context
90-
.get_current_user()
91-
.map_err(InternalServerError)?
92-
.identity();
93-
9483
let mut files = vec![];
9584
while let Ok(Some(field)) = multipart.next_field().await {
9685
let name = match field.file_name() {
@@ -108,26 +97,6 @@ pub async fn upload_to_stage(
10897
.await
10998
.map_err(InternalServerError)?;
11099

111-
if stage.stage_type == StageType::Internal {
112-
let meta = op
113-
.object(&obj)
114-
.metadata()
115-
.await
116-
.map_err(InternalServerError)?;
117-
let file = StageFile {
118-
path: file_path,
119-
size: meta.content_length(),
120-
md5: meta.content_md5().map(str::to_string),
121-
last_modified: meta.last_modified().map_or(DateTime::default(), |t| {
122-
Utc.timestamp(t.unix_timestamp(), 0)
123-
}),
124-
creator: Some(user.clone()),
125-
};
126-
user_mgr
127-
.add_file(&tenant, &stage.stage_name, file)
128-
.await
129-
.map_err(InternalServerError)?;
130-
}
131100
files.push(name.clone());
132101
}
133102

0 commit comments

Comments
 (0)