Skip to content

Commit ab8c930

Browse files
committed
added export
1 parent a14af81 commit ab8c930

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+831
-430
lines changed

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@
103103
<artifactId>javafaker</artifactId>
104104
<version>0.18</version>
105105
</dependency>
106+
<dependency>
107+
<groupId>mysql</groupId>
108+
<artifactId>mysql-connector-java</artifactId>
109+
</dependency>
106110
</dependencies>
107111

108112
<build>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.redislabs.riot;
2+
3+
import picocli.CommandLine;
4+
import picocli.CommandLine.Command;
5+
6+
@Command(mixinStandardHelpOptions = true)
7+
public class BaseCommand implements Runnable {
8+
9+
@Override
10+
public void run() {
11+
new CommandLine(this).usage(System.out);
12+
}
13+
}
Lines changed: 23 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,34 @@
11
package com.redislabs.riot;
22

33
import java.net.InetAddress;
4-
import java.text.NumberFormat;
54
import java.util.List;
65
import java.util.Locale;
76

8-
import org.springframework.batch.core.ExitStatus;
9-
import org.springframework.batch.core.Job;
10-
import org.springframework.batch.core.JobExecution;
11-
import org.springframework.batch.core.JobParameters;
12-
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
13-
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
14-
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
15-
import org.springframework.batch.core.repository.JobRepository;
16-
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
17-
import org.springframework.batch.item.ItemProcessor;
18-
import org.springframework.batch.item.ItemStreamWriter;
19-
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
20-
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
217
import org.springframework.boot.CommandLineRunner;
228
import org.springframework.boot.SpringApplication;
239
import org.springframework.boot.autoconfigure.SpringBootApplication;
24-
import org.springframework.core.task.SyncTaskExecutor;
25-
import org.springframework.transaction.PlatformTransactionManager;
2610

27-
import com.redislabs.riot.batch.JobBuilder;
28-
import com.redislabs.riot.cli.BaseCommand;
29-
import com.redislabs.riot.cli.ImportCommand;
11+
import com.redislabs.riot.cli.Export;
12+
import com.redislabs.riot.cli.Import;
3013
import com.redislabs.riot.redis.RedisConnectionBuilder;
3114

3215
import io.lettuce.core.RedisURI;
3316
import lombok.Getter;
3417
import picocli.CommandLine;
3518
import picocli.CommandLine.Command;
3619
import picocli.CommandLine.DefaultExceptionHandler;
37-
import picocli.CommandLine.HelpCommand;
3820
import picocli.CommandLine.Option;
3921
import picocli.CommandLine.RunLast;
4022
import redis.clients.jedis.Protocol;
4123

