@@ -62,7 +62,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
6262 if err != nil {
6363 return nil , err
6464 }
65- stream , err := js .CreateStream (ctx , ps .jsStreamConfig )
65+ stream , err := js .CreateStream (ctx , ps .jsStreamConfig . StreamConfig )
6666 if err != nil {
6767 return nil , err
6868 }
@@ -115,19 +115,24 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
115115 FilterSubject : cfg .Topic ,
116116 }
117117
118- if ps .slowConsumerConfig != nil {
119- consumerConfig .MaxAckPending = ps .slowConsumerConfig .MaxPendingMsgs
118+ // Apply consumer limits from the built-in ConsumerLimits
119+ if ps .jsStreamConfig .ConsumerLimits .MaxAckPending > 0 {
120+ consumerConfig .MaxAckPending = ps .jsStreamConfig .ConsumerLimits .MaxAckPending
121+ }
120122
121- if ps .slowConsumerConfig .MaxPendingBytes > 0 {
122- consumerConfig .MaxRequestMaxBytes = ps .slowConsumerConfig .MaxPendingBytes
123- }
123+ // Apply additional monitoring configuration
124+ monitoring := & ps .jsStreamConfig .SlowConsumer
125+ if monitoring .MaxPendingBytes > 0 {
126+ consumerConfig .MaxRequestMaxBytes = monitoring .MaxPendingBytes
127+ }
124128
129+ // Log the applied configuration
130+ if ps .jsStreamConfig .ConsumerLimits .MaxAckPending > 0 {
125131 ps .logger .Info ("Applied slow consumer throttling to JetStream consumer" ,
126132 slog .String ("consumer" , consumerConfig .Name ),
127133 slog .Int ("max_ack_pending" , consumerConfig .MaxAckPending ),
128134 slog .Int ("max_request_max_bytes" , consumerConfig .MaxRequestMaxBytes ),
129- slog .Bool ("dropped_msg_tracking" , ps .slowConsumerConfig .EnableDroppedMsgTracking ),
130- slog .Bool ("slow_consumer_detection" , ps .slowConsumerConfig .EnableSlowConsumerDetection ),
135+ slog .Bool ("dropped_msg_tracking" , monitoring .EnableDroppedMsgTracking ),
131136 )
132137 }
133138
@@ -151,10 +156,6 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
151156 return fmt .Errorf ("failed to consume: %w" , err )
152157 }
153158
154- if ps .slowConsumerConfig != nil && ps .slowConsumerConfig .EnableSlowConsumerDetection {
155- go ps .checkConsumerLag (ctx , consumerConfig .Name )
156- }
157-
158159 return nil
159160}
160161
@@ -190,20 +191,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
190191 slog .Uint64 ("consumer_seq" , meta .Sequence .Consumer ),
191192 )
192193
193- if ps .slowConsumerConfig != nil && ps .slowConsumerConfig .EnableSlowConsumerDetection {
194- sequenceLag := meta .Sequence .Stream - meta .Sequence .Consumer
195- lagThreshold := uint64 (float64 (ps .slowConsumerConfig .MaxPendingMsgs ) * 0.7 )
196- if sequenceLag > lagThreshold {
197- args = append (args ,
198- slog .Uint64 ("sequence_lag" , sequenceLag ),
199- slog .Uint64 ("lag_threshold" , lagThreshold ),
200- slog .String ("slow_consumer_detection" , "lag_threshold_exceeded" ),
201- )
202- ps .logger .Warn ("JetStream slow consumer detected - sequence lag exceeds threshold" , args ... )
203- }
204- }
205-
206- if ps .slowConsumerConfig != nil && ps .slowConsumerConfig .EnableDroppedMsgTracking {
194+ if ps .jsStreamConfig .SlowConsumer .EnableDroppedMsgTracking {
207195 if meta .NumDelivered > 1 {
208196 args = append (args ,
209197 slog .Uint64 ("delivery_count" , meta .NumDelivered ),
@@ -287,40 +275,4 @@ func formatConsumerName(topic, id string) string {
287275 return fmt .Sprintf ("%s-%s" , topic , id )
288276}
289277
290- // checkConsumerLag checks the consumer lag and logs warnings if thresholds are exceeded.
291- // This provides additional slow consumer detection for JetStream consumers.
292- func (ps * pubsub ) checkConsumerLag (ctx context.Context , consumerName string ) {
293- if ps .slowConsumerConfig == nil || ! ps .slowConsumerConfig .EnableSlowConsumerDetection {
294- return
295- }
296-
297- consumer , err := ps .stream .Consumer (ctx , consumerName )
298- if err != nil {
299- ps .logger .Error ("failed to get consumer for lag check" ,
300- slog .String ("consumer" , consumerName ),
301- slog .String ("error" , err .Error ()),
302- )
303- return
304- }
305278
306- info , err := consumer .Info (ctx )
307- if err != nil {
308- ps .logger .Error ("failed to get consumer info for lag check" ,
309- slog .String ("consumer" , consumerName ),
310- slog .String ("error" , err .Error ()),
311- )
312- return
313- }
314-
315- pending := info .NumPending
316- lagThreshold := uint64 (float64 (ps .slowConsumerConfig .MaxPendingMsgs ) * 0.7 )
317- if pending > lagThreshold {
318- ps .logger .Warn ("JetStream consumer lag threshold exceeded" ,
319- slog .String ("consumer" , consumerName ),
320- slog .Uint64 ("pending_messages" , pending ),
321- slog .Int ("ack_pending" , info .NumAckPending ),
322- slog .Int ("redelivered" , info .NumRedelivered ),
323- slog .Uint64 ("threshold" , lagThreshold ),
324- )
325- }
326- }
0 commit comments