Skip to content

Commit 3c207d2

Browse files
committed
added support for lettuce reactive api
1 parent de53f02 commit 3c207d2

27 files changed

+470
-109
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package com.redislabs.riot.cli;
22

33
public enum RedisDriver {
4-
Jedis, Lettuce
4+
Jedis, LettuceAsync, LettuceReactive
55
}

src/main/java/com/redislabs/riot/cli/RootCommand.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class RootCommand extends AbstractCommand {
3131
*/
3232
@Option(names = "--spring.output.ansi.enabled", hidden = true)
3333
private String ansiEnabled;
34-
@Getter
3534
@Option(names = "--host", description = "Redis server host. (default: localhost).")
3635
private InetAddress host;
3736
@Option(names = "--port", description = "Redis server port. (default: ${DEFAULT-VALUE}).")
@@ -56,6 +55,9 @@ public class RootCommand extends AbstractCommand {
5655
private int database = 0;
5756
@Option(names = "--client-name", description = "Redis client name.")
5857
private String clientName;
58+
@Getter
59+
@Option(names = "--driver", description = "Redis driver: ${COMPLETION-CANDIDATES}. (default: ${DEFAULT-VALUE})")
60+
private RedisDriver driver = RedisDriver.Jedis;
5961

6062
public String getHostname() {
6163
if (host != null) {

src/main/java/com/redislabs/riot/cli/in/AbstractRedisImportWriterCommand.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,26 @@
44

55
import org.springframework.batch.item.ItemWriter;
66

7-
import com.redislabs.riot.cli.RedisDriver;
7+
import com.redislabs.lettusearch.RediSearchClient;
88
import com.redislabs.riot.redis.writer.AbstractRedisDataStructureItemWriter;
99
import com.redislabs.riot.redis.writer.JedisWriter;
10-
import com.redislabs.riot.redis.writer.LettuceWriter;
11-
12-
import picocli.CommandLine.Option;
10+
import com.redislabs.riot.redis.writer.LettuceAsyncWriter;
11+
import com.redislabs.riot.redis.writer.LettuceReactiveWriter;
1312

1413
public abstract class AbstractRedisImportWriterCommand extends AbstractImportWriterCommand {
1514

16-
@Option(names = "--driver", description = "Redis driver: ${COMPLETION-CANDIDATES}. (default: ${DEFAULT-VALUE})")
17-
private RedisDriver driver = RedisDriver.Jedis;
18-
1915
@Override
2016
protected ItemWriter<Map<String, Object>> writer() {
2117
AbstractRedisDataStructureItemWriter itemWriter = redisItemWriter();
2218
itemWriter.setConverter(redisConverter());
23-
switch (driver) {
24-
case Jedis:
25-
return new JedisWriter(getRoot().jedisPool(), itemWriter);
19+
switch (getRoot().getDriver()) {
20+
case LettuceAsync:
21+
return new LettuceAsyncWriter(getRoot().lettucePool(), itemWriter);
22+
case LettuceReactive:
23+
RediSearchClient client = getRoot().lettuceClient();
24+
return new LettuceReactiveWriter(client.connect(), itemWriter);
2625
default:
27-
return new LettuceWriter(getRoot().lettucePool(), itemWriter);
26+
return new JedisWriter(getRoot().jedisPool(), itemWriter);
2827
}
2928
}
3029

src/main/java/com/redislabs/riot/cli/in/redis/AbstractRediSearchImport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import org.springframework.batch.item.ItemWriter;
66

77
import com.redislabs.riot.cli.in.AbstractImportWriterCommand;
8-
import com.redislabs.riot.redis.writer.LettuceWriter;
8+
import com.redislabs.riot.redis.writer.LettuceAsyncWriter;
99
import com.redislabs.riot.redis.writer.search.AbstractLettuSearchItemWriter;
1010

1111
import lombok.Getter;
@@ -30,7 +30,7 @@ protected String getTargetDescription() {
3030

3131
@Override
3232
protected ItemWriter<Map<String, Object>> writer() {
33-
return new LettuceWriter(getRoot().lettucePool(), lettuceItemWriter());
33+
return new LettuceAsyncWriter(getRoot().lettucePool(), lettuceItemWriter());
3434
}
3535

3636
protected abstract ItemWriter<Map<String, Object>> jedisSearchWriter();

src/main/java/com/redislabs/riot/cli/in/redis/SearchImport.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import com.redislabs.lettusearch.search.AddOptions;
88
import com.redislabs.lettusearch.search.Language;
9+
import com.redislabs.riot.redis.writer.search.AbstractLettuSearchItemWriter;
910
import com.redislabs.riot.redis.writer.search.JedisSearchWriter;
11+
import com.redislabs.riot.redis.writer.search.LettuSearchAddPayloadWriter;
1012
import com.redislabs.riot.redis.writer.search.LettuSearchAddWriter;
1113

1214
import io.redisearch.client.AddOptions.ReplacementPolicy;
@@ -29,20 +31,27 @@ public class SearchImport extends AbstractRediSearchImport {
2931
@Option(names = "--if", description = "Applicable only in conjunction with REPLACE and optionally PARTIAL. Update the document only if a boolean expression applies to the document before the update.")
3032
private String ifCondition;
3133
@Option(names = "--score", description = "Name of the field to use for scores.")
32-
private String score;
34+
private String scoreField;
3335
@Option(names = "--default-score", description = "Default score to use when score field is not present. (default: ${DEFAULT-VALUE}).")
3436
private double defaultScore = 1d;
3537
@Option(names = "--payload", description = "Name of the field containing the payload")
36-
private String payload;
38+
private String payloadField;
3739

3840
@Override
39-
protected LettuSearchAddWriter rediSearchItemWriter() {
40-
LettuSearchAddWriter writer = new LettuSearchAddWriter();
41+
protected AbstractLettuSearchItemWriter rediSearchItemWriter() {
42+
AddOptions options = AddOptions.builder().ifCondition(ifCondition).language(language).noSave(noSave)
43+
.replace(replace).replacePartial(partial).build();
44+
if (payloadField == null) {
45+
LettuSearchAddWriter writer = new LettuSearchAddWriter();
46+
writer.setDefaultScore(defaultScore);
47+
writer.setOptions(options);
48+
writer.setScoreField(scoreField);
49+
return writer;
50+
}
51+
LettuSearchAddPayloadWriter writer = new LettuSearchAddPayloadWriter();
4152
writer.setDefaultScore(defaultScore);
42-
writer.setOptions(AddOptions.builder().ifCondition(ifCondition).language(language).noSave(noSave)
43-
.replace(replace).replacePartial(partial).build());
44-
writer.setPayloadField(payload);
45-
writer.setScoreField(score);
53+
writer.setOptions(options);
54+
writer.setScoreField(scoreField);
4655
return writer;
4756
}
4857

@@ -53,8 +62,8 @@ protected ItemWriter<Map<String, Object>> jedisSearchWriter() {
5362
writer.setClient(new Client(getIndex(), pool));
5463
writer.setConverter(redisConverter());
5564
writer.setDefaultScore((float) defaultScore);
56-
writer.setScoreField(score);
57-
writer.setPayloadField(payload);
65+
writer.setScoreField(scoreField);
66+
writer.setPayloadField(payloadField);
5867
io.redisearch.client.AddOptions options = new io.redisearch.client.AddOptions();
5968
if (language != null) {
6069
options.setLanguage(language.name());

src/main/java/com/redislabs/riot/cli/in/redis/StreamImport.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package com.redislabs.riot.cli.in.redis;
22

33
import com.redislabs.riot.cli.in.AbstractRedisImportWriterCommand;
4+
import com.redislabs.riot.redis.writer.AbstractRedisDataStructureItemWriter;
5+
import com.redislabs.riot.redis.writer.StreamIdMaxlenWriter;
6+
import com.redislabs.riot.redis.writer.StreamIdWriter;
7+
import com.redislabs.riot.redis.writer.StreamMaxlenWriter;
48
import com.redislabs.riot.redis.writer.StreamWriter;
59

610
import picocli.CommandLine.Command;
@@ -18,7 +22,7 @@ public class StreamImport extends AbstractRedisImportWriterCommand {
1822
@Option(names = "--maxlen", description = "Limit stream to maxlen entries.")
1923
private Long maxlen;
2024
@Option(names = "--id", description = "Field used for stream entry IDs.")
21-
private String id;
25+
private String idField;
2226

2327
@Override
2428
protected String getKeyspace() {
@@ -31,10 +35,24 @@ protected String[] getKeys() {
3135
}
3236

3337
@Override
34-
protected StreamWriter redisItemWriter() {
35-
StreamWriter writer = new StreamWriter();
38+
protected AbstractRedisDataStructureItemWriter redisItemWriter() {
39+
if (idField == null) {
40+
if (maxlen == null) {
41+
return new StreamWriter();
42+
}
43+
StreamMaxlenWriter writer = new StreamMaxlenWriter();
44+
writer.setMaxlen(maxlen);
45+
writer.setApproximateTrimming(approximateTrimming);
46+
return writer;
47+
}
48+
if (maxlen == null) {
49+
StreamIdWriter writer = new StreamIdWriter();
50+
writer.setIdField(idField);
51+
return writer;
52+
}
53+
StreamIdMaxlenWriter writer = new StreamIdMaxlenWriter();
3654
writer.setApproximateTrimming(approximateTrimming);
37-
writer.setIdField(id);
55+
writer.setIdField(idField);
3856
writer.setMaxlen(maxlen);
3957
return writer;
4058
}

src/main/java/com/redislabs/riot/cli/in/redis/SuggestImport.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.springframework.batch.item.ItemWriter;
66

77
import com.redislabs.riot.redis.writer.search.AbstractLettuSearchItemWriter;
8+
import com.redislabs.riot.redis.writer.search.LettuSearchSuggestPayloadWriter;
89
import com.redislabs.riot.redis.writer.search.LettuSearchSuggestWriter;
910

1011
import picocli.CommandLine.Command;
@@ -14,24 +15,32 @@
1415
public class SuggestImport extends AbstractRediSearchImport {
1516

1617
@Option(names = "--score", description = "Name of the field to use for scores.")
17-
private String score;
18+
private String scoreField;
1819
@Option(names = "--default-score", description = "Default score to use when score field is not present. (default: ${DEFAULT-VALUE}).")
1920
private Double defaultScore = 1d;
2021
@Option(names = "--increment", description = "Use increment to set value")
2122
private boolean increment;
2223
@Option(names = "--suggest", description = "Name of the field containing the suggestion")
23-
private String suggest;
24+
private String suggestField;
2425
@Option(names = "--payload", description = "Name of the field containing the payload")
25-
private String payload;
26+
private String payloadField;
2627

2728
@Override
2829
protected AbstractLettuSearchItemWriter rediSearchItemWriter() {
29-
LettuSearchSuggestWriter writer = new LettuSearchSuggestWriter();
30+
if (payloadField == null) {
31+
LettuSearchSuggestWriter writer = new LettuSearchSuggestWriter();
32+
writer.setDefaultScore(defaultScore);
33+
writer.setField(suggestField);
34+
writer.setIncrement(increment);
35+
writer.setScoreField(scoreField);
36+
return writer;
37+
}
38+
LettuSearchSuggestPayloadWriter writer = new LettuSearchSuggestPayloadWriter();
3039
writer.setDefaultScore(defaultScore);
31-
writer.setField(suggest);
40+
writer.setField(suggestField);
3241
writer.setIncrement(increment);
33-
writer.setPayloadField(payload);
34-
writer.setScoreField(score);
42+
writer.setPayloadField(payloadField);
43+
writer.setScoreField(scoreField);
3544
return writer;
3645
}
3746

src/main/java/com/redislabs/riot/redis/writer/AbstractCollectionRedisItemWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import io.lettuce.core.RedisFuture;
66
import io.lettuce.core.api.async.RedisAsyncCommands;
7+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
78
import lombok.Setter;
9+
import reactor.core.publisher.Mono;
810
import redis.clients.jedis.Pipeline;
911
import redis.clients.jedis.Response;
1012

@@ -32,4 +34,12 @@ protected RedisFuture<?> write(RedisAsyncCommands<String, String> commands, Stri
3234
protected abstract RedisFuture<?> write(RedisAsyncCommands<String, String> commands, String key, String member,
3335
Map<String, Object> item);
3436

37+
@Override
38+
protected Mono<?> write(RedisReactiveCommands<String, String> commands, String key, Map<String, Object> item) {
39+
return write(commands, key, member(item), item);
40+
}
41+
42+
protected abstract Mono<?> write(RedisReactiveCommands<String, String> commands, String key, String member,
43+
Map<String, Object> item);
44+
3545
}

src/main/java/com/redislabs/riot/redis/writer/AbstractRedisDataStructureItemWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import io.lettuce.core.RedisFuture;
66
import io.lettuce.core.api.async.RedisAsyncCommands;
7+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
8+
import reactor.core.publisher.Mono;
79
import redis.clients.jedis.Pipeline;
810
import redis.clients.jedis.Response;
911

@@ -25,4 +27,12 @@ public RedisFuture<?> write(RedisAsyncCommands<String, String> commands, Map<Str
2527
protected abstract RedisFuture<?> write(RedisAsyncCommands<String, String> commands, String key,
2628
Map<String, Object> item);
2729

30+
@Override
31+
public Mono<?> write(RedisReactiveCommands<String, String> commands, Map<String, Object> item) {
32+
return write(commands, key(item), item);
33+
}
34+
35+
protected abstract Mono<?> write(RedisReactiveCommands<String, String> commands, String key,
36+
Map<String, Object> item);
37+
2838
}

src/main/java/com/redislabs/riot/redis/writer/GeoWriter.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,46 +4,80 @@
44

55
import io.lettuce.core.RedisFuture;
66
import io.lettuce.core.api.async.RedisAsyncCommands;
7+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
78
import lombok.Setter;
9+
import lombok.Value;
10+
import reactor.core.publisher.Mono;
811
import redis.clients.jedis.Pipeline;
912
import redis.clients.jedis.Response;
1013

1114
public class GeoWriter extends AbstractCollectionRedisItemWriter {
1215

16+
@Value
17+
private static class Point {
18+
private Double longitude;
19+
private Double latitude;
20+
21+
public boolean isInvalid() {
22+
return longitude == null || latitude == null;
23+
}
24+
25+
}
26+
1327
@Setter
1428
private String longitudeField;
1529
@Setter
1630
private String latitudeField;
1731

1832
@Override
1933
protected Response<Long> write(Pipeline pipeline, String key, String member, Map<String, Object> item) {
20-
Object longitude = item.get(longitudeField);
21-
if (longitude == null || longitude.equals("")) {
34+
Point point = point(item);
35+
if (point.isInvalid()) {
2236
return null;
2337
}
24-
Object latitude = item.get(latitudeField);
25-
if (latitude == null || latitude.equals("")) {
26-
return null;
38+
return pipeline.geoadd(key, point.getLongitude(), point.getLatitude(), member);
39+
}
40+
41+
private Point point(Map<String, Object> item) {
42+
return new Point(longitude(item), latitude(item));
43+
}
44+
45+
private Double coordinate(Map<String, Object> item, String field) {
46+
if (item.containsKey(field)) {
47+
Object value = item.get(field);
48+
if (value != null && !"".equals(value)) {
49+
return convert(value, Double.class);
50+
}
2751
}
28-
double lon = convert(longitude, Double.class);
29-
double lat = convert(latitude, Double.class);
30-
return pipeline.geoadd(key, lon, lat, member);
52+
return null;
53+
}
54+
55+
private Double longitude(Map<String, Object> item) {
56+
return coordinate(item, longitudeField);
57+
}
58+
59+
private Double latitude(Map<String, Object> item) {
60+
return coordinate(item, latitudeField);
3161
}
3262

3363
@Override
3464
protected RedisFuture<?> write(RedisAsyncCommands<String, String> commands, String key, String member,
3565
Map<String, Object> item) {
36-
Object longitude = item.get(longitudeField);
37-
if (longitude == null || longitude.equals("")) {
66+
Point point = point(item);
67+
if (point.isInvalid()) {
3868
return null;
3969
}
40-
Object latitude = item.get(latitudeField);
41-
if (latitude == null || latitude.equals("")) {
70+
return commands.geoadd(key, point.getLongitude(), point.getLatitude(), member);
71+
}
72+
73+
@Override
74+
protected Mono<?> write(RedisReactiveCommands<String, String> commands, String key, String member,
75+
Map<String, Object> item) {
76+
Point point = point(item);
77+
if (point.isInvalid()) {
4278
return null;
4379
}
44-
double lon = convert(longitude, Double.class);
45-
double lat = convert(latitude, Double.class);
46-
return commands.geoadd(key, lon, lat, member);
80+
return commands.geoadd(key, point.getLongitude(), point.getLatitude(), member);
4781
}
4882

4983
}

src/main/java/com/redislabs/riot/redis/writer/HashWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import io.lettuce.core.RedisFuture;
66
import io.lettuce.core.api.async.RedisAsyncCommands;
7+
import io.lettuce.core.api.reactive.RedisReactiveCommands;
8+
import reactor.core.publisher.Mono;
79
import redis.clients.jedis.Pipeline;
810
import redis.clients.jedis.Response;
911

@@ -19,4 +21,9 @@ protected RedisFuture<?> write(RedisAsyncCommands<String, String> commands, Stri
1921
return commands.hmset(key, stringMap(item));
2022
}
2123

24+
@Override
25+
protected Mono<String> write(RedisReactiveCommands<String, String> commands, String key, Map<String, Object> item) {
26+
return commands.hmset(key, stringMap(item));
27+
}
28+
2229
}

0 commit comments

Comments
 (0)