diff --git a/ring.go b/ring.go index 4da0b21a6..fd169e2b7 100644 --- a/ring.go +++ b/ring.go @@ -784,6 +784,8 @@ func (c *Ring) generalProcessPipeline( } var wg sync.WaitGroup + errs := make(chan error, len(cmdsMap)) + for hash, cmds := range cmdsMap { wg.Add(1) go func(hash string, cmds []Cmder) { @@ -796,16 +798,23 @@ func (c *Ring) generalProcessPipeline( return } + hook := shard.Client.processPipelineHook if tx { - cmds = wrapMultiExec(ctx, cmds) - _ = shard.Client.processTxPipelineHook(ctx, cmds) - } else { - _ = shard.Client.processPipelineHook(ctx, cmds) + cmds, hook = wrapMultiExec(ctx, cmds), shard.Client.processTxPipelineHook + } + + if err = hook(ctx, cmds); err != nil { + errs <- err } }(hash, cmds) } wg.Wait() + close(errs) + + if err := <-errs; err != nil { + return err + } return cmdsFirstErr(cmds) } diff --git a/ring_test.go b/ring_test.go index aaac74dc9..908807ac3 100644 --- a/ring_test.go +++ b/ring_test.go @@ -271,6 +271,21 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys=")) Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100")) }) + + It("return dial timeout error", func() { + opt := redisRingOptions() + opt.DialTimeout = 250 * time.Millisecond + opt.Addrs = map[string]string{"ringShardNotExist": ":1997"} + ring = redis.NewRing(opt) + + _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.HSet(ctx, "key", "value") + pipe.Expire(ctx, "key", time.Minute) + return nil + }) + + Expect(err).To(HaveOccurred()) + }) }) Describe("new client callback", func() {