Skip to content

Commit 84dc1cb

Browse files
committed
Adding new commands MExpire/MExpireAt, fixed the Ping command, minor performance improvements
1 parent 4a51821 commit 84dc1cb

File tree

4 files changed

+235
-23
lines changed

4 files changed

+235
-23
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## v2.2.0 (2023-04-05)
2+
3+
- Adding new commands MExpire/MExpireAt with support for executing multiple keys
4+
- Fixed the Ping command, now the check occurs on all shards and returns the first error for the sharded configuration
5+
- Minor performance improvements
6+
17
## v2.1.0 (2022-07-12)
28

39
- Added getting statistics of all connection pools

pool.go

Lines changed: 166 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,20 @@ func (p *Pool) TxPipelined(ctx context.Context, fn func(redis.Pipeliner) error)
206206
}
207207

208208
func (p *Pool) Ping(ctx context.Context) *redis.StatusCmd {
209-
// FIXME: use config to determine whether no key would access the master
210-
conn, err := p.connFactory.getMasterConn()
211-
if err != nil {
212-
return newErrorStatusCmd(err)
209+
if _, ok := p.connFactory.(*HAConnFactory); ok {
210+
conn, _ := p.connFactory.getMasterConn()
211+
return conn.Ping(ctx)
212+
}
213+
var result *redis.StatusCmd
214+
factory := p.connFactory.(*ShardConnFactory)
215+
for _, shard := range factory.shards {
216+
conn, _ := shard.getMasterConn()
217+
result = conn.Ping(ctx)
218+
if result.Err() != nil {
219+
return result
220+
}
213221
}
214-
return conn.Ping(ctx)
222+
return result
215223
}
216224

217225
func (p *Pool) Get(ctx context.Context, key string) *redis.StringCmd {
@@ -425,18 +433,17 @@ func (p *Pool) MSetWithGD(ctx context.Context, values ...interface{}) []*redis.S
425433
}
426434

427435
var wg sync.WaitGroup
428-
var mu sync.Mutex
429-
var result []*redis.StatusCmd
436+
result := make([]*redis.StatusCmd, len(index2Values))
437+
var i int
430438
for ind, vals := range index2Values {
431439
wg.Add(1)
432440
conn, _ := factory.shards[ind].getMasterConn()
433-
go func(conn *redis.Client, vals ...interface{}) {
434-
defer wg.Done()
441+
go func(i int, conn *redis.Client, vals ...interface{}) {
435442
status := conn.MSet(ctx, vals...)
436-
mu.Lock()
437-
result = append(result, status)
438-
mu.Unlock()
439-
}(conn, vals...)
443+
result[i] = status
444+
wg.Done()
445+
}(i, conn, vals...)
446+
i++
440447
}
441448
wg.Wait()
442449
return result
@@ -519,6 +526,79 @@ func (p *Pool) Expire(ctx context.Context, key string, expiration time.Duration)
519526
return conn.Expire(ctx, key, expiration)
520527
}
521528

