Skip to content

Commit 36a2496

Browse files
committed
Workaround for opendal's bug
Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent 1343c60 commit 36a2496

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

src/common/storage/src/operator.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ pub fn init_azblob_operator(cfg: &StorageAzblobConfig) -> Result<Operator> {
7676
builder.account_name(&cfg.account_name);
7777
builder.account_key(&cfg.account_key);
7878

79-
Ok(Operator::new(builder.build()?))
79+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
8080
}
8181

8282
/// init_fs_operator will init a opendal fs operator.
@@ -89,7 +89,7 @@ pub fn init_fs_operator(cfg: &StorageFsConfig) -> Result<Operator> {
8989
}
9090
builder.root(&path);
9191

92-
Ok(Operator::new(builder.build()?))
92+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
9393
}
9494

9595
/// init_gcs_operator will init a opendal gcs operator.
@@ -103,7 +103,7 @@ pub fn init_gcs_operator(cfg: &StorageGcsConfig) -> Result<Operator> {
103103
.credential(&cfg.credential)
104104
.build()?;
105105

106-
Ok(Operator::new(accessor))
106+
Ok(Operator::new(accessor).layer(LoggingLayer))
107107
}
108108

109109
/// init_hdfs_operator will init an opendal hdfs operator.
@@ -119,7 +119,7 @@ pub fn init_hdfs_operator(cfg: &super::StorageHdfsConfig) -> Result<Operator> {
119119
// Root
120120
builder.root(&cfg.root);
121121

122-
Ok(Operator::new(builder.build()?))
122+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
123123
}
124124

125125
pub fn init_http_operator(cfg: &StorageHttpConfig) -> Result<Operator> {
@@ -141,14 +141,16 @@ pub fn init_http_operator(cfg: &StorageHttpConfig) -> Result<Operator> {
141141
immutable_layer.insert(i);
142142
}
143143

144-
Ok(Operator::new(builder.build()?).layer(immutable_layer))
144+
Ok(Operator::new(builder.build()?)
145+
.layer(LoggingLayer)
146+
.layer(immutable_layer))
145147
}
146148

147149
/// init_memory_operator will init a opendal memory operator.
148150
pub fn init_memory_operator() -> Result<Operator> {
149151
let mut builder = memory::Builder::default();
150152

151-
Ok(Operator::new(builder.build()?))
153+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
152154
}
153155

154156
/// init_s3_operator will init a opendal s3 operator with input s3 config.
@@ -181,7 +183,7 @@ pub fn init_s3_operator(cfg: &StorageS3Config) -> Result<Operator> {
181183
builder.enable_virtual_host_style();
182184
}
183185

184-
Ok(Operator::new(builder.build()?))
186+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
185187
}
186188

187189
/// init_obs_operator will init a opendal obs operator with input obs config.
@@ -197,7 +199,7 @@ pub fn init_obs_operator(cfg: &StorageObsConfig) -> Result<Operator> {
197199
builder.access_key_id(&cfg.access_key_id);
198200
builder.secret_access_key(&cfg.secret_access_key);
199201

200-
Ok(Operator::new(builder.build()?))
202+
Ok(Operator::new(builder.build()?).layer(LoggingLayer))
201203
}
202204

203205
pub struct StorageOperator;

src/query/service/src/interpreters/interpreter_copy_v2.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashSet;
12
// Copyright 2022 Datafuse Labs.
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -87,18 +88,20 @@ impl CopyInterpreterV2 {
8788
} else {
8889
let rename_me: Arc<dyn TableContext> = self.ctx.clone();
8990
let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
90-
let mut list = vec![];
91+
// TODO: Workaround for OpenDAL's bug: https://github.com/datafuselabs/opendal/issues/670
92+
// Should be removed after OpenDAL fixes.
93+
let mut list = HashSet::new();
9194

9295
// TODO: we could rewrite into try_collect.
9396
let mut objects = op.batch().walk_top_down(path)?;
9497
while let Some(de) = objects.try_next().await? {
9598
if de.mode().is_dir() {
9699
continue;
97100
}
98-
list.push(de.path().to_string());
101+
list.insert(de.path().to_string());
99102
}
100103

101-
list
104+
list.into_iter().collect::<Vec<_>>()
102105
};
103106

104107
tracing::info!("listed files: {:?}", &files_with_path);

0 commit comments

Comments
 (0)