@@ -80,13 +80,13 @@ func (sm *StreamManager) Run() error {
80
80
sm .Metrics .setLoginTime ()
81
81
case StateDisconnected :
82
82
// Reconnect on disconnection
83
- return sm .resume (e . SMState )
83
+ return sm .resume ()
84
84
case StateStreamError :
85
85
sm .client .Disconnect ()
86
86
// Only try reconnecting if we have not been kicked by another session to avoid connection loop.
87
87
// TODO: Make this conflict exception a permanent error
88
88
if e .StreamError != "conflict" {
89
- return sm .connect ()
89
+ return sm .resume ()
90
90
}
91
91
case StatePermanentError :
92
92
// Do not attempt to reconnect
@@ -113,19 +113,32 @@ func (sm *StreamManager) Stop() {
113
113
}
114
114
115
115
func (sm * StreamManager ) connect () error {
116
- var state SMState
117
- return sm .resume (state )
116
+ if sm .client != nil {
117
+ if c , ok := sm .client .(* Client ); ok {
118
+ if c .CurrentState .getState () == StateDisconnected {
119
+ sm .Metrics = initMetrics ()
120
+ err := c .Connect ()
121
+ if err != nil {
122
+ return err
123
+ }
124
+ if sm .PostConnect != nil {
125
+ sm .PostConnect (sm .client )
126
+ }
127
+ return nil
128
+ }
129
+ }
130
+ }
131
+ return errors .New ("client is not disconnected" )
118
132
}
119
133
120
134
// resume manages the reconnection loop and apply the define backoff to avoid overloading the server.
121
- func (sm * StreamManager ) resume (state SMState ) error {
135
+ func (sm * StreamManager ) resume () error {
122
136
var backoff backoff // TODO: Group backoff calculation features with connection manager?
123
137
124
138
for {
125
139
var err error
126
140
// TODO: Make it possible to define logger to log disconnect and reconnection attempts
127
141
sm .Metrics = initMetrics ()
128
-
129
142
if err = sm .client .Resume (); err != nil {
130
143
var actualErr ConnError
131
144
if xerrors .As (err , & actualErr ) {
0 commit comments