Skip to content

Commit a14af81

Browse files
committed
refactored commands
1 parent 30b42dc commit a14af81

29 files changed

+364
-447
lines changed

README.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ Download the https://github.com/Redislabs-Solution-Architects/riot/releases/late
2323
Use the `riot` script (`riot.bat` for Windows) to launch RIOT.
2424

2525
== Usage
26-
Follow the usage help provided with the `--help` option.
26+
Follow the usage help provided with the `help` command:
27+
[source,shell]
28+
----
29+
riot help
30+
----
2731

2832
=== Databases
2933

pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
<groupId>com.redislabs</groupId>
1414
<artifactId>riot</artifactId>
15-
<version>0.2.4</version>
15+
<version>0.2.5</version>
1616
<packaging>jar</packaging>
1717
<name>RIOT</name>
1818
<description>Redis Input/Output Tool</description>
@@ -81,7 +81,6 @@
8181
<dependency>
8282
<groupId>junit</groupId>
8383
<artifactId>junit</artifactId>
84-
<version>4.12</version>
8584
<scope>test</scope>
8685
</dependency>
8786
<dependency>

src/main/java/com/redislabs/riot/RiotApplication.java

Lines changed: 133 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,98 @@
11
package com.redislabs.riot;
22

3+
import java.net.InetAddress;
4+
import java.text.NumberFormat;
35
import java.util.List;
46
import java.util.Locale;
57

8+
import org.springframework.batch.core.ExitStatus;
9+
import org.springframework.batch.core.Job;
10+
import org.springframework.batch.core.JobExecution;
11+
import org.springframework.batch.core.JobParameters;
12+
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
13+
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
14+
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
15+
import org.springframework.batch.core.repository.JobRepository;
16+
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
17+
import org.springframework.batch.item.ItemProcessor;
18+
import org.springframework.batch.item.ItemStreamWriter;
19+
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
20+
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
621
import org.springframework.boot.CommandLineRunner;
722
import org.springframework.boot.SpringApplication;
823
import org.springframework.boot.autoconfigure.SpringBootApplication;
24+
import org.springframework.core.task.SyncTaskExecutor;
25+
import org.springframework.transaction.PlatformTransactionManager;
926

27+
import com.redislabs.riot.batch.JobBuilder;
1028
import com.redislabs.riot.cli.BaseCommand;
11-
import com.redislabs.riot.cli.in.ImportCommand;
29+
import com.redislabs.riot.cli.ImportCommand;
30+
import com.redislabs.riot.redis.RedisConnectionBuilder;
1231

32+
import io.lettuce.core.RedisURI;
33+
import lombok.Getter;
1334
import picocli.CommandLine;
1435
import picocli.CommandLine.Command;
1536
import picocli.CommandLine.DefaultExceptionHandler;
37+
import picocli.CommandLine.HelpCommand;
1638
import picocli.CommandLine.Option;
1739
import picocli.CommandLine.RunLast;
40+
import redis.clients.jedis.Protocol;
1841