4224
@SpringBootApplication
43-
@Command(name = "riot", subcommands = { HelpCommand.class, ImportCommand.class })
25+
@Command(name = "riot", subcommands = { Export.class, Import.class })
4426
public class RiotApplication extends BaseCommand implements CommandLineRunner {
4527

28+
public static void main(String[] args) {
29+
SpringApplication.run(RiotApplication.class, args);
30+
}
31+
4632
public static final String DEFAULT_HOST = "localhost";
4733

4834
public enum RedisDriver {
@@ -54,34 +40,26 @@ public enum RedisDriver {
5440
*/
5541
@Option(names = "--spring.output.ansi.enabled", hidden = true)
5642
private String ansiEnabled;
57-
@Option(names = "--max", description = "Maximum number of items to read.", paramLabel = "<count>")
58-
private Integer maxCount;
59-
@Option(names = "--threads", description = "Number of partitions to use for processing. (default: ${DEFAULT-VALUE}).")
60-
private int threads = 1;
61-
@Option(names = "--chunk-size", description = "The chunk size commit interval. (default: ${DEFAULT-VALUE}).")
62-
private int chunkSize = JobBuilder.DEFAULT_CHUNK_SIZE;
63-
@Option(names = "--sleep", description = "Sleep duration in milliseconds between each read.")
64-
private Long sleep;
43+
44+
@Option(names = "--driver", description = "Redis driver: ${COMPLETION-CANDIDATES}. (default: ${DEFAULT-VALUE})")
45+
@Getter
46+
private RedisDriver driver = RedisDriver.Jedis;
6547
@Option(names = "--host", description = "Redis server host. (default: localhost).")
6648
private InetAddress host;
67-
@Getter
6849
@Option(names = "--port", description = "Redis server port. (default: ${DEFAULT-VALUE}).")
6950
private int port = RedisURI.DEFAULT_REDIS_PORT;
7051
@Option(names = "--command-timeout", description = "Redis command timeout in seconds for synchronous command execution (default: ${DEFAULT-VALUE}).")
7152
private long commandTimeout = RedisURI.DEFAULT_TIMEOUT;
72-
@Getter
7353
@Option(names = "--connection-timeout", description = "Redis connect timeout in milliseconds. (default: ${DEFAULT-VALUE}).")
7454
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
7555
@Option(names = "--socket-timeout", description = "Redis socket timeout in milliseconds. (default: ${DEFAULT-VALUE}).")
7656
private int socketTimeout = Protocol.DEFAULT_TIMEOUT;
77-
@Getter
7857
@Option(names = "--password", description = "Redis database password.", interactive = true)
79-
protected String password;
58+
private String password;
8059
@Option(names = "--max-idle", description = "Maximum number of idle connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (default: ${DEFAULT-VALUE}).")
8160
private int maxIdle = 8;
8261
@Option(names = "--min-idle", description = "Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (default: ${DEFAULT-VALUE}).")
8362
private int minIdle = 0;
84-
@Getter
8563
@Option(names = "--max-total", description = "Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (default: ${DEFAULT-VALUE})")
8664
private int maxTotal = 8;
8765
@Option(names = "--max-wait", description = "Maximum amount of time in milliseconds a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely (default).")
@@ -90,25 +68,6 @@ public enum RedisDriver {
9068
private int database = 0;
9169
@Option(names = "--client-name", description = "Redis client name.")
9270
private String clientName;
93-
@Getter
94-
@Option(names = "--driver", description = "Redis driver: ${COMPLETION-CANDIDATES}. (default: ${DEFAULT-VALUE})")
95-
private RedisDriver driver = RedisDriver.Jedis;
96-
97-
public static void main(String[] args) {
98-
SpringApplication.run(RiotApplication.class, args);
99-
}
100-
101-
@Override
102-
public void run(String... args) {
103-
CommandLine commandLine = new CommandLine(this);
104-
commandLine.registerConverter(Locale.class, s -> new Locale.Builder().setLanguageTag(s).build());
105-
commandLine.setCaseInsensitiveEnumValuesAllowed(true);
106-
RunLast handler = new RunLast();
107-
handler.useOut(System.out);
108-
DefaultExceptionHandler<List<Object>> exceptionHandler = CommandLine.defaultExceptionHandler();
109-
exceptionHandler.useErr(System.err);
110-
commandLine.parseWithHandlers(handler, exceptionHandler, args);
111-
}
11271

11372
public RedisConnectionBuilder redisConnectionBuilder() {
11473
RedisConnectionBuilder builder = new RedisConnectionBuilder();
@@ -127,52 +86,23 @@ public RedisConnectionBuilder redisConnectionBuilder() {
12786
return builder;
12887
}
12988

130-
public String getHostname() {
89+
private String getHostname() {
13190
if (host != null) {
13291
return host.getHostName();
13392
}
13493
return DEFAULT_HOST;
13594
}
13695

137-
public <I, O> ExitStatus call(AbstractItemCountingItemStreamItemReader<I> reader, ItemProcessor<I, O> processor,
138-
ItemStreamWriter<O> writer) throws Exception {
139-
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
140-
MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(transactionManager);
141-
jobRepositoryFactory.afterPropertiesSet();
142-
JobRepository jobRepository = jobRepositoryFactory.getObject();
143-
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
144-
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
145-
JobBuilder<I, O> builder = new JobBuilder<>(jobBuilderFactory, stepBuilderFactory);
146-
builder.setChunkSize(chunkSize);
147-
if (maxCount != null) {
148-
reader.setMaxItemCount(maxCount);
149-
}
150-
builder.setReader(reader);
151-
builder.setProcessor(processor);
152-
builder.setWriter(writer);
153-
builder.setPartitions(threads);
154-
builder.setSleep(sleep);
155-
Job job = builder.build();
156-
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
157-
jobLauncher.setJobRepository(jobRepository);
158-
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
159-
jobLauncher.afterPropertiesSet();
160-
long startTime = System.currentTimeMillis();
161-
System.out.println("Running job");
162-
JobExecution execution = jobLauncher.run(job, new JobParameters());
163-
if (execution.getExitStatus().equals(ExitStatus.FAILED)) {
164-
execution.getAllFailureExceptions().forEach(e -> e.printStackTrace());
165-
} else if (execution.getExitStatus().equals(ExitStatus.COMPLETED)) {
166-
long endTime = System.currentTimeMillis();
167-
long duration = endTime - startTime;
168-
NumberFormat numberFormat = NumberFormat.getIntegerInstance();
169-
double durationInSeconds = (double) duration / 1000;
170-
int writeCount = execution.getStepExecutions().iterator().next().getWriteCount();
171-
double throughput = writeCount / durationInSeconds;
172-
System.out.println("Processed " + numberFormat.format(writeCount) + " items in " + durationInSeconds
173-
+ " seconds (" + numberFormat.format(throughput) + " writes/sec)");
174-
}
175-
return execution.getExitStatus();
96+
@Override
97+
public void run(String... args) {
98+
CommandLine commandLine = new CommandLine(this);
99+
commandLine.registerConverter(Locale.class, s -> new Locale.Builder().setLanguageTag(s).build());
100+
commandLine.setCaseInsensitiveEnumValuesAllowed(true);
101+
RunLast handler = new RunLast();
102+
handler.useOut(System.out);
103+
DefaultExceptionHandler<List<Object>> exceptionHandler = CommandLine.defaultExceptionHandler();
104+
exceptionHandler.useErr(System.err);
105+
commandLine.parseWithHandlers(handler, exceptionHandler, args);
176106
}
177107

178108
}

src/main/java/com/redislabs/riot/batch/JobBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import org.springframework.batch.item.ItemProcessor;
1010
import org.springframework.batch.item.ItemReader;
1111
import org.springframework.batch.item.ItemStreamReader;
12-
import org.springframework.batch.item.ItemStreamWriter;
12+
import org.springframework.batch.item.ItemWriter;
1313
import org.springframework.core.task.SimpleAsyncTaskExecutor;
1414

1515
import lombok.Setter;
@@ -28,7 +28,7 @@ public class JobBuilder<I, O> {
2828
@Setter
2929
private ItemStreamReader<I> reader;
3030
@Setter
31-
private ItemStreamWriter<O> writer;
31+
private ItemWriter<O> writer;
3232
@Setter
3333
private ItemProcessor<I, O> processor;
3434
private JobBuilderFactory jobBuilderFactory;

src/main/java/com/redislabs/riot/cli/BaseCommand.java

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
package com.redislabs.riot.cli;
3+
4+
import com.redislabs.riot.RiotApplication.RedisDriver;
5+
import com.redislabs.riot.redis.writer.JedisCommands;
6+
import com.redislabs.riot.redis.writer.LettuceCommands;
7+
import com.redislabs.riot.redis.writer.RedisCommands;
8+
9+
import lombok.Setter;
10+
11+
public class CommandsBuilder {
12+
13+
@Setter
14+
private RedisDriver driver;
15+
16+
protected RedisCommands build() {
17+
switch (driver) {
18+
case Lettuce:
19+
return lettuceCommands();
20+
default:
21+
return jedisCommands();
22+
}
23+
}
24+
25+
protected RedisCommands jedisCommands() {
26+
return new JedisCommands();
27+
}
28+
29+
protected RedisCommands lettuceCommands() {
30+
return new LettuceCommands();
31+
}
32+
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.redislabs.riot.cli;
2+
3+
import javax.sql.DataSource;
4+
5+
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
6+
7+
import com.zaxxer.hikari.HikariDataSource;
8+
9+
import lombok.Data;
10+
import picocli.CommandLine.Option;
11+
12+
@Data
13+
public class DatabaseConnectionOptions {
14+
15+
@Option(names = "--driver", description = "Fully qualified name of the JDBC driver. Auto-detected based on the URL by default.")
16+
private String driverClassName;
17+
@Option(names = "--url", description = "JDBC URL of the database.", required = true)
18+
private String url;
19+
@Option(names = "--username", description = "Login username of the database.")
20+
private String username;
21+
@Option(names = "--password", description = "Login password of the database.", interactive = true)
22+
private String password;
23+
24+
public DataSource dataSource() {
25+
DataSourceProperties properties = new DataSourceProperties();
26+
properties.setUrl(url);
27+
properties.setDriverClassName(driverClassName);
28+
properties.setUsername(username);
29+
properties.setPassword(password);
30+
return properties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
31+
}
32+
33+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.redislabs.riot.cli;
2+
3+
import java.util.Map;
4+
5+
import org.springframework.batch.item.database.JdbcBatchItemWriter;
6+
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
7+
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
8+
9+
import picocli.CommandLine.Command;
10+
import picocli.CommandLine.Mixin;
11+
import picocli.CommandLine.Option;
12+
13+
@Command(name = "db", description = "Export to a database")
14+
public class DatabaseExport extends ExportSub {
15+
16+
@Mixin
17+
private DatabaseConnectionOptions connection = new DatabaseConnectionOptions();
18+
19+
@Option(names = "--sql", description = "Insert query, e.g. \"INSERT INTO people (id, name) VALUES (:id, :name)\"", required = true)
20+
private String sql;
21+
22+
@Override
23+
protected JdbcBatchItemWriter<Map<String, Object>> writer() {
24+
JdbcBatchItemWriterBuilder<Map<String, Object>> builder = new JdbcBatchItemWriterBuilder<Map<String, Object>>();
25+
builder.itemSqlParameterSourceProvider(MapSqlParameterSource::new);
26+
builder.dataSource(connection.dataSource());
27+
builder.sql(sql);
28+
JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
29+
writer.afterPropertiesSet();
30+
return writer;
31+
}
32+
33+
}

0 commit comments

Comments
 (0)