Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions pkg/messaging/nats/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,42 @@ var (
}
)

const msgPrefix = "m"
const (
msgPrefix = "m"
defaultMaxPendingMsgs = 1000
defaultMaxPendingBytes = 5 * 1024 * 1024
defaultEnableDroppedMsgTracking = true
)

type options struct {
prefix string
jsStreamConfig jetstream.StreamConfig
prefix string
jsStreamConfig jetstream.StreamConfig
slowConsumerConfig *SlowConsumerConfig
}

type SlowConsumerConfig struct {
// MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending
// Controls the maximum number of outstanding unacknowledged messages
MaxPendingMsgs int

// MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes
// Controls the maximum bytes per batch request (closest JetStream equivalent)
MaxPendingBytes int

// EnableDroppedMsgTracking enables logging of message redeliveries
// which can indicate slow consumer behavior in JetStream
EnableDroppedMsgTracking bool
}

func defaultOptions() options {
return options{
prefix: msgPrefix,
jsStreamConfig: jsStreamConfig,
slowConsumerConfig: &SlowConsumerConfig{
MaxPendingMsgs: defaultMaxPendingMsgs,
MaxPendingBytes: defaultMaxPendingBytes,
EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
},
}
}

Expand Down Expand Up @@ -73,3 +98,19 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
return nil
}
}

// WithSlowConsumerConfig sets the slow consumer configuration.
func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option {
return func(val interface{}) error {
switch v := val.(type) {
case *publisher:
v.slowConsumerConfig = &config
case *pubsub:
v.slowConsumerConfig = &config
default:
return ErrInvalidType
}

return nil
}
}
53 changes: 52 additions & 1 deletion pkg/messaging/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
}
}

conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ErrorHandler(ps.natsErrorHandler))
if err != nil {
return nil, err
}
Expand All @@ -72,6 +72,30 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
return ps, nil
}

func (ps *pubsub) natsErrorHandler(nc *broker.Conn, sub *broker.Subscription, natsErr error) {
ps.logger.Error("NATS error occurred",
slog.String("error", natsErr.Error()),
slog.String("subject", sub.Subject),
)

if natsErr == broker.ErrSlowConsumer {
pendingMsgs, pendingBytes, err := sub.Pending()
if err != nil {
ps.logger.Error("couldn't get pending messages for slow consumer",
slog.String("error", err.Error()),
slog.String("subject", sub.Subject),
)
return
}

ps.logger.Warn("Slow consumer detected",
slog.String("subject", sub.Subject),
slog.Int("pending_messages", pendingMsgs),
slog.Int("pending_bytes", pendingBytes),
)
}
}

func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
if cfg.ID == "" {
return ErrEmptyID
Expand All @@ -91,6 +115,23 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
FilterSubject: cfg.Topic,
}

if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 {
consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs
}

if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingBytes > 0 {
consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes
}

if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 {
ps.logger.Info("Applied slow consumer throttling to JetStream consumer",
slog.String("consumer", consumerConfig.Name),
slog.Int("max_ack_pending", consumerConfig.MaxAckPending),
slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes),
slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking),
)
}

if cfg.Ordered {
consumerConfig.MaxAckPending = 1
}
Expand Down Expand Up @@ -145,6 +186,16 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
slog.Uint64("stream_seq", meta.Sequence.Stream),
slog.Uint64("consumer_seq", meta.Sequence.Consumer),
)

if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking {
if meta.NumDelivered > 1 {
args = append(args,
slog.Uint64("delivery_count", meta.NumDelivered),
slog.String("redelivery_reason", "slow_consumer_or_ack_timeout"),
)
ps.logger.Warn("Message redelivered (potential slow consumer)", args...)
}
}
default:
args = append(args,
slog.String("metadata_error", err.Error()),
Expand Down
Loading