Skip to content

Redis cluster support and timeout control on single query result #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@
| test dependencies
-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.7.0</version>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/org/mybatis/caches/redis/JDKSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

import org.apache.ibatis.cache.CacheException;

Expand All @@ -31,21 +31,24 @@ private JDKSerializer() {
// prevent instantiation
}

public byte[] serialize(Object object) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
public byte[] serialize(long timestamp, Object object) {
try (
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(object);
baos.write(longToBytes(timestamp));
return baos.toByteArray();
} catch (Exception e) {
throw new CacheException(e);
}
}

public Object unserialize(byte[] bytes) {
if (bytes == null) {
if (bytes == null || bytes.length < 8) {
return null;
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
try (
ByteArrayInputStream bais = new ByteArrayInputStream(Arrays.copyOf(bytes, bytes.length - 8));
ObjectInputStream ois = new ObjectInputStream(bais)) {
return ois.readObject();
} catch (Exception e) {
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/org/mybatis/caches/redis/KryoSerializer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@ private KryoSerializer() {
fallbackSerializer = JDKSerializer.INSTANCE;// use JDKSerializer as fallback
}

public byte[] serialize(Object object) {
public byte[] serialize(long timestamp, Object object) {
output.clear();
if (!unnormalClassSet.contains(object.getClass())) {
/**
Expand All @@ -66,30 +66,33 @@ public byte[] serialize(Object object) {
*/
try {
kryo.writeClassAndObject(output, object);
// Add 8 bytes of timestamp to the tail
output.write(longToBytes(timestamp));
return output.toBytes();
} catch (Exception e) {
// For unnormal class occurred for the first time, exception will be thrown
unnormalClassSet.add(object.getClass());
return fallbackSerializer.serialize(object);// use fallback Serializer to resolve
return fallbackSerializer.serialize(timestamp, object);// use fallback Serializer to resolve
}
} else {
// For unnormal class
return fallbackSerializer.serialize(object);
return fallbackSerializer.serialize(timestamp, object);
}
}

public Object unserialize(byte[] bytes) {
if (bytes == null) {
if (bytes == null || bytes.length < 8) {
return null;
}
int hashCode = Arrays.hashCode(bytes);
if (!unnormalBytesHashCodeSet.contains(hashCode)) {
byte[] stripped = Arrays.copyOf(bytes, bytes.length - 8);
int hashCode = Arrays.hashCode(stripped);
if (!unnormalBytesHashCodeSet.contains(stripped)) {
/**
* In the following cases: 1. This bytes occurs for the first time. 2. This bytes have occurred and can be
* resolved by default kryo serializer
*/
try {
input.setBuffer(bytes);
input.setBuffer(stripped);
return kryo.readClassAndObject(input);
} catch (Exception e) {
// For unnormal bytes occurred for the first time, exception will be thrown
Expand Down
92 changes: 35 additions & 57 deletions src/main/java/org/mybatis/caches/redis/RedisCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,8 @@

import org.apache.ibatis.cache.Cache;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import org.mybatis.caches.redis.client.GenericRedisClient;
import org.mybatis.caches.redis.client.RedisClientBuilder;

/**
* Cache adapter for Redis.
Expand All @@ -34,9 +34,9 @@ public final class RedisCache implements Cache {

private String id;

private static JedisPool pool;
private static GenericRedisClient client;

private final RedisConfig redisConfig;
private static RedisConfig redisConfig;

private Integer timeout;

Expand All @@ -45,20 +45,16 @@ public RedisCache(final String id) {
throw new IllegalArgumentException("Cache instances require an ID");
}
this.id = id;
redisConfig = RedisConfigurationBuilder.getInstance().parseConfiguration();
pool = new JedisPool(redisConfig, redisConfig.getHost(), redisConfig.getPort(), redisConfig.getConnectionTimeout(),
redisConfig.getSoTimeout(), redisConfig.getPassword(), redisConfig.getDatabase(), redisConfig.getClientName(),
redisConfig.isSsl(), redisConfig.getSslSocketFactory(), redisConfig.getSslParameters(),
redisConfig.getHostnameVerifier());
}

// TODO Review this is UNUSED
private Object execute(RedisCallback callback) {
Jedis jedis = pool.getResource();
try {
return callback.doWithRedis(jedis);
} finally {
jedis.close();
synchronized (RedisCache.class) {
if (client == null) {
redisConfig = RedisConfigurationBuilder.getInstance().parseConfiguration();
RedisClientBuilder builder = new RedisClientBuilder(redisConfig, redisConfig.getHosts(),
redisConfig.getConnectionTimeout(), redisConfig.getSoTimeout(), redisConfig.getMaxAttempts(),
redisConfig.getPassword(), redisConfig.getDatabase(), redisConfig.getClientName(), redisConfig.isSsl(),
redisConfig.getSslSocketFactory(), redisConfig.getSslParameters(), redisConfig.getHostnameVerifier(),
redisConfig.getHostAndPortMap());
client = builder.getClient();
}
}
}

Expand All @@ -69,60 +65,42 @@ public String getId() {

@Override
public int getSize() {
return (Integer) execute(new RedisCallback() {
@Override
public Object doWithRedis(Jedis jedis) {
Map<byte[], byte[]> result = jedis.hgetAll(id.getBytes());
return result.size();
}
});
Map<byte[], byte[]> result = client.hgetAll(id.getBytes());
return result.size();
}

@Override
public void putObject(final Object key, final Object value) {
execute(new RedisCallback() {
@Override
public Object doWithRedis(Jedis jedis) {
final byte[] idBytes = id.getBytes();
jedis.hset(idBytes, key.toString().getBytes(), redisConfig.getSerializer().serialize(value));
if (timeout != null && jedis.ttl(idBytes) == -1) {
jedis.expire(idBytes, timeout);
}
return null;
}
});
final byte[] idBytes = id.getBytes();
long ts = 0;
if (timeout != null) {
ts = System.currentTimeMillis() + timeout * 1000;
}
final byte[] objBytes = redisConfig.getSerializer().serialize(ts, value);
client.hset(idBytes, key.toString().getBytes(), objBytes);
}

@Override
public Object getObject(final Object key) {
return execute(new RedisCallback() {
@Override
public Object doWithRedis(Jedis jedis) {
return redisConfig.getSerializer().unserialize(jedis.hget(id.getBytes(), key.toString().getBytes()));
}
});
byte[] objBytes = client.hget(id.getBytes(), key.toString().getBytes());
if (objBytes == null || objBytes.length < 8) return null;
long ts = redisConfig.getSerializer().getTimestamp(objBytes);
if (ts > 0 && ts < System.currentTimeMillis()) {
client.hdel(id, key.toString());
return null;
} else {
return redisConfig.getSerializer().unserialize(objBytes);
}
}

@Override
public Object removeObject(final Object key) {
return execute(new RedisCallback() {
@Override
public Object doWithRedis(Jedis jedis) {
return jedis.hdel(id, key.toString());
}
});
return client.hdel(id, key.toString());
}

@Override
public void clear() {
execute(new RedisCallback() {
@Override
public Object doWithRedis(Jedis jedis) {
jedis.del(id);
return null;
}
});

client.del(id);
}

@Override
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/org/mybatis/caches/redis/RedisCallback.java

This file was deleted.

35 changes: 21 additions & 14 deletions src/main/java/org/mybatis/caches/redis/RedisConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,22 +19,24 @@
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.JedisClusterHostAndPortMap;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

public class RedisConfig extends JedisPoolConfig {

private String host = Protocol.DEFAULT_HOST;
private int port = Protocol.DEFAULT_PORT;
private String hosts = Protocol.DEFAULT_HOST + ':' + Protocol.DEFAULT_PORT;
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
private int soTimeout = Protocol.DEFAULT_TIMEOUT;
private int maxAttempts = 5;
private String password;
private int database = Protocol.DEFAULT_DATABASE;
private String clientName;
private boolean ssl;
private SSLSocketFactory sslSocketFactory;
private SSLParameters sslParameters;
private HostnameVerifier hostnameVerifier;
private JedisClusterHostAndPortMap hostAndPortMap;
private Serializer serializer = JDKSerializer.INSTANCE;

public boolean isSsl() {
Expand Down Expand Up @@ -69,23 +71,20 @@ public void setHostnameVerifier(HostnameVerifier hostnameVerifier) {
this.hostnameVerifier = hostnameVerifier;
}

public String getHost() {
return host;
public JedisClusterHostAndPortMap getHostAndPortMap() {
return hostAndPortMap;
}

public void setHost(String host) {
if (host == null || "".equals(host)) {
host = Protocol.DEFAULT_HOST;
}
this.host = host;
public void setHostAndPortMap(JedisClusterHostAndPortMap hostAndPortMap) {
this.hostAndPortMap = hostAndPortMap;
}

public int getPort() {
return port;
public String getHosts() {
return hosts;
}

public void setPort(int port) {
this.port = port;
public void setHosts(String hosts) {
this.hosts = hosts;
}

public String getPassword() {
Expand Down Expand Up @@ -134,6 +133,14 @@ public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}

public int getMaxAttempts() {
return maxAttempts;
}

public void setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
}

public Serializer getSerializer() {
return serializer;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -118,7 +118,8 @@ private void setConfigProperties(Properties properties, RedisConfig jedisConfig)
// Custom serializer is not supported yet.
throw new CacheException("Unknown serializer: '" + value + "'");
}
} else if (Arrays.asList("sslSocketFactory", "sslParameters", "hostnameVerifier").contains(name)) {
} else if (Arrays.asList("sslSocketFactory", "sslParameters", "hostnameVerifier", "hostAndPortMap")
.contains(name)) {
setInstance(metaCache, name, value);
} else if (metaCache.hasSetter(name)) {
Class<?> type = metaCache.getSetterType(name);
Expand Down
Loading