Skip to content

Commit d8314f0

Browse files
nyagamunenedborovcanin
authored andcommitted
fix struct variable
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
1 parent 59918fa commit d8314f0

File tree

4 files changed

+65
-88
lines changed

4 files changed

+65
-88
lines changed

pkg/events/nats/subscriber.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,16 @@ var _ events.Subscriber = (*subEventStore)(nil)
2222
var (
2323
eventsPrefix = "events"
2424

25-
jsStreamConfig = broker.SMQJetStreamConfig{
26-
StreamConfig: jetstream.StreamConfig{
27-
Name: "events",
28-
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events",
29-
Subjects: []string{"events.>"},
30-
Retention: jetstream.LimitsPolicy,
31-
MaxMsgsPerSubject: 1e9,
32-
MaxAge: time.Hour * 24,
33-
MaxMsgSize: 1024 * 1024,
34-
Discard: jetstream.DiscardOld,
35-
Storage: jetstream.FileStorage,
36-
},
25+
jsStreamConfig = jetstream.StreamConfig{
26+
Name: "events",
27+
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ events",
28+
Subjects: []string{"events.>"},
29+
Retention: jetstream.LimitsPolicy,
30+
MaxMsgsPerSubject: 1e9,
31+
MaxAge: time.Hour * 24,
32+
MaxMsgSize: 1024 * 1024,
33+
Discard: jetstream.DiscardOld,
34+
Storage: jetstream.FileStorage,
3735
}
3836

3937
// ErrEmptyStream is returned when stream name is empty.

pkg/messaging/nats/options.go

Lines changed: 46 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -14,64 +14,63 @@ import (
1414
var (
1515
// ErrInvalidType is returned when the provided value is not of the expected type.
1616
ErrInvalidType = errors.New("invalid type")
17+
18+
jsStreamConfig = jetstream.StreamConfig{
19+
Name: "m",
20+
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels",
21+
Subjects: []string{"m.>"},
22+
Retention: jetstream.LimitsPolicy,
23+
MaxMsgsPerSubject: 1e6,
24+
MaxAge: time.Hour * 24,
25+
MaxMsgSize: 1024 * 1024,
26+
Discard: jetstream.DiscardOld,
27+
Storage: jetstream.FileStorage,
28+
}
29+
)
30+
31+
const (
32+
msgPrefix = "m"
33+
defaultMaxPendingMsgs = 1000
34+
defaultMaxPendingBytes = 5 * 1024 * 1024
35+
defaultEnableDroppedMsgTracking = true
36+
defaultEnableSlowConsumerDetection = true
1737
)
1838

19-
// SMQJetStreamConfig extends jetstream.StreamConfig with slow consumer monitoring settings
20-
type SMQJetStreamConfig struct {
21-
jetstream.StreamConfig
22-
23-
SlowConsumer SlowConsumerConfig `json:"slow_consumer,omitempty"`
39+
type options struct {
40+
prefix string
41+
jsStreamConfig jetstream.StreamConfig
42+
slowConsumerConfig *SlowConsumerConfig
2443
}
2544

2645
type SlowConsumerConfig struct {
46+
// MaxPendingMsgs maps to JetStream ConsumerConfig.MaxAckPending
47+
// Controls the maximum number of outstanding unacknowledged messages
48+
// Also used for slow consumer detection at 70% of this value
49+
MaxPendingMsgs int
50+
2751
// MaxPendingBytes maps to JetStream ConsumerConfig.MaxRequestMaxBytes
2852
// Controls the maximum bytes per batch request (closest JetStream equivalent)
29-
MaxPendingBytes int `json:"max_pending_bytes,omitempty"`
53+
MaxPendingBytes int
3054

3155
// EnableDroppedMsgTracking enables logging of message redeliveries
3256
// which can indicate slow consumer behavior in JetStream
33-
EnableDroppedMsgTracking bool `json:"enable_dropped_msg_tracking,omitempty"`
34-
}
57+
EnableDroppedMsgTracking bool
3558

36-
var (
37-
defaultJetStreamConfig = SMQJetStreamConfig{
38-
StreamConfig: jetstream.StreamConfig{
39-
Name: "m",
40-
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels",
41-
Subjects: []string{"m.>"},
42-
Retention: jetstream.LimitsPolicy,
43-
MaxMsgsPerSubject: 1e6,
44-
MaxAge: time.Hour * 24,
45-
MaxMsgSize: 1024 * 1024,
46-
Discard: jetstream.DiscardOld,
47-
Storage: jetstream.FileStorage,
48-
ConsumerLimits: jetstream.StreamConsumerLimits{
49-
MaxAckPending: defaultMaxPendingMsgs,
50-
},
51-
},
52-
SlowConsumer: SlowConsumerConfig{
53-
MaxPendingBytes: defaultMaxPendingBytes,
54-
EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
55-
},
56-
}
57-
)
58-
59-
const (
60-
msgPrefix = "m"
61-
defaultMaxPendingMsgs = 1000
62-
defaultMaxPendingBytes = 5 * 1024 * 1024
63-
defaultEnableDroppedMsgTracking = true
64-
)
65-
66-
type options struct {
67-
prefix string
68-
jsStreamConfig SMQJetStreamConfig
59+
// EnableSlowConsumerDetection enables automatic slow consumer detection
60+
// and logging when consumer lag exceeds 70% of MaxPendingMsgs
61+
EnableSlowConsumerDetection bool
6962
}
7063

7164
func defaultOptions() options {
7265
return options{
7366
prefix: msgPrefix,
74-
jsStreamConfig: defaultJetStreamConfig,
67+
jsStreamConfig: jsStreamConfig,
68+
slowConsumerConfig: &SlowConsumerConfig{
69+
MaxPendingMsgs: defaultMaxPendingMsgs,
70+
MaxPendingBytes: defaultMaxPendingBytes,
71+
EnableDroppedMsgTracking: defaultEnableDroppedMsgTracking,
72+
EnableSlowConsumerDetection: defaultEnableSlowConsumerDetection,
73+
},
7574
}
7675
}
7776

@@ -91,8 +90,8 @@ func Prefix(prefix string) messaging.Option {
9190
}
9291
}
9392

94-
// JSStreamConfig sets the JetStream configuration for the publisher or subscriber.
95-
func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option {
93+
// JSStreamConfig sets the JetStream for the publisher or subscriber.
94+
func JSStreamConfig(jsStreamConfig jetstream.StreamConfig) messaging.Option {
9695
return func(val interface{}) error {
9796
switch v := val.(type) {
9897
case *publisher:
@@ -107,30 +106,14 @@ func JSStreamConfig(jsStreamConfig SMQJetStreamConfig) messaging.Option {
107106
}
108107
}
109108

110-
// WithSlowConsumerConfig sets the slow consumer configuration within the JetStream config.
109+
// WithSlowConsumerConfig sets the slow consumer configuration.
111110
func WithSlowConsumerConfig(config SlowConsumerConfig) messaging.Option {
112111
return func(val interface{}) error {
113112
switch v := val.(type) {
114113
case *publisher:
115-
v.jsStreamConfig.SlowConsumer = config
116-
case *pubsub:
117-
v.jsStreamConfig.SlowConsumer = config
118-
default:
119-
return ErrInvalidType
120-
}
121-
122-
return nil
123-
}
124-
}
125-
126-
// WithConsumerLimits sets the built-in JetStream consumer limits (MaxAckPending, etc.).
127-
func WithConsumerLimits(limits jetstream.StreamConsumerLimits) messaging.Option {
128-
return func(val interface{}) error {
129-
switch v := val.(type) {
130-
case *publisher:
131-
v.jsStreamConfig.ConsumerLimits = limits
114+
v.slowConsumerConfig = &config
132115
case *pubsub:
133-
v.jsStreamConfig.ConsumerLimits = limits
116+
v.slowConsumerConfig = &config
134117
default:
135118
return ErrInvalidType
136119
}

pkg/messaging/nats/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (me
5555
if err != nil {
5656
return nil, err
5757
}
58-
if _, err := js.CreateStream(ctx, pub.jsStreamConfig.StreamConfig); err != nil {
58+
if _, err := js.CreateStream(ctx, pub.jsStreamConfig); err != nil {
5959
return nil, err
6060
}
6161
pub.js = js

pkg/messaging/nats/pubsub.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger, opts ...mes
6262
if err != nil {
6363
return nil, err
6464
}
65-
stream, err := js.CreateStream(ctx, ps.jsStreamConfig.StreamConfig)
65+
stream, err := js.CreateStream(ctx, ps.jsStreamConfig)
6666
if err != nil {
6767
return nil, err
6868
}
@@ -115,24 +115,20 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
115115
FilterSubject: cfg.Topic,
116116
}
117117

118-
// Apply consumer limits from the built-in ConsumerLimits
119-
if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 {
120-
consumerConfig.MaxAckPending = ps.jsStreamConfig.ConsumerLimits.MaxAckPending
118+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 {
119+
consumerConfig.MaxAckPending = ps.slowConsumerConfig.MaxPendingMsgs
121120
}
122121

123-
// Apply additional monitoring configuration
124-
monitoring := &ps.jsStreamConfig.SlowConsumer
125-
if monitoring.MaxPendingBytes > 0 {
126-
consumerConfig.MaxRequestMaxBytes = monitoring.MaxPendingBytes
122+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingBytes > 0 {
123+
consumerConfig.MaxRequestMaxBytes = ps.slowConsumerConfig.MaxPendingBytes
127124
}
128125

129-
// Log the applied configuration
130-
if ps.jsStreamConfig.ConsumerLimits.MaxAckPending > 0 {
126+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.MaxPendingMsgs > 0 {
131127
ps.logger.Info("Applied slow consumer throttling to JetStream consumer",
132128
slog.String("consumer", consumerConfig.Name),
133129
slog.Int("max_ack_pending", consumerConfig.MaxAckPending),
134130
slog.Int("max_request_max_bytes", consumerConfig.MaxRequestMaxBytes),
135-
slog.Bool("dropped_msg_tracking", monitoring.EnableDroppedMsgTracking),
131+
slog.Bool("dropped_msg_tracking", ps.slowConsumerConfig.EnableDroppedMsgTracking),
136132
)
137133
}
138134

@@ -191,7 +187,7 @@ func (ps *pubsub) natsHandler(h messaging.MessageHandler) func(m jetstream.Msg)
191187
slog.Uint64("consumer_seq", meta.Sequence.Consumer),
192188
)
193189

194-
if ps.jsStreamConfig.SlowConsumer.EnableDroppedMsgTracking {
190+
if ps.slowConsumerConfig != nil && ps.slowConsumerConfig.EnableDroppedMsgTracking {
195191
if meta.NumDelivered > 1 {
196192
args = append(args,
197193
slog.Uint64("delivery_count", meta.NumDelivered),

0 commit comments

Comments
 (0)