From 383aa85a5796fdacdffda906406fcc7f82a75320 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Wed, 20 Aug 2025 20:40:07 +0300 Subject: [PATCH 1/6] initial implementation Signed-off-by: nyagamunene --- pkg/messaging/nats/options.go | 46 ++++++++++++++- pkg/messaging/nats/pubsub.go | 107 +++++++++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 3 deletions(-) diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go index 7d8ac71e26..034e148c05 100644 --- a/pkg/messaging/nats/options.go +++ b/pkg/messaging/nats/options.go @@ -31,14 +31,40 @@ var ( const msgPrefix = "m" type options struct { - prefix string - jsStreamConfig jetstream.StreamConfig + prefix string + jsStreamConfig jetstream.StreamConfig + slowConsumerConfig *SlowConsumerConfig +} + +type SlowConsumerConfig struct { + // MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending + // Controls the maximum number of outstanding unacknowledged messages + // Also used for slow consumer detection at 70% of this value + MaxPendingMsgs int + + // MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes + // Controls the maximum bytes per batch request (closest JetStream equivalent) + MaxPendingBytes int + + // EnableDroppedMsgTracking enables logging of message redeliveries + // which can indicate slow consumer behavior in JetStream + EnableDroppedMsgTracking bool + + // EnableSlowConsumerDetection enables automatic slow consumer detection + // and logging when consumer lag exceeds 70% of MaxPendingMsgs + EnableSlowConsumerDetection bool } func defaultOptions() options { return options{ prefix: msgPrefix, jsStreamConfig: jsStreamConfig, + slowConsumerConfig: &SlowConsumerConfig{ + MaxPendingMsgs: 1000, + MaxPendingBytes: 5 * 1024 * 1024, + EnableDroppedMsgTracking: true, + EnableSlowConsumerDetection: true, // Will warn at 70% of MaxPendingMsgs (700 messages) + }, } } @@ -73,3 +99,19 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option { return nil } } + +// WithSlowConsumerConfig sets the slow consumer configuration. +func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.slowConsumerConfig = &config + case *pubsub: + v.slowConsumerConfig = &config + default: + return ErrInvalidType + } + + return nil + } +} diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index ad347f79bb..f9b45e809e 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -52,7 +52,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes } } - conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) + conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ErrorHandler(ps.natsErrorHandler)) if err != nil { return nil, err } @@ -72,6 +72,30 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes return ps, nil } +func (ps *pubsub) natsErrorHandler(nc *broker.Conn, sub *broker.Subscription, natsErr error) { + ps.logger.Error("NATS error occurred", + slog.String("error", natsErr.Error()), + slog.String("subject", sub.Subject), + ) + + if natsErr == broker.ErrSlowConsumer { + pendingMsgs, pendingBytes, err := sub.Pending() + if err != nil { + ps.logger.Error("couldn't get pending messages for slow consumer", + slog.String("error", err.Error()), + slog.String("subject", sub.Subject), + ) + return + } + + ps.logger.Warn("Slow consumer detected", + slog.String("subject", sub.Subject), + slog.Int("pending_messages", pendingMsgs), + slog.Int("pending_bytes", pendingBytes), + ) + } +} + func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { if cfg.ID == "" { return ErrEmptyID @@ -91,6 +115,22 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) FilterSubject: cfg.Topic, } + if ps.slowConsumerConfig != nil { + consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs + + if ps.slowConsumerConfig.MaxPendingBytes > 0 { + consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes + } + + ps.logger.Info("Applied slow consumer throttling to JetStream consumer", + slog.String("consumer", consumerConfig.Name), + slog.Int("max_ack_pending", consumerConfig.MaxAckPending), + slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes), + slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking), + slog.Bool("slow_consumer_detection", ps.slowConsumerConfig.EnableSlowConsumerDetection), + ) + } + if cfg.Ordered { consumerConfig.MaxAckPending = 1 } @@ -111,6 +151,10 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) return fmt.Errorf("failed to consume: %w", err) } + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection { + go ps.checkConsumerLag(ctx, consumerConfig.Name) + } + return nil } @@ -145,6 +189,29 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg) slog.Uint64("stream_seq", meta.Sequence.Stream), slog.Uint64("consumer_seq", meta.Sequence.Consumer), ) + + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection { + sequenceLag := meta.Sequence.Stream - meta.Sequence.Consumer + lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7) + if sequenceLag > lagThreshold { + args = append(args, + slog.Uint64("sequence_lag", sequenceLag), + slog.Uint64("lag_threshold", lagThreshold), + slog.String("slow_consumer_detection", "lag_threshold_exceeded"), + ) + ps.logger.Warn("JetStream slow consumer detected - sequence lag exceeds threshold", args...) + } + } + + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking { + if meta.NumDelivered > 1 { + args = append(args, + slog.Uint64("delivery_count", meta.NumDelivered), + slog.String("redelivery_reason", "slow_consumer_or_ack_timeout"), + ) + ps.logger.Warn("Message redelivered (potential slow consumer)", args...) + } + } default: args = append(args, slog.String("metadata_error", err.Error()), @@ -219,3 +286,41 @@ func formatConsumerName(topic, id string) string { return fmt.Sprintf("%s-%s", topic, id) } + +// checkConsumerLag checks the consumer lag and logs warnings if thresholds are exceeded. +// This provides additional slow consumer detection for JetStream consumers. +func (ps *pubsub) checkConsumerLag(ctx context.Context, consumerName string) { + if ps.slowConsumerConfig == nil || !ps.slowConsumerConfig.EnableSlowConsumerDetection { + return + } + + consumer, err := ps.stream.Consumer(ctx, consumerName) + if err != nil { + ps.logger.Error("failed to get consumer for lag check", + slog.String("consumer", consumerName), + slog.String("error", err.Error()), + ) + return + } + + info, err := consumer.Info(ctx) + if err != nil { + ps.logger.Error("failed to get consumer info for lag check", + slog.String("consumer", consumerName), + slog.String("error", err.Error()), + ) + return + } + + pending := info.NumPending + lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7) + if pending > lagThreshold { + ps.logger.Warn("JetStream consumer lag threshold exceeded", + slog.String("consumer", consumerName), + slog.Uint64("pending_messages", pending), + slog.Int("ack_pending", info.NumAckPending), + slog.Int("redelivered", info.NumRedelivered), + slog.Uint64("threshold", lagThreshold), + ) + } +} From 557bacc3c073d99c765d2b272debcab91e5f8d65 Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 21 Aug 2025 11:07:51 +0300 Subject: [PATCH 2/6] remove unneccesary comments Signed-off-by: nyagamunene --- pkg/messaging/nats/options.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go index 034e148c05..2a904cf0d8 100644 --- a/pkg/messaging/nats/options.go +++ b/pkg/messaging/nats/options.go @@ -28,7 +28,13 @@ var ( } ) -const msgPrefix = "m" +const ( + msgPrefix = "m" + defaultMaxPendingMsgs = 1000 + defaultMaxPendingBytes = 5 * 1024 * 1024 + defaultEnableDroppedMsgTracking = true + defaultEnableSlowConsumerDetection = true +) type options struct { prefix string @@ -60,10 +66,10 @@ func defaultOptions() options { prefix: msgPrefix, jsStreamConfig: jsStreamConfig, slowConsumerConfig: &SlowConsumerConfig{ - MaxPendingMsgs: 1000, - MaxPendingBytes: 5 * 1024 * 1024, - EnableDroppedMsgTracking: true, - EnableSlowConsumerDetection: true, // Will warn at 70% of MaxPendingMsgs (700 messages) + MaxPendingMsgs: defaultMaxPendingMsgs, + MaxPendingBytes: defaultMaxPendingBytes, + EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, + EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection, }, } } From 252b82a324933dddb57a19f374bcc494e7b4143a Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 21 Aug 2025 13:32:10 +0300 Subject: [PATCH 3/6] address comments Signed-off-by: nyagamunene --- pkg/events/nats/subscriber.go | 22 ++++--- pkg/messaging/nats/options.go | 109 ++++++++++++++++++-------------- pkg/messaging/nats/publisher.go | 2 +- pkg/messaging/nats/pubsub.go | 76 ++++------------------ 4 files changed, 90 insertions(+), 119 deletions(-) diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 4dde230d58..0cd003d508 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -22,16 +22,18 @@ var _ events.Subscriber = (*subEventStore)(nil) var ( eventsPrefix = "events" - jsStreamConfig = jetstream.StreamConfig{ - Name: "events", - Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events", - Subjects: []string{"events.>"}, - Retention: jetstream.LimitsPolicy, - MaxMsgsPerSubject: 1e9, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, + jsStreamConfig = broker.SMQJetStreamConfig{ + StreamConfig: jetstream.StreamConfig{ + Name: "events", + Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events", + Subjects: []string{"events.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e9, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + }, } // ErrEmptyStream is returned when stream name is empty. diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go index 2a904cf0d8..a737590cf0 100644 --- a/pkg/messaging/nats/options.go +++ b/pkg/messaging/nats/options.go @@ -14,63 +14,64 @@ import ( var ( // ErrInvalidType is returned when the provided value is not of the expected type. ErrInvalidType = errors.New("invalid type") - - jsStreamConfig = jetstream.StreamConfig{ - Name: "m", - Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels", - Subjects: []string{"m.>"}, - Retention: jetstream.LimitsPolicy, - MaxMsgsPerSubject: 1e6, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, - } -) - -const ( - msgPrefix = "m" - defaultMaxPendingMsgs = 1000 - defaultMaxPendingBytes = 5 * 1024 * 1024 - defaultEnableDroppedMsgTracking = true - defaultEnableSlowConsumerDetection = true ) -type options struct { - prefix string - jsStreamConfig jetstream.StreamConfig - slowConsumerConfig *SlowConsumerConfig +// SMQJetStreamConfig extends jetstream.StreamConfig with slow consumer monitoring settings +type SMQJetStreamConfig struct { + jetstream.StreamConfig + + SlowConsumer SlowConsumerConfig `json:"slow_consumer,omitempty"` } type SlowConsumerConfig struct { - // MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending - // Controls the maximum number of outstanding unacknowledged messages - // Also used for slow consumer detection at 70% of this value - MaxPendingMsgs int - // MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes // Controls the maximum bytes per batch request (closest JetStream equivalent) - MaxPendingBytes int + MaxPendingBytes int `json:"max_pending_bytes,omitempty"` // EnableDroppedMsgTracking enables logging of message redeliveries // which can indicate slow consumer behavior in JetStream - EnableDroppedMsgTracking bool + EnableDroppedMsgTracking bool `json:"enable_dropped_msg_tracking,omitempty"` +} - // EnableSlowConsumerDetection enables automatic slow consumer detection - // and logging when consumer lag exceeds 70% of MaxPendingMsgs - EnableSlowConsumerDetection bool +var ( + defaultJetStreamConfig = SMQJetStreamConfig{ + StreamConfig: jetstream.StreamConfig{ + Name: "m", + Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels", + Subjects: []string{"m.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e6, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + ConsumerLimits: jetstream.StreamConsumerLimits{ + MaxAckPending: defaultMaxPendingMsgs, + }, + }, + SlowConsumer: SlowConsumerConfig{ + MaxPendingBytes: defaultMaxPendingBytes, + EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, + }, + } +) + +const ( + msgPrefix = "m" + defaultMaxPendingMsgs = 1000 + defaultMaxPendingBytes = 5 * 1024 * 1024 + defaultEnableDroppedMsgTracking = true +) + +type options struct { + prefix string + jsStreamConfig SMQJetStreamConfig } func defaultOptions() options { return options{ prefix: msgPrefix, - jsStreamConfig: jsStreamConfig, - slowConsumerConfig: &SlowConsumerConfig{ - MaxPendingMsgs: defaultMaxPendingMsgs, - MaxPendingBytes: defaultMaxPendingBytes, - EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, - EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection, - }, + jsStreamConfig: defaultJetStreamConfig, } } @@ -90,8 +91,8 @@ func Prefix(prefix string) messaging.Option { } } -// JSStreamConfig sets the JetStream for the publisher or subscriber. -func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option { +// JSStreamConfig sets the JetStream configuration for the publisher or subscriber. +func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option { return func(val any) error { switch v := val.(type) { case *publisher: @@ -106,14 +107,30 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option { } } -// WithSlowConsumerConfig sets the slow consumer configuration. +// WithSlowConsumerConfig sets the slow consumer configuration within the JetStream config. func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option { return func(val interface{}) error { switch v := val.(type) { case *publisher: - v.slowConsumerConfig = &config + v.jsStreamConfig.SlowConsumer = config + case *pubsub: + v.jsStreamConfig.SlowConsumer = config + default: + return ErrInvalidType + } + + return nil + } +} + +// WithConsumerLimits sets the built-in JetStream consumer limits (MaxAckPending, etc.). +func WithConsumerLimits(limits jetstream.StreamConsumerLimits) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.jsStreamConfig.ConsumerLimits = limits case *pubsub: - v.slowConsumerConfig = &config + v.jsStreamConfig.ConsumerLimits = limits default: return ErrInvalidType } diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index 93317b4f13..c6949b3ad0 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -55,7 +55,7 @@ func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (me if err != nil { return nil, err } - if _, err := js.CreateStream(ctx, pub.jsStreamConfig); err != nil { + if _, err := js.CreateStream(ctx, pub.jsStreamConfig.StreamConfig); err != nil { return nil, err } pub.js = js diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index f9b45e809e..e7caff2f59 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -62,7 +62,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes if err != nil { return nil, err } - stream, err := js.CreateStream(ctx, ps.jsStreamConfig) + stream, err := js.CreateStream(ctx, ps.jsStreamConfig.StreamConfig) if err != nil { return nil, err } @@ -115,19 +115,24 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) FilterSubject: cfg.Topic, } - if ps.slowConsumerConfig != nil { - consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs + // Apply consumer limits from the built-in ConsumerLimits + if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 { + consumerConfig.MaxAckPending = ps.jsStreamConfig.ConsumerLimits.MaxAckPending + } - if ps.slowConsumerConfig.MaxPendingBytes > 0 { - consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes - } + // Apply additional monitoring configuration + monitoring := &ps.jsStreamConfig.SlowConsumer + if monitoring.MaxPendingBytes > 0 { + consumerConfig.MaxRequestMaxBytes = monitoring.MaxPendingBytes + } + // Log the applied configuration + if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 { ps.logger.Info("Applied slow consumer throttling to JetStream consumer", slog.String("consumer", consumerConfig.Name), slog.Int("max_ack_pending", consumerConfig.MaxAckPending), slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes), - slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking), - slog.Bool("slow_consumer_detection", ps.slowConsumerConfig.EnableSlowConsumerDetection), + slog.Bool("dropped_msg_tracking", monitoring.EnableDroppedMsgTracking), ) } @@ -151,10 +156,6 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) return fmt.Errorf("failed to consume: %w", err) } - if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection { - go ps.checkConsumerLag(ctx, consumerConfig.Name) - } - return nil } @@ -190,20 +191,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg) slog.Uint64("consumer_seq", meta.Sequence.Consumer), ) - if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection { - sequenceLag := meta.Sequence.Stream - meta.Sequence.Consumer - lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7) - if sequenceLag > lagThreshold { - args = append(args, - slog.Uint64("sequence_lag", sequenceLag), - slog.Uint64("lag_threshold", lagThreshold), - slog.String("slow_consumer_detection", "lag_threshold_exceeded"), - ) - ps.logger.Warn("JetStream slow consumer detected - sequence lag exceeds threshold", args...) - } - } - - if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking { + if ps.jsStreamConfig.SlowConsumer.EnableDroppedMsgTracking { if meta.NumDelivered > 1 { args = append(args, slog.Uint64("delivery_count", meta.NumDelivered), @@ -287,40 +275,4 @@ func formatConsumerName(topic, id string) string { return fmt.Sprintf("%s-%s", topic, id) } -// checkConsumerLag checks the consumer lag and logs warnings if thresholds are exceeded. -// This provides additional slow consumer detection for JetStream consumers. -func (ps *pubsub) checkConsumerLag(ctx context.Context, consumerName string) { - if ps.slowConsumerConfig == nil || !ps.slowConsumerConfig.EnableSlowConsumerDetection { - return - } - - consumer, err := ps.stream.Consumer(ctx, consumerName) - if err != nil { - ps.logger.Error("failed to get consumer for lag check", - slog.String("consumer", consumerName), - slog.String("error", err.Error()), - ) - return - } - info, err := consumer.Info(ctx) - if err != nil { - ps.logger.Error("failed to get consumer info for lag check", - slog.String("consumer", consumerName), - slog.String("error", err.Error()), - ) - return - } - - pending := info.NumPending - lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7) - if pending > lagThreshold { - ps.logger.Warn("JetStream consumer lag threshold exceeded", - slog.String("consumer", consumerName), - slog.Uint64("pending_messages", pending), - slog.Int("ack_pending", info.NumAckPending), - slog.Int("redelivered", info.NumRedelivered), - slog.Uint64("threshold", lagThreshold), - ) - } -} From 2bb7d313dfbfeb4d059869bf2868c0b5e2b4e68f Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 21 Aug 2025 13:56:22 +0300 Subject: [PATCH 4/6] fix struct variable Signed-off-by: nyagamunene --- pkg/events/nats/subscriber.go | 22 +++---- pkg/messaging/nats/options.go | 109 ++++++++++++++------------------ pkg/messaging/nats/publisher.go | 2 +- pkg/messaging/nats/pubsub.go | 20 +++--- 4 files changed, 65 insertions(+), 88 deletions(-) diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 0cd003d508..4dde230d58 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -22,18 +22,16 @@ var _ events.Subscriber = (*subEventStore)(nil) var ( eventsPrefix = "events" - jsStreamConfig = broker.SMQJetStreamConfig{ - StreamConfig: jetstream.StreamConfig{ - Name: "events", - Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events", - Subjects: []string{"events.>"}, - Retention: jetstream.LimitsPolicy, - MaxMsgsPerSubject: 1e9, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, - }, + jsStreamConfig = jetstream.StreamConfig{ + Name: "events", + Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events", + Subjects: []string{"events.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e9, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, } // ErrEmptyStream is returned when stream name is empty. diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go index a737590cf0..2a904cf0d8 100644 --- a/pkg/messaging/nats/options.go +++ b/pkg/messaging/nats/options.go @@ -14,64 +14,63 @@ import ( var ( // ErrInvalidType is returned when the provided value is not of the expected type. ErrInvalidType = errors.New("invalid type") + + jsStreamConfig = jetstream.StreamConfig{ + Name: "m", + Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels", + Subjects: []string{"m.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e6, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + } +) + +const ( + msgPrefix = "m" + defaultMaxPendingMsgs = 1000 + defaultMaxPendingBytes = 5 * 1024 * 1024 + defaultEnableDroppedMsgTracking = true + defaultEnableSlowConsumerDetection = true ) -// SMQJetStreamConfig extends jetstream.StreamConfig with slow consumer monitoring settings -type SMQJetStreamConfig struct { - jetstream.StreamConfig - - SlowConsumer SlowConsumerConfig `json:"slow_consumer,omitempty"` +type options struct { + prefix string + jsStreamConfig jetstream.StreamConfig + slowConsumerConfig *SlowConsumerConfig } type SlowConsumerConfig struct { + // MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending + // Controls the maximum number of outstanding unacknowledged messages + // Also used for slow consumer detection at 70% of this value + MaxPendingMsgs int + // MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes // Controls the maximum bytes per batch request (closest JetStream equivalent) - MaxPendingBytes int `json:"max_pending_bytes,omitempty"` + MaxPendingBytes int // EnableDroppedMsgTracking enables logging of message redeliveries // which can indicate slow consumer behavior in JetStream - EnableDroppedMsgTracking bool `json:"enable_dropped_msg_tracking,omitempty"` -} + EnableDroppedMsgTracking bool -var ( - defaultJetStreamConfig = SMQJetStreamConfig{ - StreamConfig: jetstream.StreamConfig{ - Name: "m", - Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels", - Subjects: []string{"m.>"}, - Retention: jetstream.LimitsPolicy, - MaxMsgsPerSubject: 1e6, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, - ConsumerLimits: jetstream.StreamConsumerLimits{ - MaxAckPending: defaultMaxPendingMsgs, - }, - }, - SlowConsumer: SlowConsumerConfig{ - MaxPendingBytes: defaultMaxPendingBytes, - EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, - }, - } -) - -const ( - msgPrefix = "m" - defaultMaxPendingMsgs = 1000 - defaultMaxPendingBytes = 5 * 1024 * 1024 - defaultEnableDroppedMsgTracking = true -) - -type options struct { - prefix string - jsStreamConfig SMQJetStreamConfig + // EnableSlowConsumerDetection enables automatic slow consumer detection + // and logging when consumer lag exceeds 70% of MaxPendingMsgs + EnableSlowConsumerDetection bool } func defaultOptions() options { return options{ prefix: msgPrefix, - jsStreamConfig: defaultJetStreamConfig, + jsStreamConfig: jsStreamConfig, + slowConsumerConfig: &SlowConsumerConfig{ + MaxPendingMsgs: defaultMaxPendingMsgs, + MaxPendingBytes: defaultMaxPendingBytes, + EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, + EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection, + }, } } @@ -91,8 +90,8 @@ func Prefix(prefix string) messaging.Option { } } -// JSStreamConfig sets the JetStream configuration for the publisher or subscriber. -func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option { +// JSStreamConfig sets the JetStream for the publisher or subscriber. +func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option { return func(val any) error { switch v := val.(type) { case *publisher: @@ -107,30 +106,14 @@ func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option { } } -// WithSlowConsumerConfig sets the slow consumer configuration within the JetStream config. +// WithSlowConsumerConfig sets the slow consumer configuration. func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option { return func(val interface{}) error { switch v := val.(type) { case *publisher: - v.jsStreamConfig.SlowConsumer = config - case *pubsub: - v.jsStreamConfig.SlowConsumer = config - default: - return ErrInvalidType - } - - return nil - } -} - -// WithConsumerLimits sets the built-in JetStream consumer limits (MaxAckPending, etc.). -func WithConsumerLimits(limits jetstream.StreamConsumerLimits) messaging.Option { - return func(val interface{}) error { - switch v := val.(type) { - case *publisher: - v.jsStreamConfig.ConsumerLimits = limits + v.slowConsumerConfig = &config case *pubsub: - v.jsStreamConfig.ConsumerLimits = limits + v.slowConsumerConfig = &config default: return ErrInvalidType } diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index c6949b3ad0..93317b4f13 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -55,7 +55,7 @@ func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (me if err != nil { return nil, err } - if _, err := js.CreateStream(ctx, pub.jsStreamConfig.StreamConfig); err != nil { + if _, err := js.CreateStream(ctx, pub.jsStreamConfig); err != nil { return nil, err } pub.js = js diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index e7caff2f59..8e7d4ff57d 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -62,7 +62,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes if err != nil { return nil, err } - stream, err := js.CreateStream(ctx, ps.jsStreamConfig.StreamConfig) + stream, err := js.CreateStream(ctx, ps.jsStreamConfig) if err != nil { return nil, err } @@ -115,24 +115,20 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) FilterSubject: cfg.Topic, } - // Apply consumer limits from the built-in ConsumerLimits - if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 { - consumerConfig.MaxAckPending = ps.jsStreamConfig.ConsumerLimits.MaxAckPending + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 { + consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs } - // Apply additional monitoring configuration - monitoring := &ps.jsStreamConfig.SlowConsumer - if monitoring.MaxPendingBytes > 0 { - consumerConfig.MaxRequestMaxBytes = monitoring.MaxPendingBytes + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingBytes > 0 { + consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes } - // Log the applied configuration - if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 { + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 { ps.logger.Info("Applied slow consumer throttling to JetStream consumer", slog.String("consumer", consumerConfig.Name), slog.Int("max_ack_pending", consumerConfig.MaxAckPending), slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes), - slog.Bool("dropped_msg_tracking", monitoring.EnableDroppedMsgTracking), + slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking), ) } @@ -191,7 +187,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg) slog.Uint64("consumer_seq", meta.Sequence.Consumer), ) - if ps.jsStreamConfig.SlowConsumer.EnableDroppedMsgTracking { + if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking { if meta.NumDelivered > 1 { args = append(args, slog.Uint64("delivery_count", meta.NumDelivered), From e22b596f8bba3d1fd6737ae403f961e6167bd92d Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 21 Aug 2025 14:03:30 +0300 Subject: [PATCH 5/6] remove unused code Signed-off-by: nyagamunene --- pkg/messaging/nats/options.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go index 2a904cf0d8..8c5890564e 100644 --- a/pkg/messaging/nats/options.go +++ b/pkg/messaging/nats/options.go @@ -29,11 +29,10 @@ var ( ) const ( - msgPrefix = "m" - defaultMaxPendingMsgs = 1000 - defaultMaxPendingBytes = 5 * 1024 * 1024 - defaultEnableDroppedMsgTracking = true - defaultEnableSlowConsumerDetection = true + msgPrefix = "m" + defaultMaxPendingMsgs = 1000 + defaultMaxPendingBytes = 5 * 1024 * 1024 + defaultEnableDroppedMsgTracking = true ) type options struct { @@ -45,7 +44,6 @@ type options struct { type SlowConsumerConfig struct { // MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending // Controls the maximum number of outstanding unacknowledged messages - // Also used for slow consumer detection at 70% of this value MaxPendingMsgs int // MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes @@ -55,10 +53,6 @@ type SlowConsumerConfig struct { // EnableDroppedMsgTracking enables logging of message redeliveries // which can indicate slow consumer behavior in JetStream EnableDroppedMsgTracking bool - - // EnableSlowConsumerDetection enables automatic slow consumer detection - // and logging when consumer lag exceeds 70% of MaxPendingMsgs - EnableSlowConsumerDetection bool } func defaultOptions() options { @@ -66,10 +60,9 @@ func defaultOptions() options { prefix: msgPrefix, jsStreamConfig: jsStreamConfig, slowConsumerConfig: &SlowConsumerConfig{ - MaxPendingMsgs: defaultMaxPendingMsgs, - MaxPendingBytes: defaultMaxPendingBytes, - EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, - EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection, + MaxPendingMsgs: defaultMaxPendingMsgs, + MaxPendingBytes: defaultMaxPendingBytes, + EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking, }, } } From 0f6df078c79fff891a065fe7f64f70f085c9c1de Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Thu, 21 Aug 2025 14:06:57 +0300 Subject: [PATCH 6/6] fix failing linter Signed-off-by: nyagamunene --- pkg/messaging/nats/pubsub.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index 8e7d4ff57d..552c49e2fe 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -270,5 +270,3 @@ func formatConsumerName(topic, id string) string { return fmt.Sprintf("%s-%s", topic, id) } - -