diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go
index 7d8ac71e26..8c5890564e 100644
--- a/pkg/messaging/nats/options.go
+++ b/pkg/messaging/nats/options.go
@@ -28,17 +28,46 @@ var (
 	}
 )
 
-const msgPrefix = "m"
+const (
+	msgPrefix                       = "m"
+	defaultMaxPendingMsgs           = 1000
+	defaultMaxPendingBytes          = 5 * 1024 * 1024
+	defaultEnableDroppedMsgTracking = true
+)
 
 type options struct {
-	prefix         string
-	jsStreamConfig jetstream.StreamConfig
+	prefix             string
+	jsStreamConfig     jetstream.StreamConfig
+	slowConsumerConfig *SlowConsumerConfig
+}
+
+// SlowConsumerConfig holds the limits applied to JetStream consumers and the
+// redelivery-logging toggle used to surface slow consumers.
+type SlowConsumerConfig struct {
+	// MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending.
+	// It controls the maximum number of outstanding unacknowledged messages.
+	// Non-positive values leave the consumer's MaxAckPending untouched.
+	MaxPendingMsgs int
+
+	// MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes.
+	// It controls the maximum bytes per batch request (closest JetStream equivalent).
+	// Non-positive values leave the consumer's MaxRequestMaxBytes untouched.
+	MaxPendingBytes int
+
+	// EnableDroppedMsgTracking enables logging of message redeliveries,
+	// which can indicate slow consumer behavior in JetStream.
+	EnableDroppedMsgTracking bool
 }
 
 func defaultOptions() options {
 	return options{
 		prefix:         msgPrefix,
 		jsStreamConfig: jsStreamConfig,
+		slowConsumerConfig: &SlowConsumerConfig{
+			MaxPendingMsgs:           defaultMaxPendingMsgs,
+			MaxPendingBytes:          defaultMaxPendingBytes,
+			EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
+		},
 	}
 }
 
@@ -73,3 +102,21 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
 		return nil
 	}
 }
+
+// WithSlowConsumerConfig sets the slow consumer configuration.
+// It accepts *publisher and *pubsub targets; any other target yields
+// ErrInvalidType.
+func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option {
+	return func(val interface{}) error {
+		switch v := val.(type) {
+		case *publisher:
+			v.slowConsumerConfig = &config
+		case *pubsub:
+			v.slowConsumerConfig = &config
+		default:
+			return ErrInvalidType
+		}
+
+		return nil
+	}
+}
diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go
index ad347f79bb..552c49e2fe 100644
--- a/pkg/messaging/nats/pubsub.go
+++ b/pkg/messaging/nats/pubsub.go
@@ -52,7 +52,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
 		}
 	}
 
-	conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
+	conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ErrorHandler(ps.natsErrorHandler))
 	if err != nil {
 		return nil, err
 	}
@@ -72,6 +72,38 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
 	return ps, nil
 }
 
+// natsErrorHandler logs asynchronous errors reported on the NATS connection.
+// For slow consumer errors it also logs the subscription's pending message
+// and byte counts.
+func (ps *pubsub) natsErrorHandler(nc *broker.Conn, sub *broker.Subscription, natsErr error) {
+	// nats.go may pass a nil subscription for errors not tied to one, so the
+	// subject is attached only when a subscription is present.
+	args := []any{slog.String("error", natsErr.Error())}
+	if sub != nil {
+		args = append(args, slog.String("subject", sub.Subject))
+	}
+	ps.logger.Error("NATS error occurred", args...)
+
+	if sub == nil || natsErr != broker.ErrSlowConsumer {
+		return
+	}
+
+	pendingMsgs, pendingBytes, err := sub.Pending()
+	if err != nil {
+		ps.logger.Error("couldn't get pending messages for slow consumer",
+			slog.String("error", err.Error()),
+			slog.String("subject", sub.Subject),
+		)
+		return
+	}
+
+	ps.logger.Warn("Slow consumer detected",
+		slog.String("subject", sub.Subject),
+		slog.Int("pending_messages", pendingMsgs),
+		slog.Int("pending_bytes", pendingBytes),
+	)
+}
+
 func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
 	if cfg.ID == "" {
 		return ErrEmptyID
@@ -91,6 +123,24 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
 		FilterSubject: cfg.Topic,
 	}
 
+	if scc := ps.slowConsumerConfig; scc != nil {
+		if scc.MaxPendingMsgs > 0 {
+			consumerConfig.MaxAckPending = scc.MaxPendingMsgs
+		}
+		if scc.MaxPendingBytes > 0 {
+			consumerConfig.MaxRequestMaxBytes = scc.MaxPendingBytes
+		}
+		// Log whenever at least one limit was applied, not only the message limit.
+		if scc.MaxPendingMsgs > 0 || scc.MaxPendingBytes > 0 {
+			ps.logger.Info("Applied slow consumer throttling to JetStream consumer",
+				slog.String("consumer", consumerConfig.Name),
+				slog.Int("max_ack_pending", consumerConfig.MaxAckPending),
+				slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes),
+				slog.Bool("dropped_msg_tracking", scc.EnableDroppedMsgTracking),
+			)
+		}
+	}
+
 	if cfg.Ordered {
 		consumerConfig.MaxAckPending = 1
 	}
@@ -145,6 +195,16 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
 				slog.Uint64("stream_seq", meta.Sequence.Stream),
 				slog.Uint64("consumer_seq", meta.Sequence.Consumer),
 			)
+
+			if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking {
+				if meta.NumDelivered > 1 {
+					args = append(args,
+						slog.Uint64("delivery_count", meta.NumDelivered),
+						slog.String("redelivery_reason", "slow_consumer_or_ack_timeout"),
+					)
+					ps.logger.Warn("Message redelivered (potential slow consumer)", args...)
+				}
+			}
 		default:
 			args = append(args,
 				slog.String("metadata_error", err.Error()),