@@ -70,40 +70,33 @@ type NatsPlugin struct {
7070 config * NatsPluginConfig
7171
7272 natsClient * nats.Conn
73+
74+ enabled bool
7375}
7476
7577func (p * NatsPlugin ) Enable () error {
7678 p .logger .Info ("Initialising NATS plugin" )
7779
80+ p .enabled = true
81+
82+ return p .RecycleNatsConnection ()
83+ }
84+
85+ func (p * NatsPlugin ) Disable () error {
86+ p .logger .Debug ("Disabling plugin" )
7887 if p .natsClient != nil {
7988 p .natsClient .Close ()
8089 }
8190
82- conn , err := p .getNatsClient ()
83- if err != nil {
84- p .logger .WithError (err ).Error ("Failed to connect to NATS" )
85- return err
86- }
87-
88- p .natsClient = conn
89-
90- go func () {
91- p .logger .Debug ("Started listening on NATS messages" )
92- p .consumeMessages ()
93- p .natsClient .SetErrorHandler (func (c * nats.Conn , s * nats.Subscription , err error ) {
94- p .logger .WithError (err ).Error ("NATS error occured" )
95- })
96- p .natsClient .SetDisconnectHandler (func (c * nats.Conn ) {
97- p .logger .Warn ("NATS connection lost" )
98- })
99- p .natsClient .SetReconnectHandler (func (c * nats.Conn ) {
100- p .logger .Info ("Connection to NATS server restored" )
101- })
102- }()
91+ p .enabled = false
10392
10493 return nil
10594}
10695
96+ func (p * NatsPlugin ) SetMessageHandler (h plugin.MessageHandler ) {
97+ p .msgHandler = h
98+ }
99+
107100// RecycleNatsConnection replaces the currently used NATS connection.
108101// If none is establish it just creates a new one, otherwise it closes
109102// the existing one and creates a new one.
@@ -137,15 +130,6 @@ func (p *NatsPlugin) RecycleNatsConnection() error {
137130 return nil
138131}
139132
140- func (p * NatsPlugin ) Disable () error {
141- p .logger .Debug ("Disabling plugin" )
142- if p .natsClient != nil {
143- p .natsClient .Close ()
144- }
145-
146- return nil
147- }
148-
149133func (p * NatsPlugin ) consumeMessages () {
150134 p .logger .Debugf ("Listening on subject: %s" , p .config .Subject )
151135 p .natsClient .Subscribe (p .config .Subject , func (m * nats.Msg ) {
@@ -230,11 +214,13 @@ func (p *NatsPlugin) ValidateAndSetConfig(c interface{}) error {
230214 }
231215 p .logger .Info ("Validated configuration" )
232216
233- p .logger .Debug ("Replacing NATS connection with the new config" )
234- err := p .RecycleNatsConnection ()
235- if err != nil {
236- p .logger .WithError (err ).Error ("Failed to replace the old NATS connection with the new one" )
237- return err
217+ if p .enabled {
218+ p .logger .Debug ("Replacing NATS connection with the new config" )
219+ err := p .RecycleNatsConnection ()
220+ if err != nil {
221+ p .logger .WithError (err ).Error ("Failed to replace the old NATS connection with the new one" )
222+ return err
223+ }
238224 }
239225
240226 return nil
0 commit comments