
Commit 6069064

tests: try make integrated test

Signed-off-by: ClSlaid <cailue@bupt.edu.cn>

1 parent 1fc9eac commit 6069064

File tree: 4 files changed, +56 -7 lines changed

src/common/storage/src/location.rs
Lines changed: 2 additions & 7 deletions
@@ -44,7 +44,7 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
     // Path endswith `/` means it's a directory, otherwise it's a file.
     // If the path is a directory, we will use this path as root.
     // If the path is a file, we will use `/` as root (which is the default value)
-    let (mut root, mut path) = if l.path.ends_with('/') {
+    let (root, path) = if l.path.ends_with('/') {
         (l.path.as_str(), "/")
     } else {
         ("/", l.path.as_str())
@@ -98,14 +98,9 @@ pub fn parse_uri_location(l: &UriLocation) -> Result<(StorageParams, String)> {
             root: root.to_string(),
         }),
         Scheme::Ipfs => {
-            if l.name.ends_with('/') {
-                root = l.name.as_str();
-            } else {
-                path = l.name.as_str();
-            }
            StorageParams::Ipfs(StorageIpfsConfig {
                 endpoint_url: STORAGE_IPFS_DEFAULT_ENDPOINT.to_string(),
-                root: "/ipfs/".to_string() + root,
+                root: "/ipfs/".to_string() + l.name.as_str(),
             })
         }
         Scheme::S3 => StorageParams::S3(StorageS3Config {
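For illustration, a minimal, self-contained sketch of what the simplified IPFS branch now produces, assuming `l.name` carries the CID taken from the URI's host part as the diff's use of `l.name` suggests. The helper name and sample CID below are hypothetical, not part of the crate:

// Sketch only: after this change the IPFS root is built directly from the
// location name (the CID), instead of the mutable root/path pair that was removed.
// `ipfs_root` and "QmExampleCid" are illustrative names, not the real API.
fn ipfs_root(name: &str) -> String {
    "/ipfs/".to_string() + name
}

fn main() {
    // e.g. a location like `ipfs://<CID>/ontime_200.csv` keeps the file in `path`,
    // while the storage root becomes `/ipfs/<CID>`.
    assert_eq!(ipfs_root("QmExampleCid"), "/ipfs/QmExampleCid");
}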

src/query/service/src/interpreters/interpreter_copy_v2.rs
Lines changed: 6 additions & 0 deletions
@@ -163,15 +163,18 @@ impl CopyInterpreterV2 {
         match &from.source_info {
             SourceInfo::StageSource(table_info) => {
                 let path = &table_info.path;
+                tracing::debug!("list files under path: {}", path);
                 // Here we add the path to the file: /path/to/path/file1.
                 let files_with_path = if !files.is_empty() {
+                    tracing::trace!("got files offered in list");
                     let mut files_with_path = vec![];
                     for file in files {
                         let new_path = Path::new(path).join(file);
                         files_with_path.push(new_path.to_string_lossy().to_string());
                     }
                     files_with_path
                 } else if !path.ends_with('/') {
+                    tracing::trace!("path not ends with '/'");
                     let rename_me: Arc<dyn TableContext> = self.ctx.clone();
                     let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;
                     if op.object(path).is_exist().await? {
@@ -180,6 +183,7 @@ impl CopyInterpreterV2 {
                         vec![]
                     }
                 } else {
+                    tracing::trace!("have to list directories in stage");
                     let rename_me: Arc<dyn TableContext> = self.ctx.clone();
                     let op = StageSourceHelper::get_op(&rename_me, &table_info.stage_info).await?;

@@ -191,8 +195,10 @@
                     let mut objects = op.batch().walk_top_down(path)?;
                     while let Some(de) = objects.try_next().await? {
                         if de.mode().is_dir() {
+                            tracing::trace!("listing in stage path:{} got directory: {}", path, de.path());
                             continue;
                         }
+                        tracing::trace!("listing in stage path:{} got file: {}", path, de.path());
                         list.insert(de.path().to_string());
                     }
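The added tracing follows the three ways the interpreter resolves the copy source: an explicit FILES list, a single object, or a directory listing. Below is a simplified, self-contained analogue of that decision, using the local filesystem as a stand-in for the stage operator; the names are illustrative and, unlike the real `walk_top_down`, this sketch does not recurse into sub-directories:

use std::collections::BTreeSet;
use std::fs;
use std::io;
use std::path::Path;

// Sketch of the listing decision: explicit FILES list, a single object,
// or a directory walk. The real interpreter goes through the stage operator;
// this stand-in only reads the local filesystem.
fn list_stage_files(path: &str, files: &[String]) -> io::Result<BTreeSet<String>> {
    let mut list = BTreeSet::new();
    if !files.is_empty() {
        // FILES given explicitly: join each name onto the stage path.
        for file in files {
            list.insert(Path::new(path).join(file).to_string_lossy().to_string());
        }
    } else if !path.ends_with('/') {
        // Path names a single object: keep it if it exists.
        if Path::new(path).exists() {
            list.insert(path.to_string());
        }
    } else {
        // Path is a directory: skip sub-directories, collect the files.
        for entry in fs::read_dir(path)? {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                continue;
            }
            list.insert(entry.path().to_string_lossy().to_string());
        }
    }
    Ok(list)
}

fn main() -> io::Result<()> {
    // List everything directly under the current directory.
    for f in list_stage_files("./", &[])? {
        println!("{}", f);
    }
    Ok(())
}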

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+199 2020.0 769
+199 2020.0 769
+199 2020.0 769
+199 2020.0 769
+199 2020.0 769
+398 2020.0 1538
+398 2020.0 1538
+398 2020.0 1538
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. "$CURDIR"/../../../shell_env.sh
+TABLE=ontime200
+FILE=ontime_200
+QMHASH=Qmei4dyyPazUy24cfCAtmumC1Ff1VLLiqjzF1zCXXVtvSk
+
+echo "drop table if exists ${TABLE};" | $MYSQL_CLIENT_CONNECT
+
+## Create table
+cat $CURDIR/../ddl/ontime.sql | sed "s/ontime/$TABLE/g" | $MYSQL_CLIENT_CONNECT
+
+copy_from_location_cases=(
+  # copy csv
+  "copy into $TABLE from 'ipfs://$QMHASH/$FILE.csv' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1)"
+  # copy gzip csv
+  "copy into $TABLE from 'ipfs://$QMHASH/$FILE.csv.gz' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'gzip' record_delimiter = '\n' skip_header = 1)"
+  # copy zstd csv
+  "copy into $TABLE from 'ipfs://$QMHASH/$FILE.csv.zstd' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'zstd' record_delimiter = '\n' skip_header = 1)"
+  # copy bz2 csv
+  "copy into $TABLE from 'ipfs://$QMHASH/$FILE.csv.bz2' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'bz2' record_delimiter = '\n' skip_header = 1)"
+  # copy xz csv
+  "copy into $TABLE from 'ipfs://$QMHASH/$FILE.csv.xz' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'xz' record_delimiter = '\n' skip_header = 1)"
+  # copy file
+  "copy into $TABLE from 'ipfs://$QMHASH' FILES = ('$FILE.csv', '${FILE}_v1.csv') FILE_FORMAT = (type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1)"
+  # copy dir with pattern
+  "copy into $TABLE from 'ipfs://$QMHASH' PATTERN = 'ontime.*csv' FILE_FORMAT = (type = 'CSV' field_delimiter = ',' record_delimiter = '\n' skip_header = 1)"
+  # copy parquet
+  "copy into $TABLE from 'ipfs://$QMHASH' PATTERN = 'ontime.*parquet' FILE_FORMAT = (type = 'PARQUET')"
+)
+
+for i in "${copy_from_location_cases[@]}"; do
+  echo "$i" | $MYSQL_CLIENT_CONNECT
+  echo "select count(1), avg(Year), sum(DayOfWeek) from $TABLE" | $MYSQL_CLIENT_CONNECT
+  echo "truncate table $TABLE" | $MYSQL_CLIENT_CONNECT
+done
+
+## Drop table
+echo "drop table if exists $TABLE;" | $MYSQL_CLIENT_CONNECT
