Skip to content

Commit 5a1a8af

Browse files
authored
Merge pull request #172 from redis/FIELDENG-558-cleanup-and-refactor-snowflake
refactor to use common reader
2 parents b1f10bf + 658bc8f commit 5a1a8af

File tree

2 files changed

+44
-31
lines changed

2 files changed

+44
-31
lines changed

plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,8 @@
22

33
import java.util.Map;
44

5-
import javax.sql.DataSource;
6-
75
import org.springframework.batch.core.Job;
86
import org.springframework.batch.item.database.JdbcCursorItemReader;
9-
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
10-
import org.springframework.jdbc.core.ColumnMapRowMapper;
11-
import org.springframework.util.Assert;
12-
13-
import com.redis.riot.core.RiotException;
147

158
import picocli.CommandLine.ArgGroup;
169
import picocli.CommandLine.Command;
@@ -34,30 +27,8 @@ protected Job job() {
3427
}
3528

3629
protected JdbcCursorItemReader<Map<String, Object>> reader() {
37-
Assert.hasLength(sql, "No SQL statement specified");
38-
log.info("Creating data source with {}", dataSourceArgs);
39-
DataSource dataSource;
40-
try {
41-
dataSource = dataSourceArgs.dataSource();
42-
} catch (Exception e) {
43-
throw new RiotException(e);
44-
}
45-
log.info("Creating JDBC reader with sql=\"{}\" {}", sql, readerArgs);
46-
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = new JdbcCursorItemReaderBuilder<>();
47-
reader.dataSource(dataSource);
48-
reader.sql(sql);
49-
reader.saveState(false);
50-
reader.rowMapper(new ColumnMapRowMapper());
51-
reader.fetchSize(readerArgs.getFetchSize());
52-
reader.maxRows(readerArgs.getMaxRows());
53-
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
54-
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
55-
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
56-
if (readerArgs.getMaxItemCount() > 0) {
57-
reader.maxItemCount(readerArgs.getMaxItemCount());
58-
}
59-
reader.name(sql);
60-
return reader.build();
30+
log.info("Creating JDBC reader with sql=\"{}\" {} {}", sql, dataSourceArgs, readerArgs);
31+
return JdbcCursorItemReaderFactory.create(sql, dataSourceArgs, readerArgs).build();
6132
}
6233

6334
public String getSql() {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.redis.riot;
2+
3+
import java.util.Map;
4+
5+
import javax.sql.DataSource;
6+
7+
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
8+
import org.springframework.jdbc.core.ColumnMapRowMapper;
9+
import org.springframework.util.Assert;
10+
11+
import com.redis.riot.core.RiotException;
12+
13+
public class JdbcCursorItemReaderFactory {
14+
15+
public static JdbcCursorItemReaderBuilder<Map<String, Object>> create(String sql, DataSourceArgs dataSourceArgs,
16+
DatabaseReaderArgs readerArgs) {
17+
Assert.hasLength(sql, "No SQL statement specified");
18+
DataSource dataSource;
19+
try {
20+
dataSource = dataSourceArgs.dataSource();
21+
} catch (Exception e) {
22+
throw new RiotException(e);
23+
}
24+
25+
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = new JdbcCursorItemReaderBuilder<>();
26+
reader.dataSource(dataSource);
27+
reader.sql(sql);
28+
reader.saveState(false);
29+
reader.rowMapper(new ColumnMapRowMapper());
30+
reader.fetchSize(readerArgs.getFetchSize());
31+
reader.maxRows(readerArgs.getMaxRows());
32+
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
33+
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
34+
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
35+
if (readerArgs.getMaxItemCount() > 0) {
36+
reader.maxItemCount(readerArgs.getMaxItemCount());
37+
}
38+
reader.name(sql);
39+
return reader;
40+
}
41+
42+
}

0 commit comments

Comments
 (0)