Skip to content

Commit 82a3a23

Browse files
committed
address comments
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
1 parent 0e8b57d commit 82a3a23

File tree

4 files changed

+90
-119
lines changed

4 files changed

+90
-119
lines changed

pkg/events/nats/subscriber.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,18 @@ var _ events.Subscriber = (*subEventStore)(nil)
2222
var (
2323
eventsPrefix = "events"
2424

25-
jsStreamConfig = jetstream.StreamConfig{
26-
Name: "events",
27-
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events",
28-
Subjects: []string{"events.>"},
29-
Retention: jetstream.LimitsPolicy,
30-
MaxMsgsPerSubject: 1e9,
31-
MaxAge: time.Hour * 24,
32-
MaxMsgSize: 1024 * 1024,
33-
Discard: jetstream.DiscardOld,
34-
Storage: jetstream.FileStorage,
25+
jsStreamConfig = broker.SMQJetStreamConfig{
26+
StreamConfig: jetstream.StreamConfig{
27+
Name: "events",
28+
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events",
29+
Subjects: []string{"events.>"},
30+
Retention: jetstream.LimitsPolicy,
31+
MaxMsgsPerSubject: 1e9,
32+
MaxAge: time.Hour * 24,
33+
MaxMsgSize: 1024 * 1024,
34+
Discard: jetstream.DiscardOld,
35+
Storage: jetstream.FileStorage,
36+
},
3537
}
3638

3739
// ErrEmptyStream is returned when stream name is empty.

pkg/messaging/nats/options.go

Lines changed: 63 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,63 +14,64 @@ import (
1414
var (
1515
// ErrInvalidType is returned when the provided value is not of the expected type.
1616
ErrInvalidType = errors.New("invalid type")
17-
18-
jsStreamConfig = jetstream.StreamConfig{
19-
Name: "m",
20-
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels",
21-
Subjects: []string{"m.>"},
22-
Retention: jetstream.LimitsPolicy,
23-
MaxMsgsPerSubject: 1e6,
24-
MaxAge: time.Hour * 24,
25-
MaxMsgSize: 1024 * 1024,
26-
Discard: jetstream.DiscardOld,
27-
Storage: jetstream.FileStorage,
28-
}
29-
)
30-
31-
const (
32-
msgPrefix = "m"
33-
defaultMaxPendingMsgs = 1000
34-
defaultMaxPendingBytes = 5 * 1024 * 1024
35-
defaultEnableDroppedMsgTracking = true
36-
defaultEnableSlowConsumerDetection = true
3717
)
3818

39-
type options struct {
40-
prefix string
41-
jsStreamConfig jetstream.StreamConfig
42-
slowConsumerConfig *SlowConsumerConfig
19+
// SMQJetStreamConfig extends jetstream.StreamConfig with slow consumer monitoring settings
20+
type SMQJetStreamConfig struct {
21+
jetstream.StreamConfig
22+
23+
SlowConsumer SlowConsumerConfig `json:"slow_consumer,omitempty"`
4324
}
4425

4526
type SlowConsumerConfig struct {
46-
// MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending
47-
// Controls the maximum number of outstanding unacknowledged messages
48-
// Also used for slow consumer detection at 70% of this value
49-
MaxPendingMsgs int
50-
5127
// MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes
5228
// Controls the maximum bytes per batch request (closest JetStream equivalent)
53-
MaxPendingBytes int
29+
MaxPendingBytes int `json:"max_pending_bytes,omitempty"`
5430

5531
// EnableDroppedMsgTracking enables logging of message redeliveries
5632
// which can indicate slow consumer behavior in JetStream
57-
EnableDroppedMsgTracking bool
33+
EnableDroppedMsgTracking bool `json:"enable_dropped_msg_tracking,omitempty"`
34+
}
5835

59-
// EnableSlowConsumerDetection enables automatic slow consumer detection
60-
// and logging when consumer lag exceeds 70% of MaxPendingMsgs
61-
EnableSlowConsumerDetection bool
36+
var (
37+
defaultJetStreamConfig = SMQJetStreamConfig{
38+
StreamConfig: jetstream.StreamConfig{
39+
Name: "m",
40+
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels",
41+
Subjects: []string{"m.>"},
42+
Retention: jetstream.LimitsPolicy,
43+
MaxMsgsPerSubject: 1e6,
44+
MaxAge: time.Hour * 24,
45+
MaxMsgSize: 1024 * 1024,
46+
Discard: jetstream.DiscardOld,
47+
Storage: jetstream.FileStorage,
48+
ConsumerLimits: jetstream.StreamConsumerLimits{
49+
MaxAckPending: defaultMaxPendingMsgs,
50+
},
51+
},
52+
SlowConsumer: SlowConsumerConfig{
53+
MaxPendingBytes: defaultMaxPendingBytes,
54+
EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
55+
},
56+
}
57+
)
58+
59+
const (
60+
msgPrefix = "m"
61+
defaultMaxPendingMsgs = 1000
62+
defaultMaxPendingBytes = 5 * 1024 * 1024
63+
defaultEnableDroppedMsgTracking = true
64+
)
65+
66+
type options struct {
67+
prefix string
68+
jsStreamConfig SMQJetStreamConfig
6269
}
6370

6471
func defaultOptions() options {
6572
return options{
6673
prefix: msgPrefix,
67-
jsStreamConfig: jsStreamConfig,
68-
slowConsumerConfig: &SlowConsumerConfig{
69-
MaxPendingMsgs: defaultMaxPendingMsgs,
70-
MaxPendingBytes: defaultMaxPendingBytes,
71-
EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
72-
EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection,
73-
},
74+
jsStreamConfig: defaultJetStreamConfig,
7475
}
7576
}
7677

@@ -90,8 +91,8 @@ func Prefix(prefix string) messaging.Option {
9091
}
9192
}
9293

93-
// JSStreamConfig sets the JetStream for the publisher or subscriber.
94-
func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
94+
// JSStreamConfig sets the JetStream configuration for the publisher or subscriber.
95+
func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option {
9596
return func(val any) error {
9697
switch v := val.(type) {
9798
case *publisher:
@@ -106,14 +107,30 @@ func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
106107
}
107108
}
108109

109-
// WithSlowConsumerConfig sets the slow consumer configuration.
110+
// WithSlowConsumerConfig sets the slow consumer configuration within the JetStream config.
110111
func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option {
111112
return func(val interface{}) error {
112113
switch v := val.(type) {
113114
case *publisher:
114-
v.slowConsumerConfig = &config
115+
v.jsStreamConfig.SlowConsumer = config
116+
case *pubsub:
117+
v.jsStreamConfig.SlowConsumer = config
118+
default:
119+
return ErrInvalidType
120+
}
121+
122+
return nil
123+
}
124+
}
125+
126+
// WithConsumerLimits sets the built-in JetStream consumer limits (MaxAckPending, etc.).
127+
func WithConsumerLimits(limits jetstream.StreamConsumerLimits) messaging.Option {
128+
return func(val interface{}) error {
129+
switch v := val.(type) {
130+
case *publisher:
131+
v.jsStreamConfig.ConsumerLimits = limits
115132
case *pubsub:
116-
v.slowConsumerConfig = &config
133+
v.jsStreamConfig.ConsumerLimits = limits
117134
default:
118135
return ErrInvalidType
119136
}

pkg/messaging/nats/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (me
5555
if err != nil {
5656
return nil, err
5757
}
58-
if _, err := js.CreateStream(ctx, pub.jsStreamConfig); err != nil {
58+
if _, err := js.CreateStream(ctx, pub.jsStreamConfig.StreamConfig); err != nil {
5959
return nil, err
6060
}
6161
pub.js = js

pkg/messaging/nats/pubsub.go

Lines changed: 14 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
6262
if err != nil {
6363
return nil, err
6464
}
65-
stream, err := js.CreateStream(ctx, ps.jsStreamConfig)
65+
stream, err := js.CreateStream(ctx, ps.jsStreamConfig.StreamConfig)
6666
if err != nil {
6767
return nil, err
6868
}
@@ -115,19 +115,24 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
115115
FilterSubject: cfg.Topic,
116116
}
117117

118-
if ps.slowConsumerConfig != nil {
119-
consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs
118+
// Apply consumer limits from the built-in ConsumerLimits
119+
if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 {
120+
consumerConfig.MaxAckPending = ps.jsStreamConfig.ConsumerLimits.MaxAckPending
121+
}
120122

121-
if ps.slowConsumerConfig.MaxPendingBytes > 0 {
122-
consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes
123-
}
123+
// Apply additional monitoring configuration
124+
monitoring := &ps.jsStreamConfig.SlowConsumer
125+
if monitoring.MaxPendingBytes > 0 {
126+
consumerConfig.MaxRequestMaxBytes = monitoring.MaxPendingBytes
127+
}
124128

129+
// Log the applied configuration
130+
if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 {
125131
ps.logger.Info("Applied slow consumer throttling to JetStream consumer",
126132
slog.String("consumer", consumerConfig.Name),
127133
slog.Int("max_ack_pending", consumerConfig.MaxAckPending),
128134
slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes),
129-
slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking),
130-
slog.Bool("slow_consumer_detection", ps.slowConsumerConfig.EnableSlowConsumerDetection),
135+
slog.Bool("dropped_msg_tracking", monitoring.EnableDroppedMsgTracking),
131136
)
132137
}
133138

@@ -151,10 +156,6 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
151156
return fmt.Errorf("failed to consume: %w", err)
152157
}
153158

154-
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection {
155-
go ps.checkConsumerLag(ctx, consumerConfig.Name)
156-
}
157-
158159
return nil
159160
}
160161

@@ -190,20 +191,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
190191
slog.Uint64("consumer_seq", meta.Sequence.Consumer),
191192
)
192193

193-
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableSlowConsumerDetection {
194-
sequenceLag := meta.Sequence.Stream - meta.Sequence.Consumer
195-
lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7)
196-
if sequenceLag > lagThreshold {
197-
args = append(args,
198-
slog.Uint64("sequence_lag", sequenceLag),
199-
slog.Uint64("lag_threshold", lagThreshold),
200-
slog.String("slow_consumer_detection", "lag_threshold_exceeded"),
201-
)
202-
ps.logger.Warn("JetStream slow consumer detected - sequence lag exceeds threshold", args...)
203-
}
204-
}
205-
206-
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking {
194+
if ps.jsStreamConfig.SlowConsumer.EnableDroppedMsgTracking {
207195
if meta.NumDelivered > 1 {
208196
args = append(args,
209197
slog.Uint64("delivery_count", meta.NumDelivered),
@@ -287,40 +275,4 @@ func formatConsumerName(topic, id string) string {
287275
return fmt.Sprintf("%s-%s", topic, id)
288276
}
289277

290-
// checkConsumerLag checks the consumer lag and logs warnings if thresholds are exceeded.
291-
// This provides additional slow consumer detection for JetStream consumers.
292-
func (ps *pubsub) checkConsumerLag(ctx context.Context, consumerName string) {
293-
if ps.slowConsumerConfig == nil || !ps.slowConsumerConfig.EnableSlowConsumerDetection {
294-
return
295-
}
296-
297-
consumer, err := ps.stream.Consumer(ctx, consumerName)
298-
if err != nil {
299-
ps.logger.Error("failed to get consumer for lag check",
300-
slog.String("consumer", consumerName),
301-
slog.String("error", err.Error()),
302-
)
303-
return
304-
}
305278

306-
info, err := consumer.Info(ctx)
307-
if err != nil {
308-
ps.logger.Error("failed to get consumer info for lag check",
309-
slog.String("consumer", consumerName),
310-
slog.String("error", err.Error()),
311-
)
312-
return
313-
}
314-
315-
pending := info.NumPending
316-
lagThreshold := uint64(float64(ps.slowConsumerConfig.MaxPendingMsgs) * 0.7)
317-
if pending > lagThreshold {
318-
ps.logger.Warn("JetStream consumer lag threshold exceeded",
319-
slog.String("consumer", consumerName),
320-
slog.Uint64("pending_messages", pending),
321-
slog.Int("ack_pending", info.NumAckPending),
322-
slog.Int("redelivered", info.NumRedelivered),
323-
slog.Uint64("threshold", lagThreshold),
324-
)
325-
}
326-
}

0 commit comments

Comments
 (0)