Skip to content

Commit 13d3760

Browse files
author
julien
committed
refactor: Use JDBC reader factory in db import command
1 parent ca3d389 commit 13d3760

File tree

2 files changed

+38
-65
lines changed

2 files changed

+38
-65
lines changed

plugins/riot/src/main/java/com/redis/riot/DatabaseImport.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,8 @@
22

33
import java.util.Map;
44

5-
import javax.sql.DataSource;
6-
75
import org.springframework.batch.core.Job;
8-
import org.springframework.batch.item.database.JdbcCursorItemReader;
96
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
10-
import org.springframework.jdbc.core.ColumnMapRowMapper;
11-
import org.springframework.util.Assert;
12-
13-
import com.redis.riot.core.RiotException;
147

158
import picocli.CommandLine.ArgGroup;
169
import picocli.CommandLine.Command;
@@ -30,34 +23,10 @@ public class DatabaseImport extends AbstractRedisImportCommand {
3023

3124
@Override
3225
protected Job job() {
33-
return job(step(reader()));
34-
}
35-
36-
protected JdbcCursorItemReader<Map<String, Object>> reader() {
37-
Assert.hasLength(sql, "No SQL statement specified");
38-
log.info("Creating data source with {}", dataSourceArgs);
39-
DataSource dataSource;
40-
try {
41-
dataSource = dataSourceArgs.dataSource();
42-
} catch (Exception e) {
43-
throw new RiotException(e);
44-
}
45-
log.info("Creating JDBC reader with sql=\"{}\" {}", sql, readerArgs);
46-
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = new JdbcCursorItemReaderBuilder<>();
47-
reader.dataSource(dataSource);
48-
reader.sql(sql);
49-
reader.saveState(false);
50-
reader.rowMapper(new ColumnMapRowMapper());
51-
reader.fetchSize(readerArgs.getFetchSize());
52-
reader.maxRows(readerArgs.getMaxRows());
53-
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
54-
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
55-
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
56-
if (readerArgs.getMaxItemCount() > 0) {
57-
reader.maxItemCount(readerArgs.getMaxItemCount());
58-
}
59-
reader.name(sql);
60-
return reader.build();
26+
log.info("Creating JDBC reader with sql=\"{}\" {} {}", sql, dataSourceArgs, readerArgs);
27+
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = JdbcCursorItemReaderFactory.create(sql,
28+
dataSourceArgs, readerArgs);
29+
return job(step(reader.build()));
6130
}
6231

6332
public String getSql() {
Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,42 @@
11
package com.redis.riot;
22

3-
import com.redis.riot.core.RiotException;
4-
import org.springframework.batch.item.database.JdbcCursorItemReader;
3+
import java.util.Map;
4+
5+
import javax.sql.DataSource;
6+
57
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
68
import org.springframework.jdbc.core.ColumnMapRowMapper;
9+
import org.springframework.util.Assert;
710

8-
import javax.sql.DataSource;
9-
import java.util.Map;
11+
import com.redis.riot.core.RiotException;
1012

1113
public class JdbcCursorItemReaderFactory {
12-
public static JdbcCursorItemReader<Map<String, Object>> createReader(String sql,
13-
DataSourceArgs dataSourceArgs,
14-
DatabaseReaderArgs readerArgs) {
15-
DataSource dataSource;
16-
try {
17-
dataSource = dataSourceArgs.dataSource();
18-
} catch (Exception e) {
19-
throw new RiotException(e);
20-
}
21-
22-
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = new JdbcCursorItemReaderBuilder<>();
23-
reader.dataSource(dataSource);
24-
reader.sql(sql);
25-
reader.saveState(false);
26-
reader.rowMapper(new ColumnMapRowMapper());
27-
reader.fetchSize(readerArgs.getFetchSize());
28-
reader.maxRows(readerArgs.getMaxRows());
29-
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
30-
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
31-
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
32-
if (readerArgs.getMaxItemCount() > 0) {
33-
reader.maxItemCount(readerArgs.getMaxItemCount());
34-
}
35-
reader.name(sql);
36-
return reader.build();
37-
}
14+
15+
public static JdbcCursorItemReaderBuilder<Map<String, Object>> create(String sql, DataSourceArgs dataSourceArgs,
16+
DatabaseReaderArgs readerArgs) {
17+
Assert.hasLength(sql, "No SQL statement specified");
18+
DataSource dataSource;
19+
try {
20+
dataSource = dataSourceArgs.dataSource();
21+
} catch (Exception e) {
22+
throw new RiotException(e);
23+
}
24+
25+
JdbcCursorItemReaderBuilder<Map<String, Object>> reader = new JdbcCursorItemReaderBuilder<>();
26+
reader.dataSource(dataSource);
27+
reader.sql(sql);
28+
reader.saveState(false);
29+
reader.rowMapper(new ColumnMapRowMapper());
30+
reader.fetchSize(readerArgs.getFetchSize());
31+
reader.maxRows(readerArgs.getMaxRows());
32+
reader.queryTimeout(Math.toIntExact(readerArgs.getQueryTimeout().getValue().toMillis()));
33+
reader.useSharedExtendedConnection(readerArgs.isUseSharedExtendedConnection());
34+
reader.verifyCursorPosition(readerArgs.isVerifyCursorPosition());
35+
if (readerArgs.getMaxItemCount() > 0) {
36+
reader.maxItemCount(readerArgs.getMaxItemCount());
37+
}
38+
reader.name(sql);
39+
return reader;
40+
}
41+
3842
}

0 commit comments

Comments
 (0)