1942
@SpringBootApplication
20-
@Command(name = "riot", subcommands = { ImportCommand.class })
43+
@Command(name = "riot", subcommands = { HelpCommand.class, ImportCommand.class })
2144
public class RiotApplication extends BaseCommand implements CommandLineRunner {
2245

46+
public static final String DEFAULT_HOST = "localhost";
47+
48+
public enum RedisDriver {
49+
Jedis, Lettuce
50+
}
51+
2352
/**
2453
* Just to avoid picocli complain in Eclipse console
2554
*/
2655
@Option(names = "--spring.output.ansi.enabled", hidden = true)
2756
private String ansiEnabled;
57+
@Option(names = "--max", description = "Maximum number of items to read.", paramLabel = "<count>")
58+
private Integer maxCount;
59+
@Option(names = "--threads", description = "Number of partitions to use for processing. (default: ${DEFAULT-VALUE}).")
60+
private int threads = 1;
61+
@Option(names = "--chunk-size", description = "The chunk size commit interval. (default: ${DEFAULT-VALUE}).")
62+
private int chunkSize = JobBuilder.DEFAULT_CHUNK_SIZE;
63+
@Option(names = "--sleep", description = "Sleep duration in milliseconds between each read.")
64+
private Long sleep;
65+
@Option(names = "--host", description = "Redis server host. (default: localhost).")
66+
private InetAddress host;
67+
@Getter
68+
@Option(names = "--port", description = "Redis server port. (default: ${DEFAULT-VALUE}).")
69+
private int port = RedisURI.DEFAULT_REDIS_PORT;
70+
@Option(names = "--command-timeout", description = "Redis command timeout in seconds for synchronous command execution (default: ${DEFAULT-VALUE}).")
71+
private long commandTimeout = RedisURI.DEFAULT_TIMEOUT;
72+
@Getter
73+
@Option(names = "--connection-timeout", description = "Redis connect timeout in milliseconds. (default: ${DEFAULT-VALUE}).")
74+
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
75+
@Option(names = "--socket-timeout", description = "Redis socket timeout in milliseconds. (default: ${DEFAULT-VALUE}).")
76+
private int socketTimeout = Protocol.DEFAULT_TIMEOUT;
77+
@Getter
78+
@Option(names = "--password", description = "Redis database password.", interactive = true)
79+
protected String password;
80+
@Option(names = "--max-idle", description = "Maximum number of idle connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (default: ${DEFAULT-VALUE}).")
81+
private int maxIdle = 8;
82+
@Option(names = "--min-idle", description = "Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if it is positive. (default: ${DEFAULT-VALUE}).")
83+
private int minIdle = 0;
84+
@Getter
85+
@Option(names = "--max-total", description = "Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (default: ${DEFAULT-VALUE})")
86+
private int maxTotal = 8;
87+
@Option(names = "--max-wait", description = "Maximum amount of time in milliseconds a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely (default).")
88+
private long maxWait = -1L;
89+
@Option(names = "--database", description = "Redis database number. Databases are only available for Redis Standalone and Redis Master/Slave. (default: ${DEFAULT-VALUE}).")
90+
private int database = 0;
91+
@Option(names = "--client-name", description = "Redis client name.")
92+
private String clientName;
93+
@Getter
94+
@Option(names = "--driver", description = "Redis driver: ${COMPLETION-CANDIDATES}. (default: ${DEFAULT-VALUE})")
95+
private RedisDriver driver = RedisDriver.Jedis;
2896

2997
public static void main(String[] args) {
3098
SpringApplication.run(RiotApplication.class, args);
@@ -42,9 +110,69 @@ public void run(String... args) {
42110
commandLine.parseWithHandlers(handler, exceptionHandler, args);
43111
}
44112

45-
@Override
46-
public void run() {
47-
CommandLine.usage(this, System.out);
113+
public RedisConnectionBuilder redisConnectionBuilder() {
114+
RedisConnectionBuilder builder = new RedisConnectionBuilder();
115+
builder.setClientName(clientName);
116+
builder.setCommandTimeout(commandTimeout);
117+
builder.setConnectionTimeout(connectionTimeout);
118+
builder.setDatabase(database);
119+
builder.setHost(getHostname());
120+
builder.setMaxTotal(maxTotal);
121+
builder.setMaxIdle(maxIdle);
122+
builder.setMaxWait(maxWait);
123+
builder.setMinIdle(minIdle);
124+
builder.setPassword(password);
125+
builder.setPort(port);
126+
builder.setSocketTimeout(socketTimeout);
127+
return builder;
128+
}
129+
130+
public String getHostname() {
131+
if (host != null) {
132+
return host.getHostName();
133+
}
134+
return DEFAULT_HOST;
135+
}
136+
137+
public <I, O> ExitStatus call(AbstractItemCountingItemStreamItemReader<I> reader, ItemProcessor<I, O> processor,
138+
ItemStreamWriter<O> writer) throws Exception {
139+
PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
140+
MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(transactionManager);
141+
jobRepositoryFactory.afterPropertiesSet();
142+
JobRepository jobRepository = jobRepositoryFactory.getObject();
143+
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
144+
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
145+
JobBuilder<I, O> builder = new JobBuilder<>(jobBuilderFactory, stepBuilderFactory);
146+
builder.setChunkSize(chunkSize);
147+
if (maxCount != null) {
148+
reader.setMaxItemCount(maxCount);
149+
}
150+
builder.setReader(reader);
151+
builder.setProcessor(processor);
152+
builder.setWriter(writer);
153+
builder.setPartitions(threads);
154+
builder.setSleep(sleep);
155+
Job job = builder.build();
156+
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
157+
jobLauncher.setJobRepository(jobRepository);
158+
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
159+
jobLauncher.afterPropertiesSet();
160+
long startTime = System.currentTimeMillis();
161+
System.out.println("Running job");
162+
JobExecution execution = jobLauncher.run(job, new JobParameters());
163+
if (execution.getExitStatus().equals(ExitStatus.FAILED)) {
164+
execution.getAllFailureExceptions().forEach(e -> e.printStackTrace());
165+
} else if (execution.getExitStatus().equals(ExitStatus.COMPLETED)) {
166+
long endTime = System.currentTimeMillis();
167+
long duration = endTime - startTime;
168+
NumberFormat numberFormat = NumberFormat.getIntegerInstance();
169+
double durationInSeconds = (double) duration / 1000;
170+
int writeCount = execution.getStepExecutions().iterator().next().getWriteCount();
171+
double throughput = writeCount / durationInSeconds;
172+
System.out.println("Processed " + numberFormat.format(writeCount) + " items in " + durationInSeconds
173+
+ " seconds (" + numberFormat.format(throughput) + " writes/sec)");
174+
}
175+
return execution.getExitStatus();
48176
}
49177

50178
}

src/main/java/com/redislabs/riot/batch/JobBuilder.java

Lines changed: 8 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,13 @@
44
import org.springframework.batch.core.Step;
55
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
66
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
7-
import org.springframework.batch.core.repository.JobRepository;
8-
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
97
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
108
import org.springframework.batch.core.step.tasklet.TaskletStep;
119
import org.springframework.batch.item.ItemProcessor;
1210
import org.springframework.batch.item.ItemReader;
1311
import org.springframework.batch.item.ItemStreamReader;
1412
import org.springframework.batch.item.ItemStreamWriter;
15-
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
1613
import org.springframework.core.task.SimpleAsyncTaskExecutor;
17-
import org.springframework.transaction.PlatformTransactionManager;
1814

1915
import lombok.Setter;
2016

@@ -35,15 +31,16 @@ public class JobBuilder<I, O> {
3531
private ItemStreamWriter<O> writer;
3632
@Setter
3733
private ItemProcessor<I, O> processor;
38-
39-
private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
4034
private JobBuilderFactory jobBuilderFactory;
41-
private MapJobRepositoryFactoryBean jobRepositoryFactory;
42-
private JobRepository jobRepository;
4335
private StepBuilderFactory stepBuilderFactory;
4436

37+
public JobBuilder(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
38+
this.jobBuilderFactory = jobBuilderFactory;
39+
this.stepBuilderFactory = stepBuilderFactory;
40+
}
41+
4542
private TaskletStep taskletStep() throws Exception {
46-
SimpleStepBuilder<I, O> builder = stepBuilderFactory().get("tasklet-step").<I, O>chunk(chunkSize);
43+
SimpleStepBuilder<I, O> builder = stepBuilderFactory.get("tasklet-step").<I, O>chunk(chunkSize);
4744
builder.reader(reader());
4845
if (processor != null) {
4946
builder.processor(processor);
@@ -53,13 +50,6 @@ private TaskletStep taskletStep() throws Exception {
5350
return builder.build();
5451
}
5552

56-
private StepBuilderFactory stepBuilderFactory() throws Exception {
57-
if (stepBuilderFactory == null) {
58-
stepBuilderFactory = new StepBuilderFactory(jobRepository(), transactionManager);
59-
}
60-
return stepBuilderFactory;
61-
}
62-
6353
private ItemReader<? extends I> reader() {
6454
if (sleep == null) {
6555
return reader;
@@ -69,7 +59,7 @@ private ItemReader<? extends I> reader() {
6959

7060
public Job build() throws Exception {
7161
TaskletStep taskletStep = taskletStep();
72-
return jobBuilderFactory().get("riot-job").start(step(taskletStep)).build();
62+
return jobBuilderFactory.get("riot-job").start(step(taskletStep)).build();
7363
}
7464

7565
private Step step(TaskletStep taskletStep) throws Exception {
@@ -78,30 +68,8 @@ private Step step(TaskletStep taskletStep) throws Exception {
7868
}
7969
IndexedPartitioner indexedPartitioner = new IndexedPartitioner(partitions);
8070
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
81-
return stepBuilderFactory().get("partitioner-step").partitioner("delegate-step", indexedPartitioner)
71+
return stepBuilderFactory.get("partitioner-step").partitioner("delegate-step", indexedPartitioner)
8272
.step(taskletStep).taskExecutor(taskExecutor).build();
8373
}
8474

85-
public JobBuilderFactory jobBuilderFactory() throws Exception {
86-
if (jobBuilderFactory == null) {
87-
jobBuilderFactory = new JobBuilderFactory(jobRepository());
88-
}
89-
return jobBuilderFactory;
90-
}
91-
92-
public JobRepository jobRepository() throws Exception {
93-
if (jobRepository == null) {
94-
jobRepository = jobRepositoryFactory().getObject();
95-
}
96-
return jobRepository;
97-
}
98-
99-
public MapJobRepositoryFactoryBean jobRepositoryFactory() throws Exception {
100-
if (jobRepositoryFactory == null) {
101-
jobRepositoryFactory = new MapJobRepositoryFactoryBean(transactionManager);
102-
jobRepositoryFactory.afterPropertiesSet();
103-
}
104-
return jobRepositoryFactory;
105-
}
106-
10775
}

0 commit comments

Comments
 (0)