Skip to content

Commit c1c2d86

Browse files
committed
initial implementation
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
1 parent 4927333 commit c1c2d86

File tree

2 files changed

+150
-3
lines changed

2 files changed

+150
-3
lines changed

pkg/messaging/nats/options.go

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,40 @@ var (
3131
const msgPrefix = "m"
3232

3333
type options struct {
34-
prefix string
35-
jsStreamConfig jetstream.StreamConfig
34+
prefix string
35+
jsStreamConfig jetstream.StreamConfig
36+
slowConsumerConfig *SlowConsumerConfig
37+
}
38+
39+
// SlowConsumerConfig groups the settings used to throttle JetStream consumers
// and to log signs of slow consumption.
type SlowConsumerConfig struct {
	// MaxPendingMsgs is copied into the JetStream ConsumerConfig.MaxAckPending
	// field when subscribing, bounding the number of delivered but not yet
	// acknowledged messages. The lag warnings use 70% of this value as their
	// threshold.
	MaxPendingMsgs int

	// MaxPendingBytes is copied into the JetStream
	// ConsumerConfig.MaxRequestMaxBytes field when it is greater than zero.
	MaxPendingBytes int

	// EnableDroppedMsgTracking turns on a warning log for every message whose
	// delivery count is greater than one (i.e. a redelivery).
	EnableDroppedMsgTracking bool

	// EnableSlowConsumerDetection turns on the lag checks that log a warning
	// once the observed lag exceeds 70% of MaxPendingMsgs.
	EnableSlowConsumerDetection bool
}
3757

3858
func defaultOptions() options {
3959
return options{
4060
prefix: msgPrefix,
4161
jsStreamConfig: jsStreamConfig,
62+
slowConsumerConfig: &SlowConsumerConfig{
63+
MaxPendingMsgs: 1000,
64+
MaxPendingBytes: 5 * 1024 * 1024,
65+
EnableDroppedMsgTracking: true,
66+
EnableSlowConsumerDetection: true, // Will warn at 70% of MaxPendingMsgs (700 messages)
67+
},
4268
}
4369
}
4470

@@ -73,3 +99,19 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
7399
return nil
74100
}
75101
}
102+
103+
// WithSlowConsumerConfig sets the slow consumer configuration.
104+
func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option {
105+
return func(val interface{}) error {
106+
switch v := val.(type) {
107+
case *publisher:
108+
v.slowConsumerConfig = &config
109+
case *pubsub:
110+
v.slowConsumerConfig = &config
111+
default:
112+
return ErrInvalidType
113+
}
114+
115+
return nil
116+
}
117+
}

pkg/messaging/nats/pubsub.go

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
5252
}
5353
}
5454

55-
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects))
55+
conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ErrorHandler(ps.natsErrorHandler))
5656
if err != nil {
5757
return nil, err
5858
}
@@ -72,6 +72,30 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
7272
return ps, nil
7373
}
7474

75+
func (ps *pubsub) natsErrorHandler(nc *broker.Conn, sub *broker.Subscription, natsErr error) {
76+
ps.logger.Error("NATS error occurred",
77+
slog.String("error", natsErr.Error()),
78+
slog.String("subject", sub.Subject),
79+
)
80+
81+
if natsErr == broker.ErrSlowConsumer {
82+
pendingMsgs, pendingBytes, err := sub.Pending()
83+
if err != nil {
84+
ps.logger.Error("couldn't get pending messages for slow consumer",
85+
slog.String("error", err.Error()),
86+
slog.String("subject", sub.Subject),
87+
)
88+
return
89+
}
90+
91+
ps.logger.Warn("Slow consumer detected",
92+
slog.String("subject", sub.Subject),
93+
slog.Int("pending_messages", pendingMsgs),
94+
slog.Int("pending_bytes", pendingBytes),
95+
)
96+
}
97+
}
98+
7599
func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
76100
if cfg.ID == "" {
77101
return ErrEmptyID
@@ -91,6 +115,22 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
91115
FilterSubject: cfg.Topic,
92116
}
93117

