Skip to content

Commit ff0805f

Browse files
committed
Fix test
Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 9918909 commit ff0805f

File tree

6 files changed

+13
-52
lines changed

6 files changed

+13
-52
lines changed

src/meta/types/src/user_stage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl UserStageInfo {
258258
}
259259
}
260260

261-
#[derive(Default, Clone)]
261+
#[derive(Default, Debug, Clone)]
262262
pub struct StageFile {
263263
pub path: String,
264264
pub size: u64,

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use common_meta_types::StageFile;
2727
use common_meta_types::UserStageInfo;
2828
use futures::TryStreamExt;
2929
use regex::Regex;
30+
use tracing::debug;
3031
use tracing::warn;
3132

3233
use crate::pipelines::executor::ExecutorSettings;
@@ -151,8 +152,8 @@ pub async fn list_files_from_dal(
151152
path: &str,
152153
pattern: &str,
153154
) -> Result<Vec<StageFile>> {
154-
let rename_me_qry_ctx: Arc<dyn TableContext> = ctx.clone();
155-
let op = StageSourceHelper::get_op(&rename_me_qry_ctx, stage).await?;
155+
let table_ctx: Arc<dyn TableContext> = ctx.clone();
156+
let op = StageSourceHelper::get_op(&table_ctx, stage).await?;
156157
let prefix = stage.get_prefix();
157158
let mut files = Vec::new();
158159

@@ -162,7 +163,7 @@ pub async fn list_files_from_dal(
162163
let dir_path = match op.object(path).metadata().await {
163164
Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
164165
Ok(meta) if !meta.mode().is_dir() => {
165-
files.push((path.to_string(), meta));
166+
files.push((path.trim_start_matches(&prefix).to_string(), meta));
166167

167168
Some(format!("{path}/"))
168169
}
@@ -219,5 +220,7 @@ pub async fn list_files_from_dal(
219220
creator: None,
220221
})
221222
.collect::<Vec<StageFile>>();
223+
224+
debug!("listed files: {:?}", matched_files);
222225
Ok(matched_files)
223226
}

src/query/service/src/interpreters/interpreter_user_stage_remove.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use common_exception::Result;
1818
use common_planners::RemoveUserStagePlan;
1919
use common_streams::DataBlockStream;
2020
use common_streams::SendableDataBlockStream;
21-
use tracing::debug;
2221

2322
use crate::interpreters::interpreter_common::list_files;
2423
use crate::interpreters::Interpreter;
@@ -47,15 +46,14 @@ impl Interpreter for RemoveUserStageInterpreter {
4746
#[tracing::instrument(level = "info", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
4847
async fn execute(&self) -> Result<SendableDataBlockStream> {
4948
let plan = self.plan.clone();
49+
let prefix = plan.stage.get_prefix();
5050

5151
let files = list_files(&self.ctx, &plan.stage, &plan.path, &plan.pattern).await?;
52-
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
53-
let op = StageSourceHelper::get_op(&rename_me, &self.plan.stage).await?;
52+
let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
53+
let op = StageSourceHelper::get_op(&table_ctx, &self.plan.stage).await?;
5454

5555
for name in files.iter().map(|f| f.path.as_str()) {
56-
let obj = format!("{}/{}", plan.stage.get_prefix(), name);
57-
debug!("Removing object: {}", obj);
58-
let _ = op.object(&obj).delete().await;
56+
op.object(&format!("{prefix}{name}")).delete().await?;
5957
}
6058

6159
Ok(Box::pin(DataBlockStream::create(

src/query/service/src/storages/stage/stage_source.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ impl StageSourceHelper {
116116
))
117117
}
118118

119+
/// TODO: we should support construct operator with
120+
/// correct root.
119121
pub async fn get_op(ctx: &Arc<dyn TableContext>, stage: &UserStageInfo) -> Result<Operator> {
120122
if stage.stage_type == StageType::Internal {
121123
ctx.get_storage_operator()

tests/suites/1_stateful/01_load_v2/01_0003_sync_stage_file.result

Lines changed: 0 additions & 10 deletions
This file was deleted.

tests/suites/1_stateful/01_load_v2/01_0003_sync_stage_file.sh

Lines changed: 0 additions & 32 deletions
This file was deleted.

0 commit comments

Comments
 (0)