Skip to content

Commit bfdc266

Browse files
taktv6Oliver Geiselhardt-Herms
and
Oliver Geiselhardt-Herms
authored
Connection Manager: Disconnect and reconnect on failure (#273)
Co-authored-by: Oliver Geiselhardt-Herms <ogh@deepl.com>
1 parent 6cdaa0c commit bfdc266

File tree

5 files changed

+191
-135
lines changed

5 files changed

+191
-135
lines changed

junos_collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func deviceInterfaceRegex(cfg *config.Config, host string) *regexp.Regexp {
7777
}
7878

7979
func clientForDevice(device *connector.Device, connManager *connector.SSHConnectionManager) (*rpc.Client, error) {
80-
conn, err := connManager.Connect(device)
80+
conn, err := connManager.GetSSHConnection(device)
8181
if err != nil {
8282
return nil, err
8383
}

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func initChannels(ctx context.Context) {
165165

166166
func shutdown() {
167167
log.Infoln("Closing connections to devices")
168-
connManager.Close()
168+
connManager.CloseAll()
169169
os.Exit(0)
170170
}
171171

@@ -198,7 +198,7 @@ func reinitialize() error {
198198
defer configMu.Unlock()
199199

200200
if connManager != nil {
201-
connManager.Close()
201+
connManager.CloseAll()
202202
connManager = nil
203203
}
204204

pkg/connector/connection.go

Lines changed: 155 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,83 @@ import (
1010
"time"
1111

1212
"github.com/pkg/errors"
13+
log "github.com/sirupsen/logrus"
1314

1415
"golang.org/x/crypto/ssh"
1516
)
1617

1718
// SSHConnection encapsulates the connection to the device
1819
type SSHConnection struct {
19-
device *Device
20-
client *ssh.Client
21-
conn net.Conn
22-
lastUsed time.Time
23-
mu sync.Mutex
24-
done chan struct{}
20+
device *Device
21+
sshClient *ssh.Client
22+
tcpConn net.Conn
23+
isConnected bool
24+
mu sync.RWMutex // protects sshClient, tcpConn and isConnected
25+
lastUsed time.Time
26+
lastUsedMu sync.RWMutex
27+
done chan struct{}
28+
keepAliveInterval time.Duration
29+
keepAliveTimeout time.Duration
2530
}
2631

27-
// RunCommand runs a command against the device
28-
func (c *SSHConnection) RunCommand(cmd string) ([]byte, error) {
32+
func NewSSHConnection(device *Device, keepAliveInterval time.Duration, keepAliveTimeout time.Duration) *SSHConnection {
33+
return &SSHConnection{
34+
device: device,
35+
keepAliveInterval: keepAliveInterval,
36+
keepAliveTimeout: keepAliveTimeout,
37+
done: make(chan struct{}),
38+
}
39+
}
40+
41+
func (c *SSHConnection) Start(expiredConnectionTimeout time.Duration) error {
42+
err := c.connect()
43+
if err != nil {
44+
return err
45+
}
46+
47+
go c.keepalive(expiredConnectionTimeout)
48+
return nil
49+
}
50+
51+
func (c *SSHConnection) Stop(err error) {
52+
log.Infof("Stopping SSH connection with %s (reason: %v)", c.device.Host, err)
53+
2954
c.mu.Lock()
3055
defer c.mu.Unlock()
3156

32-
c.lastUsed = time.Now()
57+
if !c.isConnected {
58+
return
59+
}
60+
61+
close(c.done)
3362

34-
if c.client == nil {
35-
return nil, errors.New(fmt.Sprintf("not connected with %s", c.conn.RemoteAddr().String()))
63+
if c.sshClient != nil {
64+
c.sshClient.Close()
65+
c.sshClient = nil
3666
}
3767

38-
session, err := c.client.NewSession()
68+
if c.tcpConn != nil {
69+
c.tcpConn.Close()
70+
c.tcpConn = nil
71+
}
72+
73+
c.isConnected = false
74+
}
75+
76+
// RunCommand runs a command against the device
77+
func (c *SSHConnection) RunCommand(cmd string) ([]byte, error) {
78+
c.setLastUsed(time.Now())
79+
80+
sshClient := c.getSSHClient()
81+
if sshClient == nil {
82+
c.Stop(fmt.Errorf("No ssh client"))
83+
return nil, errors.New(fmt.Sprintf("no SSH client to %s", c.device.Host))
84+
}
85+
86+
session, err := c.sshClient.NewSession()
3987
if err != nil {
40-
return nil, errors.Wrapf(err, "could not open session with %s", c.conn.RemoteAddr().String())
88+
c.Stop(fmt.Errorf("SSH session failure"))
89+
return nil, errors.Wrapf(err, "could not open session with %s", c.device.Host)
4190
}
4291
defer session.Close()
4392

@@ -46,37 +95,114 @@ func (c *SSHConnection) RunCommand(cmd string) ([]byte, error) {
4695

4796
err = session.Run(cmd)
4897
if err != nil {
49-
return nil, errors.Wrapf(err, "could not run command %q on %s", cmd, c.conn.RemoteAddr().String())
98+
c.Stop(fmt.Errorf("failed running command"))
99+
return nil, errors.Wrapf(err, "could not run command %q on %s", cmd, c.device.Host)
50100
}
51101

52102
return b.Bytes(), nil
53103
}
54104

55-
func (c *SSHConnection) isConnected() bool {
56-
return c.conn != nil
105+
func (c *SSHConnection) keepalive(expiredConnectionTimeout time.Duration) {
106+
for {
107+
select {
108+
case <-time.After(c.keepAliveInterval):
109+
terminated := c.terminateIfLifetimeExpired(expiredConnectionTimeout)
110+
if terminated {
111+
return
112+
}
113+
114+
_ = c.tcpConn.SetDeadline(time.Now().Add(c.keepAliveTimeout))
115+
116+
ok := c.testSSHClient()
117+
if !ok {
118+
return
119+
}
120+
case <-c.done:
121+
return
122+
}
123+
}
57124
}
58125

59-
func (c *SSHConnection) terminate() {
60-
c.mu.Lock()
61-
defer c.mu.Unlock()
126+
func (c *SSHConnection) terminateIfLifetimeExpired(expiredConnectionTimeout time.Duration) bool {
127+
if time.Since(c.GetLastUsed()) > expiredConnectionTimeout {
128+
c.Stop(fmt.Errorf("lifetime expired"))
129+
return true
130+
}
131+
132+
return false
133+
}
134+
135+
func (c *SSHConnection) testSSHClient() bool {
136+
sshClient := c.getSSHClient()
62137

63-
c.conn.Close()
138+
_, _, err := sshClient.SendRequest("keepalive@golang.org", true, nil)
139+
if err != nil {
140+
log.Infof("SSH keepalive request to %s failed: %v", c.device, err)
141+
c.Stop(fmt.Errorf("keepalive failed"))
142+
return false
143+
}
64144

65-
c.client = nil
66-
c.conn = nil
145+
return true
67146
}
68147

69-
func (c *SSHConnection) close() {
148+
func (c *SSHConnection) connect() error {
149+
cfg := &ssh.ClientConfig{
150+
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
151+
Timeout: timeoutInSeconds * time.Second,
152+
}
153+
154+
c.device.Auth(cfg)
155+
156+
host := tcpAddressForHost(c.device.Host)
157+
log.Infof("Establishing TCP connection with %s", host)
158+
159+
tcpConn, err := net.DialTimeout("tcp", host, cfg.Timeout)
160+
if err != nil {
161+
return fmt.Errorf("could not open tcp connection: %w", err)
162+
}
163+
164+
sshConn, chans, reqs, err := ssh.NewClientConn(tcpConn, host, cfg)
165+
if err != nil {
166+
tcpConn.Close()
167+
return fmt.Errorf("could not connect to device: %w", err)
168+
}
169+
70170
c.mu.Lock()
71171
defer c.mu.Unlock()
72172

73-
if c.client != nil {
74-
c.client.Close()
75-
}
173+
c.tcpConn = tcpConn
174+
c.sshClient = ssh.NewClient(sshConn, chans, reqs)
175+
c.isConnected = true
176+
177+
return nil
178+
}
179+
180+
func (c *SSHConnection) setLastUsed(t time.Time) {
181+
c.lastUsedMu.Lock()
182+
defer c.lastUsedMu.Unlock()
183+
184+
c.lastUsed = t
185+
}
186+
187+
func (c *SSHConnection) GetLastUsed() time.Time {
188+
c.lastUsedMu.RLock()
189+
defer c.lastUsedMu.RUnlock()
190+
191+
return c.lastUsed
192+
}
193+
194+
func (c *SSHConnection) getSSHClient() *ssh.Client {
195+
c.mu.RLock()
196+
defer c.mu.RUnlock()
197+
198+
return c.sshClient
199+
}
200+
201+
func (c *SSHConnection) IsConnected() bool {
202+
c.mu.RLock()
203+
defer c.mu.RUnlock()
76204

77-
c.done <- struct{}{}
78-
c.conn = nil
79-
c.client = nil
205+
return c.isConnected
80206
}
81207

82208
// Host returns the hostname of the connected device

0 commit comments

Comments
 (0)