529+
// MExpire gives the result for each group of keys
530+
func (p *Pool) MExpire(ctx context.Context, expiration time.Duration, keys ...string) map[string]error {
531+
keyErrorsMap := func(results []redis.Cmder) map[string]error {
532+
if len(results) == 0 {
533+
return nil
534+
}
535+
keyErrors := make(map[string]error, 0)
536+
for _, result := range results {
537+
if result.Err() != nil {
538+
args := result.Args()
539+
for i, arg := range args {
540+
if i == 0 || i == 2 {
541+
continue
542+
}
543+
keyErrors[arg.(string)] = result.Err()
544+
}
545+
}
546+
}
547+
return keyErrors
548+
}
549+
550+
if _, ok := p.connFactory.(*HAConnFactory); ok {
551+
conn, _ := p.connFactory.getMasterConn()
552+
pipe := conn.Pipeline()
553+
for _, key := range keys {
554+
pipe.Expire(ctx, key, expiration)
555+
}
556+
results, err := pipe.Exec(ctx)
557+
_ = pipe.Close()
558+
if err != nil {
559+
return keyErrorsMap(results)
560+
}
561+
return nil
562+
}
563+
564+
factory := p.connFactory.(*ShardConnFactory)
565+
index2Keys := make(map[uint32][]string)
566+
for _, key := range keys {
567+
ind := factory.getShardIndex(key)
568+
if _, ok := index2Keys[ind]; !ok {
569+
index2Keys[ind] = make([]string, 0)
570+
}
571+
index2Keys[ind] = append(index2Keys[ind], key)
572+
}
573+
574+
var wg sync.WaitGroup
575+
var mu sync.Mutex
576+
var results []redis.Cmder
577+
var i int
578+
for ind, keys := range index2Keys {
579+
wg.Add(1)
580+
conn, _ := factory.shards[ind].getMasterConn()
581+
go func(i int, conn *redis.Client, keys ...string) {
582+
pipe := conn.Pipeline()
583+
for _, key := range keys {
584+
pipe.Expire(ctx, key, expiration)
585+
}
586+
result, err := pipe.Exec(ctx)
587+
_ = pipe.Close()
588+
if err != nil {
589+
mu.Lock()
590+
results = append(results, result...)
591+
mu.Unlock()
592+
}
593+
wg.Done()
594+
}(i, conn, keys...)
595+
i++
596+
}
597+
wg.Wait()
598+
599+
return keyErrorsMap(results)
600+
}
601+
522602
func (p *Pool) ExpireAt(ctx context.Context, key string, tm time.Time) *redis.BoolCmd {
523603
conn, err := p.connFactory.getMasterConn(key)
524604
if err != nil {
@@ -527,6 +607,79 @@ func (p *Pool) ExpireAt(ctx context.Context, key string, tm time.Time) *redis.Bo
527607
return conn.ExpireAt(ctx, key, tm)
528608
}
529609

610+
// MExpireAt gives the result for each group of keys
611+
func (p *Pool) MExpireAt(ctx context.Context, tm time.Time, keys ...string) map[string]error {
612+
keyErrorsMap := func(results []redis.Cmder) map[string]error {
613+
if len(results) == 0 {
614+
return nil
615+
}
616+
keyErrors := make(map[string]error, 0)
617+
for _, result := range results {
618+
if result.Err() != nil {
619+
args := result.Args()
620+
for i, arg := range args {
621+
if i == 0 || i == 2 {
622+
continue
623+
}
624+
keyErrors[arg.(string)] = result.Err()
625+
}
626+
}
627+
}
628+
return keyErrors
629+
}
630+
631+
if _, ok := p.connFactory.(*HAConnFactory); ok {
632+
conn, _ := p.connFactory.getMasterConn()
633+
pipe := conn.Pipeline()
634+
for _, key := range keys {
635+
pipe.ExpireAt(ctx, key, tm)
636+
}
637+
results, err := pipe.Exec(ctx)
638+
_ = pipe.Close()
639+
if err != nil {
640+
return keyErrorsMap(results)
641+
}
642+
return nil
643+
}
644+
645+
factory := p.connFactory.(*ShardConnFactory)
646+
index2Keys := make(map[uint32][]string)
647+
for _, key := range keys {
648+
ind := factory.getShardIndex(key)
649+
if _, ok := index2Keys[ind]; !ok {
650+
index2Keys[ind] = make([]string, 0)
651+
}
652+
index2Keys[ind] = append(index2Keys[ind], key)
653+
}
654+
655+
var wg sync.WaitGroup
656+
var mu sync.Mutex
657+
var results []redis.Cmder
658+
var i int
659+
for ind, keys := range index2Keys {
660+
wg.Add(1)
661+
conn, _ := factory.shards[ind].getMasterConn()
662+
go func(i int, conn *redis.Client, keys ...string) {
663+
pipe := conn.Pipeline()
664+
for _, key := range keys {
665+
pipe.ExpireAt(ctx, key, tm)
666+
}
667+
result, err := pipe.Exec(ctx)
668+
_ = pipe.Close()
669+
if err != nil {
670+
mu.Lock()
671+
results = append(results, result...)
672+
mu.Unlock()
673+
}
674+
wg.Done()
675+
}(i, conn, keys...)
676+
i++
677+
}
678+
wg.Wait()
679+
680+
return keyErrorsMap(results)
681+
}
682+
530683
func (p *Pool) TTL(ctx context.Context, key string) *redis.DurationCmd {
531684
conn, err := p.connFactory.getSlaveConn(key)
532685
if err != nil {

pool_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1820,6 +1820,14 @@ var _ = Describe("Pool_GD", func() {
18201820
})
18211821

18221822
Describe("Commands", func() {
1823+
It("ping", func() {
1824+
for _, pool := range pools {
1825+
_, err := pool.Ping(ctx).Result()
1826+
Expect(err).To(HaveOccurred())
1827+
Expect(err.Error()).To(Equal("dial tcp 127.0.0.1:8384: connect: connection refused"))
1828+
}
1829+
})
1830+
18231831
It("gd", func() {
18241832
kvs := []string{"a3", "a3", "b3", "b3", "c3", "c3", "d3", "d3"}
18251833
keys := make([]string, 0)
@@ -1844,5 +1852,53 @@ var _ = Describe("Pool_GD", func() {
18441852
_, _ = pool.Del(ctx, keys...)
18451853
}
18461854
})
1855+
1856+
It("mexpire", func() {
1857+
kvs := []string{"a3", "a3", "b3", "b3", "c3", "c3", "d3", "d3"}
1858+
keys := make([]string, 0)
1859+
for i := 0; i < len(kvs); i += 2 {
1860+
keys = append(keys, kvs[i])
1861+
}
1862+
for _, pool := range pools {
1863+
statuses := pool.MSetWithGD(ctx, kvs)
1864+
time.Sleep(10 * time.Millisecond)
1865+
sort.Slice(statuses, func(i, j int) bool {
1866+
return statuses[i].Err() == nil
1867+
})
1868+
Expect(statuses[0].Err()).NotTo(HaveOccurred())
1869+
Expect(statuses[1].Err()).To(HaveOccurred())
1870+
keyErrors := pool.MExpire(ctx, 5*time.Minute, keys...)
1871+
time.Sleep(10 * time.Millisecond)
1872+
Expect(keyErrors).Should(HaveKey("a3"))
1873+
Expect(keyErrors).Should(HaveKey("c3"))
1874+
Expect(pool.TTL(ctx, "b3").Val()).NotTo(Equal(-1))
1875+
Expect(pool.TTL(ctx, "d3").Val()).NotTo(Equal(-1))
1876+
_, _ = pool.Del(ctx, keys...)
1877+
}
1878+
})
1879+
1880+
It("mexpireat", func() {
1881+
kvs := []string{"a3", "a3", "b3", "b3", "c3", "c3", "d3", "d3"}
1882+
keys := make([]string, 0)
1883+
for i := 0; i < len(kvs); i += 2 {
1884+
keys = append(keys, kvs[i])
1885+
}
1886+
for _, pool := range pools {
1887+
statuses := pool.MSetWithGD(ctx, kvs)
1888+
time.Sleep(10 * time.Millisecond)
1889+
sort.Slice(statuses, func(i, j int) bool {
1890+
return statuses[i].Err() == nil
1891+
})
1892+
Expect(statuses[0].Err()).NotTo(HaveOccurred())
1893+
Expect(statuses[1].Err()).To(HaveOccurred())
1894+
keyErrors := pool.MExpireAt(ctx, time.Now().Add(5*time.Minute), keys...)
1895+
time.Sleep(10 * time.Millisecond)
1896+
Expect(keyErrors).Should(HaveKey("a3"))
1897+
Expect(keyErrors).Should(HaveKey("c3"))
1898+
Expect(pool.TTL(ctx, "b3").Val()).NotTo(Equal(-1))
1899+
Expect(pool.TTL(ctx, "d3").Val()).NotTo(Equal(-1))
1900+
_, _ = pool.Del(ctx, keys...)
1901+
}
1902+
})
18471903
})
18481904
})

shard_conn_factory.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,12 @@ func NewShardConnFactory(cfg *ShardConfig) (*ShardConnFactory, error) {
6565

6666
func (factory *ShardConnFactory) stats() map[string]*redis.PoolStats {
6767
results := make(map[string]*redis.PoolStats, len(factory.shards))
68-
6968
for _, shard := range factory.shards {
7069
result := shard.stats()
7170
for addr, stats := range result {
7271
results[addr] = stats
7372
}
7473
}
75-
7674
return results
7775
}
7876

@@ -152,18 +150,17 @@ func (factory *ShardConnFactory) doMultiKeys(fn multiKeyFn, keys ...string) []re
152150
return []redis.Cmder{fn(factory, keys...)}
153151
}
154152

155-
var mu sync.Mutex
156153
var wg sync.WaitGroup
157-
var results []redis.Cmder
154+
results := make([]redis.Cmder, len(index2Keys))
155+
var i int
158156
for _, keyList := range index2Keys {
159157
wg.Add(1)
160-
go func(keyList []string) {
161-
defer wg.Done()
158+
go func(keyList []string, i int) {
162159
result := fn(factory, keyList...)
163-
mu.Lock()
164-
results = append(results, result)
165-
mu.Unlock()
166-
}(keyList)
160+
results[i] = result
161+
wg.Done()
162+
}(keyList, i)
163+
i++
167164
}
168165
wg.Wait()
169166
return results

0 commit comments

Comments
 (0)