From c36b57942df85a5f0838b69298ad2cbea214d05c Mon Sep 17 00:00:00 2001 From: Ronak Jain <4195rj@gmail.com> Date: Sun, 6 Apr 2025 13:14:29 +0530 Subject: [PATCH] 3021_add_subscriptionMap_log_subscription --- extra/rediscensus/go.mod | 2 +- extra/rediscmd/go.mod | 2 +- extra/redisotel/go.mod | 2 +- extra/redisprometheus/go.mod | 2 +- pubsub.go | 36 ++++++++++++++++++++++++++++++++---- 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/extra/rediscensus/go.mod b/extra/rediscensus/go.mod index 7033e805f..b39f7dd4c 100644 --- a/extra/rediscensus/go.mod +++ b/extra/rediscensus/go.mod @@ -19,6 +19,6 @@ require ( ) retract ( - v9.5.3 // This version was accidentally released. v9.7.2 // This version was accidentally released. + v9.5.3 // This version was accidentally released. ) diff --git a/extra/rediscmd/go.mod b/extra/rediscmd/go.mod index c1cff3e90..93cc423db 100644 --- a/extra/rediscmd/go.mod +++ b/extra/rediscmd/go.mod @@ -16,6 +16,6 @@ require ( ) retract ( - v9.5.3 // This version was accidentally released. v9.7.2 // This version was accidentally released. + v9.5.3 // This version was accidentally released. ) diff --git a/extra/redisotel/go.mod b/extra/redisotel/go.mod index e5b442e61..c5b29dffa 100644 --- a/extra/redisotel/go.mod +++ b/extra/redisotel/go.mod @@ -24,6 +24,6 @@ require ( ) retract ( - v9.5.3 // This version was accidentally released. v9.7.2 // This version was accidentally released. + v9.5.3 // This version was accidentally released. ) diff --git a/extra/redisprometheus/go.mod b/extra/redisprometheus/go.mod index 8bff00086..c934767e0 100644 --- a/extra/redisprometheus/go.mod +++ b/extra/redisprometheus/go.mod @@ -23,6 +23,6 @@ require ( ) retract ( - v9.5.3 // This version was accidentally released. v9.7.2 // This version was accidentally released. + v9.5.3 // This version was accidentally released. ) diff --git a/pubsub.go b/pubsub.go index 20c085f1f..4591416ee 100644 --- a/pubsub.go +++ b/pubsub.go @@ -3,6 +3,7 @@ package redis import ( "context" "fmt" + "sort" "strings" "sync" "time" @@ -571,6 +572,9 @@ type channel struct { chanSize int chanSendTimeout time.Duration checkInterval time.Duration + + subscriptions map[string]struct{} + subscriptionsMu sync.RWMutex } func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { @@ -580,6 +584,8 @@ func newChannel(pubSub *PubSub, opts ...ChannelOption) *channel { chanSize: 100, chanSendTimeout: time.Minute, checkInterval: 3 * time.Second, + + subscriptions: make(map[string]struct{}), } for _, opt := range opts { opt(c) @@ -618,6 +624,20 @@ func (c *channel) initHealthCheck() { }() } +// Helper function to format subscription information +func (c *channel) getSubscriptionInfo() string { + if len(c.subscriptions) == 0 { + return "none" + } + + subs := make([]string, 0, len(c.subscriptions)) + for sub := range c.subscriptions { + subs = append(subs, sub) + } + sort.Strings(subs) // Sort for consistent output + return strings.Join(subs, ", ") +} + // initMsgChan must be in sync with initAllChan. func (c *channel) initMsgChan() { ctx := context.TODO() @@ -663,9 +683,13 @@ func (c *channel) initMsgChan() { <-timer.C } case <-timer.C: + c.subscriptionsMu.RLock() + subInfo := c.getSubscriptionInfo() + c.subscriptionsMu.RUnlock() + internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", - c, c.chanSendTimeout) + ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s", + c.chanSendTimeout, subInfo) } default: internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg) @@ -717,9 +741,13 @@ func (c *channel) initAllChan() { <-timer.C } case <-timer.C: + c.subscriptionsMu.RLock() + subInfo := c.getSubscriptionInfo() + c.subscriptionsMu.RUnlock() + internal.Logger.Printf( - ctx, "redis: %s channel is full for %s (message is dropped)", - c, c.chanSendTimeout) + ctx, "redis: Channel is full for %s (message is dropped), subscriptions: %s", + c.chanSendTimeout, subInfo) } default: internal.Logger.Printf(ctx, "redis: unknown message type: %T", msg)