Skip to content

Commit 1903a49

Browse files
authored
Merge pull request #48 from KyberNetwork/reconnectable-redis
Implement reconnectable redis client
2 parents e527f78 + e41d0e9 commit 1903a49

File tree

5 files changed

+300
-1
lines changed

5 files changed

+300
-1
lines changed

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ toolchain go1.22.3
77
require (
88
github.com/KyberNetwork/kutils v0.3.2
99
github.com/KyberNetwork/kyber-trace-go v0.1.2
10+
github.com/alicebob/miniredis/v2 v2.33.0
1011
github.com/bufbuild/protovalidate-go v0.6.4
1112
github.com/cenkalti/backoff/v4 v4.3.0
1213
github.com/ethereum/go-ethereum v1.14.8
@@ -16,6 +17,7 @@ require (
1617
github.com/pkg/errors v0.9.1
1718
github.com/redis/go-redis/extra/redisotel/v9 v9.5.3
1819
github.com/redis/go-redis/v9 v9.6.1
20+
github.com/stretchr/testify v1.9.0
1921
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0
2022
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0
2123
go.opentelemetry.io/otel v1.28.0
@@ -31,6 +33,7 @@ require (
3133
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.34.2-20240717164558-a6c49f84cc0f.2 // indirect
3234
github.com/KyberNetwork/logger v0.1.0 // indirect
3335
github.com/Microsoft/go-winio v0.6.2 // indirect
36+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
3437
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
3538
github.com/bits-and-blooms/bitset v1.13.0 // indirect
3639
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
@@ -75,6 +78,7 @@ require (
7578
github.com/tklauser/go-sysconf v0.3.14 // indirect
7679
github.com/tklauser/numcpus v0.8.0 // indirect
7780
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
81+
github.com/yuin/gopher-lua v1.1.1 // indirect
7882
github.com/yusufpapurcu/wmi v1.2.4 // indirect
7983
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect
8084
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.28.0 // indirect
@@ -94,5 +98,6 @@ require (
9498
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
9599
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
96100
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
101+
gopkg.in/yaml.v3 v3.0.1 // indirect
97102
rsc.io/tmplfunc v0.0.3 // indirect
98103
)

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
1212
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
1313
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
1414
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
15+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
16+
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
17+
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
18+
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
1519
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
1620
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
1721
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@@ -249,6 +253,8 @@ github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6S
249253
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
250254
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
251255
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
256+
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
257+
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
252258
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
253259
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
254260
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 h1:9G6E0TXzGFVfTnawRzrPl83iHOAV7L8NJiR8RSGYV1g=
@@ -373,6 +379,8 @@ google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjr
373379
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
374380
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
375381
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
382+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
383+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
376384
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
377385
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
378386
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=

pkg/client/redis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/KyberNetwork/kutils/klog"
88
"github.com/KyberNetwork/kyber-trace-go/pkg/metric"
99
"github.com/KyberNetwork/kyber-trace-go/pkg/tracer"
10+
"github.com/KyberNetwork/service-framework/pkg/client/redis/reconnectable"
1011
"github.com/redis/go-redis/extra/redisotel/v9"
1112
"github.com/redis/go-redis/v9"
1213
)
@@ -48,7 +49,7 @@ func (*RedisCfg) OnUpdate(old, new *RedisCfg) {
4849

4950
func newRedisClient(opts *redis.UniversalOptions) redis.UniversalClient {
5051
if opts.MasterName == "" {
51-
return redis.NewUniversalClient(opts)
52+
return reconnectable.New(opts)
5253
}
5354
failoverOpts := opts.Failover()
5455
failoverOpts.RouteByLatency = opts.RouteByLatency
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package reconnectable
2+
3+
import (
4+
"context"
5+
"net"
6+
"strings"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/redis/go-redis/v9"
11+
)
12+
13+
type RedisClient struct {
14+
redis.UniversalClient
15+
16+
opts *redis.UniversalOptions
17+
18+
lastRefreshTime atomic.Value
19+
refreshCooldown time.Duration
20+
refreshInProgress atomic.Bool
21+
}
22+
23+
func New(cfg *redis.UniversalOptions) *RedisClient {
24+
rc := &RedisClient{
25+
opts: cfg,
26+
refreshCooldown: 10 * time.Second,
27+
}
28+
29+
rc.UniversalClient = redis.NewUniversalClient(cfg)
30+
rc.UniversalClient.AddHook(rc)
31+
32+
return rc
33+
}
34+
35+
func (r *RedisClient) canRefreshClient() bool {
36+
lastRefresh, ok := r.lastRefreshTime.Load().(time.Time)
37+
if !ok {
38+
return true
39+
}
40+
return time.Since(lastRefresh) >= r.refreshCooldown
41+
}
42+
43+
func (r *RedisClient) refreshClient() {
44+
if r.refreshInProgress.Load() || !r.refreshInProgress.CompareAndSwap(false, true) {
45+
return
46+
}
47+
48+
defer r.refreshInProgress.Store(false)
49+
50+
if !r.canRefreshClient() {
51+
return
52+
}
53+
54+
newClient := redis.NewUniversalClient(r.opts)
55+
56+
oldClient := r.UniversalClient
57+
r.UniversalClient = newClient
58+
r.UniversalClient.AddHook(r)
59+
r.lastRefreshTime.Store(time.Now())
60+
61+
go func() {
62+
if oldClient != nil {
63+
_ = oldClient.Close()
64+
}
65+
}()
66+
}
67+
68+
func shouldRefreshClient(err error) bool {
69+
return err != nil && strings.Contains(err.Error(), "connection refused")
70+
}
71+
72+
func (r *RedisClient) DialHook(next redis.DialHook) redis.DialHook {
73+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
74+
return next(ctx, network, addr)
75+
}
76+
}
77+
78+
func (r *RedisClient) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
79+
return func(ctx context.Context, cmd redis.Cmder) error {
80+
err := next(ctx, cmd)
81+
if shouldRefreshClient(err) {
82+
r.refreshClient()
83+
}
84+
85+
return err
86+
}
87+
}
88+
89+
func (r *RedisClient) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
90+
return func(ctx context.Context, cmds []redis.Cmder) error {
91+
err := next(ctx, cmds)
92+
if shouldRefreshClient(err) {
93+
r.refreshClient()
94+
}
95+
96+
return err
97+
}
98+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package reconnectable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/alicebob/miniredis/v2"
11+
"github.com/redis/go-redis/v9"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestRefreshClientWhenServerCrash(t *testing.T) {
17+
// Step 1: Start the mock Redis server
18+
srv, err := miniredis.Run()
19+
if err != nil {
20+
t.Fatalf("could not start mock Redis server: %v", err)
21+
}
22+
defer srv.Close()
23+
24+
// Step 2: Create the Redis client with the mock server's address
25+
cfg := &redis.UniversalOptions{
26+
Addrs: []string{srv.Addr()},
27+
}
28+
rc := New(cfg)
29+
oldClient := rc.UniversalClient
30+
31+
// Step 3: Simulate server failure by stopping the Redis server
32+
srv.Close()
33+
34+
// Step 4: Perform concurrent Set operations that should fail due to the server being down
35+
var wg sync.WaitGroup
36+
for i := 1; i <= 100; i++ {
37+
wg.Add(1)
38+
go func(i int) {
39+
defer wg.Done()
40+
rc.Set(context.Background(), fmt.Sprint(i), "1", 0) // This should fail due to the server being down
41+
}(i)
42+
}
43+
wg.Wait()
44+
45+
// Step 5: Ensure the client was refreshed after the error (server down)
46+
newClient := rc.UniversalClient
47+
48+
// Check that the client refresh was triggered and the refresh time is updated
49+
require.NotNil(t, rc.lastRefreshTime)
50+
lastRefresh := rc.lastRefreshTime.Load().(time.Time)
51+
assert.True(t, time.Since(lastRefresh) < time.Second, "Refresh time should be updated")
52+
53+
// Verify that the client refresh process was completed (no ongoing refresh)
54+
assert.False(t, rc.refreshInProgress.Load())
55+
56+
// Ensure that a new client instance was created
57+
assert.NotEqual(t, oldClient, newClient, "UniversalClient should be updated")
58+
59+
// Step 6: Restart the Redis server to simulate recovery
60+
srv.Start()
61+
62+
// Step 7: Verify that the client can now successfully ping the server
63+
require.NoError(t, rc.Ping(context.Background()).Err(), "Ping should succeed after client refresh and server recovery")
64+
65+
// Verify that the refresh time hasn't been updated since the recovery (no new refresh required)
66+
assert.True(t, lastRefresh == rc.lastRefreshTime.Load().(time.Time), "Refresh time shouldn't be updated")
67+
68+
// Ensure that the client is still the refreshed one (not the old client)
69+
assert.Equal(t, newClient, rc.UniversalClient, "UniversalClient should still be the updated one")
70+
}
71+
72+
func BenchmarkSetCommandWithWrappedClient(b *testing.B) {
73+
srv, err := miniredis.Run()
74+
if err != nil {
75+
b.Fatalf("could not start mock Redis server: %v", err)
76+
}
77+
defer srv.Close()
78+
79+
cfg := &redis.UniversalOptions{
80+
Addrs: []string{srv.Addr()},
81+
}
82+
83+
rc := New(cfg)
84+
b.ResetTimer()
85+
86+
b.Run("SetCommand", func(b *testing.B) {
87+
for i := 0; i < b.N; i++ {
88+
err := rc.Set(context.Background(), "key", "value", 0).Err()
89+
if err != nil {
90+
b.Fatalf("Failed to set key: %v", err)
91+
}
92+
}
93+
})
94+
}
95+
96+
func BenchmarkGetCommandWithWrappedClient(b *testing.B) {
97+
srv, err := miniredis.Run()
98+
if err != nil {
99+
b.Fatalf("could not start mock Redis server: %v", err)
100+
}
101+
defer srv.Close()
102+
103+
cfg := &redis.UniversalOptions{
104+
Addrs: []string{srv.Addr()},
105+
}
106+
107+
rc := New(cfg)
108+
rc.Set(context.Background(), "key", "value", 0)
109+
110+
b.ResetTimer()
111+
112+
b.Run("GetCommand", func(b *testing.B) {
113+
for i := 0; i < b.N; i++ {
114+
_, err := rc.Get(context.Background(), "key").Result()
115+
if err != nil {
116+
b.Fatalf("Failed to get key: %v", err)
117+
}
118+
}
119+
})
120+
}
121+
122+
func BenchmarkGetCommandWithOriginalClient(b *testing.B) {
123+
srv, err := miniredis.Run()
124+
if err != nil {
125+
b.Fatalf("could not start mock Redis server: %v", err)
126+
}
127+
defer srv.Close()
128+
129+
cfg := &redis.UniversalOptions{
130+
Addrs: []string{srv.Addr()},
131+
}
132+
133+
rc := redis.NewUniversalClient(&redis.UniversalOptions{
134+
Addrs: cfg.Addrs,
135+
MasterName: cfg.MasterName,
136+
Password: cfg.Password,
137+
SentinelPassword: cfg.SentinelPassword,
138+
DB: cfg.DB,
139+
ReadTimeout: cfg.ReadTimeout,
140+
WriteTimeout: cfg.WriteTimeout,
141+
})
142+
rc.Set(context.Background(), "key", "value", 0)
143+
144+
b.ResetTimer()
145+
146+
b.Run("GetCommand", func(b *testing.B) {
147+
for i := 0; i < b.N; i++ {
148+
_, err := rc.Get(context.Background(), "key").Result()
149+
if err != nil {
150+
b.Fatalf("Failed to get key: %v", err)
151+
}
152+
}
153+
})
154+
}
155+
156+
func BenchmarkSetCommandWithOriginalClient(b *testing.B) {
157+
srv, err := miniredis.Run()
158+
if err != nil {
159+
b.Fatalf("could not start mock Redis server: %v", err)
160+
}
161+
defer srv.Close()
162+
163+
cfg := &redis.UniversalOptions{
164+
Addrs: []string{srv.Addr()},
165+
}
166+
167+
rc := redis.NewUniversalClient(&redis.UniversalOptions{
168+
Addrs: cfg.Addrs,
169+
MasterName: cfg.MasterName,
170+
Password: cfg.Password,
171+
SentinelPassword: cfg.SentinelPassword,
172+
DB: cfg.DB,
173+
ReadTimeout: cfg.ReadTimeout,
174+
WriteTimeout: cfg.WriteTimeout,
175+
})
176+
177+
b.ResetTimer()
178+
179+
b.Run("SetCommand", func(b *testing.B) {
180+
for i := 0; i < b.N; i++ {
181+
err := rc.Set(context.Background(), "key", "value", 0).Err()
182+
if err != nil {
183+
b.Fatalf("Failed to set key: %v", err)
184+
}
185+
}
186+
})
187+
}

0 commit comments

Comments
 (0)