@@ -14,64 +14,63 @@ import (
1414var (
1515 // ErrInvalidType is returned when the provided value is not of the expected type.
1616 ErrInvalidType = errors .New ("invalid type" )
17+
18+ jsStreamConfig = jetstream.StreamConfig {
19+ Name : "m" ,
20+ Description : "SuperMQ stream for sending and receiving messages in between SuperMQ channels" ,
21+ Subjects : []string {"m.>" },
22+ Retention : jetstream .LimitsPolicy ,
23+ MaxMsgsPerSubject : 1e6 ,
24+ MaxAge : time .Hour * 24 ,
25+ MaxMsgSize : 1024 * 1024 ,
26+ Discard : jetstream .DiscardOld ,
27+ Storage : jetstream .FileStorage ,
28+ }
29+ )
30+
31+ const (
32+ msgPrefix = "m"
33+ defaultMaxPendingMsgs = 1000
34+ defaultMaxPendingBytes = 5 * 1024 * 1024
35+ defaultEnableDroppedMsgTracking = true
36+ defaultEnableSlowConsumerDetection = true
1737)
1838
19- // SMQJetStreamConfig extends jetstream.StreamConfig with slow consumer monitoring settings
20- type SMQJetStreamConfig struct {
21- jetstream.StreamConfig
22-
23- SlowConsumer SlowConsumerConfig `json:"slow_consumer,omitempty"`
39+ type options struct {
40+ prefix string
41+ jsStreamConfig jetstream.StreamConfig
42+ slowConsumerConfig * SlowConsumerConfig
2443}
2544
2645type SlowConsumerConfig struct {
46+ // MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending
47+ // Controls the maximum number of outstanding unacknowledged messages
48+ // Also used for slow consumer detection at 70% of this value
49+ MaxPendingMsgs int
50+
2751 // MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes
2852 // Controls the maximum bytes per batch request (closest JetStream equivalent)
29- MaxPendingBytes int `json:"max_pending_bytes,omitempty"`
53+ MaxPendingBytes int
3054
3155 // EnableDroppedMsgTracking enables logging of message redeliveries
3256 // which can indicate slow consumer behavior in JetStream
33- EnableDroppedMsgTracking bool `json:"enable_dropped_msg_tracking,omitempty"`
34- }
57+ EnableDroppedMsgTracking bool
3558
36- var (
37- defaultJetStreamConfig = SMQJetStreamConfig {
38- StreamConfig : jetstream.StreamConfig {
39- Name : "m" ,
40- Description : "SuperMQ stream for sending and receiving messages in between SuperMQ channels" ,
41- Subjects : []string {"m.>" },
42- Retention : jetstream .LimitsPolicy ,
43- MaxMsgsPerSubject : 1e6 ,
44- MaxAge : time .Hour * 24 ,
45- MaxMsgSize : 1024 * 1024 ,
46- Discard : jetstream .DiscardOld ,
47- Storage : jetstream .FileStorage ,
48- ConsumerLimits : jetstream.StreamConsumerLimits {
49- MaxAckPending : defaultMaxPendingMsgs ,
50- },
51- },
52- SlowConsumer : SlowConsumerConfig {
53- MaxPendingBytes : defaultMaxPendingBytes ,
54- EnableDroppedMsgTracking : defaultEnableDroppedMsgTracking ,
55- },
56- }
57- )
58-
59- const (
60- msgPrefix = "m"
61- defaultMaxPendingMsgs = 1000
62- defaultMaxPendingBytes = 5 * 1024 * 1024
63- defaultEnableDroppedMsgTracking = true
64- )
65-
66- type options struct {
67- prefix string
68- jsStreamConfig SMQJetStreamConfig
59+ // EnableSlowConsumerDetection enables automatic slow consumer detection
60+ // and logging when consumer lag exceeds 70% of MaxPendingMsgs
61+ EnableSlowConsumerDetection bool
6962}
7063
7164func defaultOptions () options {
7265 return options {
7366 prefix : msgPrefix ,
74- jsStreamConfig : defaultJetStreamConfig ,
67+ jsStreamConfig : jsStreamConfig ,
68+ slowConsumerConfig : & SlowConsumerConfig {
69+ MaxPendingMsgs : defaultMaxPendingMsgs ,
70+ MaxPendingBytes : defaultMaxPendingBytes ,
71+ EnableDroppedMsgTracking : defaultEnableDroppedMsgTracking ,
72+ EnableSlowConsumerDetection : defaultEnableSlowConsumerDetection ,
73+ },
7574 }
7675}
7776
@@ -91,8 +90,8 @@ func Prefix(prefix string) messaging.Option {
9190 }
9291}
9392
94- // JSStreamConfig sets the JetStream configuration for the publisher or subscriber.
95- func JSStreamConfig (jsStreamConfig SMQJetStreamConfig ) messaging.Option {
93+ // JSStreamConfig sets the JetStream for the publisher or subscriber.
94+ func JSStreamConfig (jsStreamConfig jetstream. StreamConfig ) messaging.Option {
9695 return func (val any ) error {
9796 switch v := val .(type ) {
9897 case * publisher :
@@ -107,30 +106,14 @@ func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option {
107106 }
108107}
109108
110- // WithSlowConsumerConfig sets the slow consumer configuration within the JetStream config .
109+ // WithSlowConsumerConfig sets the slow consumer configuration.
111110func WithSlowConsumerConfig (config SlowConsumerConfig ) messaging.Option {
112111 return func (val interface {}) error {
113112 switch v := val .(type ) {
114113 case * publisher :
115- v .jsStreamConfig .SlowConsumer = config
116- case * pubsub :
117- v .jsStreamConfig .SlowConsumer = config
118- default :
119- return ErrInvalidType
120- }
121-
122- return nil
123- }
124- }
125-
126- // WithConsumerLimits sets the built-in JetStream consumer limits (MaxAckPending, etc.).
127- func WithConsumerLimits (limits jetstream.StreamConsumerLimits ) messaging.Option {
128- return func (val interface {}) error {
129- switch v := val .(type ) {
130- case * publisher :
131- v .jsStreamConfig .ConsumerLimits = limits
114+ v .slowConsumerConfig = & config
132115 case * pubsub :
133- v .jsStreamConfig . ConsumerLimits = limits
116+ v .slowConsumerConfig = & config
134117 default :
135118 return ErrInvalidType
136119 }
0 commit comments