118+
if ps.slowConsumerConfig != nil {
119+
consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs
120+
121+
if ps.slowConsumerConfig.MaxPendingBytes > 0 {
122+
consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes
123+
}
124+
125+
ps.logger.Info("Applied slow consumer throttling to JetStream consumer",
126+
slog.String("consumer", consumerConfig.Name),
127+
slog.Int("max_ack_pending", consumerConfig.MaxAckPending),
128+
slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes),
129+
slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking),
130+
slog.Bool("slow_consumer_detection", ps.slowConsumerConfig.EnableSlowConsumerDetection),
131+
)
132+
}
133+
94134
if cfg.Ordered {
95135
consumerConfig.MaxAckPending = 1
96136
}
@@ -111,6 +151,10 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
111151
return fmt.Errorf("failed to consume: %w", err)
112152
}
113153

154+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection {
155+
go ps.checkConsumerLag(ctx, consumerConfig.Name)
156+
}
157+
114158
return nil
115159
}
116160

@@ -145,6 +189,29 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
145189
slog.Uint64("stream_seq", meta.Sequence.Stream),
146190
slog.Uint64("consumer_seq", meta.Sequence.Consumer),
147191
)
192+
193+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection {
194+
sequenceLag := meta.Sequence.Stream - meta.Sequence.Consumer
195+
lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7)
196+
if sequenceLag > lagThreshold {
197+
args = append(args,
198+
slog.Uint64("sequence_lag", sequenceLag),
199+
slog.Uint64("lag_threshold", lagThreshold),
200+
slog.String("slow_consumer_detection", "lag_threshold_exceeded"),
201+
)
202+
ps.logger.Warn("JetStream slow consumer detected - sequence lag exceeds threshold", args...)
203+
}
204+
}
205+
206+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking {
207+
if meta.NumDelivered > 1 {
208+
args = append(args,
209+
slog.Uint64("delivery_count", meta.NumDelivered),
210+
slog.String("redelivery_reason", "slow_consumer_or_ack_timeout"),
211+
)
212+
ps.logger.Warn("Message redelivered (potential slow consumer)", args...)
213+
}
214+
}
148215
default:
149216
args = append(args,
150217
slog.String("metadata_error", err.Error()),
@@ -219,3 +286,41 @@ func formatConsumerName(topic, id string) string {
219286

220287
return fmt.Sprintf("%s-%s", topic, id)
221288
}
289+
290+
// checkConsumerLag checks the consumer lag and logs warnings if thresholds are exceeded.
291+
// This provides additional slow consumer detection for JetStream consumers.
292+
func (ps *pubsub) checkConsumerLag(ctx context.Context, consumerName string) {
293+
if ps.slowConsumerConfig == nil || !ps.slowConsumerConfig.EnableSlowConsumerDetection {
294+
return
295+
}
296+
297+
consumer, err := ps.stream.Consumer(ctx, consumerName)
298+
if err != nil {
299+
ps.logger.Error("failed to get consumer for lag check",
300+
slog.String("consumer", consumerName),
301+
slog.String("error", err.Error()),
302+
)
303+
return
304+
}
305+
306+
info, err := consumer.Info(ctx)
307+
if err != nil {
308+
ps.logger.Error("failed to get consumer info for lag check",
309+
slog.String("consumer", consumerName),
310+
slog.String("error", err.Error()),
311+
)
312+
return
313+
}
314+
315+
pending := info.NumPending
316+
lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7)
317+
if pending > lagThreshold {
318+
ps.logger.Warn("JetStream consumer lag threshold exceeded",
319+
slog.String("consumer", consumerName),
320+
slog.Uint64("pending_messages", pending),
321+
slog.Int("ack_pending", info.NumAckPending),
322+
slog.Int("redelivered", info.NumRedelivered),
323+
slog.Uint64("threshold", lagThreshold),
324+
)
325+
}
326+
}

0 commit comments

Comments
 (0)