Skip to content

Commit c493f0c

Browse files
Roasbeef and guggero
authored and committed
peer+lnd: add new CLI option to control if we D/C on slow pongs
In this commit, we add a new CLI option to control whether we disconnect (D/C) on slow pongs. Due to head-of-line blocking at various levels of abstraction (app buffer, slow processing, TCP kernel buffers, etc.), if there's a flurry of gossip messages (e.g., 1K channel updates), then even with a reasonable processing latency, a peer may still not read our ping in time. To give users another option, we add a flag that allows them to disable this behavior. The default behavior (disconnecting on pong failure) remains unchanged.
1 parent b0cba7d commit c493f0c

File tree

6 files changed

+175
-78
lines changed

6 files changed

+175
-78
lines changed

config.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ const (
251251

252252
defaultPrunedNodeMaxPeers = 4
253253
defaultNeutrinoMaxPeers = 8
254+
255+
// defaultNoDisconnectOnPongFailure is the default value for whether we
256+
// should *not* disconnect from a peer if we don't receive a pong
257+
// response in time after we send a ping.
258+
defaultNoDisconnectOnPongFailure = false
254259
)
255260

256261
var (
@@ -527,6 +532,10 @@ type Config struct {
527532
// NumRestrictedSlots is the number of restricted slots we'll allocate
528533
// in the server.
529534
NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."`
535+
536+
// NoDisconnectOnPongFailure controls if we'll disconnect if a peer
537+
// doesn't respond to a ping with a pong in time.
538+
NoDisconnectOnPongFailure bool `long:"no-disconnect-on-pong-failure" description:"If true, a peer will *not* be disconnected if a pong is not received in time or is mismatched. Defaults to false, meaning peers *will* be disconnected on pong failure."`
530539
}
531540

532541
// GRPCConfig holds the configuration options for the gRPC server.
@@ -747,10 +756,11 @@ func DefaultConfig() Config {
747756
ServerPingTimeout: defaultGrpcServerPingTimeout,
748757
ClientPingMinWait: defaultGrpcClientPingMinWait,
749758
},
750-
LogConfig: build.DefaultLogConfig(),
751-
WtClient: lncfg.DefaultWtClientCfg(),
752-
HTTPHeaderTimeout: DefaultHTTPHeaderTimeout,
753-
NumRestrictedSlots: DefaultNumRestrictedSlots,
759+
LogConfig: build.DefaultLogConfig(),
760+
WtClient: lncfg.DefaultWtClientCfg(),
761+
HTTPHeaderTimeout: DefaultHTTPHeaderTimeout,
762+
NumRestrictedSlots: DefaultNumRestrictedSlots,
763+
NoDisconnectOnPongFailure: defaultNoDisconnectOnPongFailure,
754764
}
755765
}
756766

peer/brontide.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ const (
9494
torTimeoutMultiplier = 3
9595

9696
// msgStreamSize is the size of the message streams.
97-
msgStreamSize = 5
97+
msgStreamSize = 50
9898
)
9999

100100
var (
@@ -455,6 +455,10 @@ type Config struct {
455455
// experimental endorsement signals should be set.
456456
ShouldFwdExpEndorsement func() bool
457457

458+
// NoDisconnectOnPongFailure indicates whether the peer should *not* be
459+
// disconnected if a pong is not received in time or is mismatched.
460+
NoDisconnectOnPongFailure bool
461+
458462
// Quit is the server's quit channel. If this is closed, we halt operation.
459463
Quit chan struct{}
460464
}
@@ -735,11 +739,27 @@ func NewBrontide(cfg Config) *Brontide {
735739
SendPing: func(ping *lnwire.Ping) {
736740
p.queueMsg(ping, nil)
737741
},
738-
OnPongFailure: func(err error) {
739-
eStr := "pong response failure for %s: %v " +
740-
"-- disconnecting"
741-
p.log.Warnf(eStr, p, err)
742-
go p.Disconnect(fmt.Errorf(eStr, p, err))
742+
OnPongFailure: func(reason error,
743+
timeWaitedForPong time.Duration,
744+
lastKnownRTT time.Duration) {
745+
746+
logMsg := fmt.Sprintf("pong response "+
747+
"failure for %s: %v. Time waited for this "+
748+
"pong: %v. Last successful RTT: %v.",
749+
p, reason, timeWaitedForPong, lastKnownRTT)
750+
751+
// If NoDisconnectOnPongFailure is true, we don't
752+
// disconnect. Otherwise (if it's false, the default),
753+
// we disconnect.
754+
if p.cfg.NoDisconnectOnPongFailure {
755+
p.log.Warnf("%s -- not disconnecting "+
756+
"due to config", logMsg)
757+
return
758+
}
759+
760+
p.log.Warnf("%s -- disconnecting", logMsg)
761+
762+
go p.Disconnect(fmt.Errorf("pong failure: %w", reason))
743763
},
744764
})
745765

peer/ping_manager.go

Lines changed: 79 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync/atomic"
88
"time"
99

10+
"github.com/lightningnetwork/lnd/fn/v2"
1011
"github.com/lightningnetwork/lnd/lnwire"
1112
)
1213

@@ -36,7 +37,8 @@ type PingManagerConfig struct {
3637
// OnPongFailure is a closure that is responsible for executing the
3738
// logic when a Pong message is either late or does not match our
3839
// expectations for that Pong.
39-
OnPongFailure func(error)
40+
OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
41+
lastKnownRTT time.Duration)
4042
}
4143

4244
// PingManager is a structure that is designed to manage the internal state
@@ -108,6 +110,26 @@ func (m *PingManager) Start() error {
108110
return err
109111
}
110112

113+
// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
114+
func (m *PingManager) getLastRTT() time.Duration {
115+
rttPtr := m.pingTime.Load()
116+
if rttPtr == nil {
117+
return 0
118+
}
119+
120+
return *rttPtr
121+
}
122+
123+
// pendingPingWait calculates the time waited since the last ping was sent. If
124+
// no ping time is reported, None is returned.
125+
func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
126+
if m.pingLastSend != nil {
127+
return fn.Some(time.Since(*m.pingLastSend))
128+
}
129+
130+
return fn.None[time.Duration]()
131+
}
132+
111133
// pingHandler is the main goroutine responsible for enforcing the ping/pong
112134
// protocol.
113135
func (m *PingManager) pingHandler() {
@@ -119,6 +141,10 @@ func (m *PingManager) pingHandler() {
119141
<-m.pingTimeout.C
120142
}
121143

144+
// Because we don't know if the OnPongFailure callback actually
145+
// disconnects a peer (dependent on user config), we should never return
146+
// from this loop unless the ping manager is stopped explicitly (which
147+
// happens on disconnect).
122148
for {
123149
select {
124150
case <-m.pingTicker.C:
@@ -127,12 +153,20 @@ func (m *PingManager) pingHandler() {
127153
// awaiting a pong response. This should never occur,
128154
// but if it does, it implies a timeout.
129155
if m.outstandingPongSize >= 0 {
130-
e := errors.New("impossible: new ping" +
131-
"in unclean state",
156+
// Ping was outstanding, meaning it timed out by
157+
// the arrival of the next ping interval.
158+
timeWaited := m.pendingPingWait().UnwrapOr(
159+
m.cfg.IntervalDuration,
160+
)
161+
lastRTT := m.getLastRTT()
162+
163+
m.cfg.OnPongFailure(
164+
errors.New("ping timed "+
165+
"out by next interval"),
166+
timeWaited, lastRTT,
132167
)
133-
m.cfg.OnPongFailure(e)
134168

135-
return
169+
m.resetPingState()
136170
}
137171

138172
pongSize := m.cfg.NewPongSize()
@@ -143,53 +177,67 @@ func (m *PingManager) pingHandler() {
143177

144178
// Set up our bookkeeping for the new Ping.
145179
if err := m.setPingState(pongSize); err != nil {
146-
m.cfg.OnPongFailure(err)
180+
// This is an internal error related to timer
181+
// reset. Pass it to OnPongFailure as it's
182+
// critical. Current and last RTT are not
183+
// directly applicable here.
184+
m.cfg.OnPongFailure(err, 0, 0)
185+
186+
m.resetPingState()
147187

148-
return
188+
continue
149189
}
150190

151191
m.cfg.SendPing(ping)
152192

153193
case <-m.pingTimeout.C:
154-
m.resetPingState()
155-
156-
e := errors.New("timeout while waiting for " +
157-
"pong response",
194+
timeWaited := m.pendingPingWait().UnwrapOr(
195+
m.cfg.TimeoutDuration,
158196
)
197+
lastRTT := m.getLastRTT()
159198

160-
m.cfg.OnPongFailure(e)
199+
m.cfg.OnPongFailure(
200+
errors.New("timeout while waiting for "+
201+
"pong response"),
202+
timeWaited, lastRTT,
203+
)
161204

162-
return
205+
m.resetPingState()
163206

164207
case pong := <-m.pongChan:
165208
pongSize := int32(len(pong.PongBytes))
166209

167-
// Save off values we are about to override when we
168-
// call resetPingState.
210+
// Save off values we are about to override when we call
211+
// resetPingState.
169212
expected := m.outstandingPongSize
170-
lastPing := m.pingLastSend
213+
lastPingTime := m.pingLastSend
171214

172-
m.resetPingState()
215+
// This is an unexpected pong, we'll continue.
216+
if lastPingTime == nil {
217+
continue
218+
}
173219

174-
// If the pong we receive doesn't match the ping we
175-
// sent out, then we fail out.
220+
actualRTT := time.Since(*lastPingTime)
221+
222+
// If the pong we receive doesn't match the ping we sent
223+
// out, then we report a pong failure and keep running.
176224
if pongSize != expected {
177-
e := errors.New("pong response does " +
178-
"not match expected size",
179-
)
225+
e := fmt.Errorf("pong response does not match "+
226+
"expected size. Expected: %d, Got: %d",
227+
expected, pongSize)
180228

181-
m.cfg.OnPongFailure(e)
229+
lastRTT := m.getLastRTT()
230+
m.cfg.OnPongFailure(e, actualRTT, lastRTT)
182231

183-
return
184-
}
232+
m.resetPingState()
185233

186-
// Compute RTT of ping and save that for future
187-
// querying.
188-
if lastPing != nil {
189-
rtt := time.Since(*lastPing)
190-
m.pingTime.Store(&rtt)
234+
continue
191235
}
192236

237+
// Pong is good, update RTT and reset state.
238+
m.pingTime.Store(&actualRTT)
239+
m.resetPingState()
240+
193241
case <-m.quit:
194242
return
195243
}
@@ -231,6 +279,7 @@ func (m *PingManager) setPingState(pongSize uint16) error {
231279
func (m *PingManager) resetPingState() {
232280
m.pingLastSend = nil
233281
m.outstandingPongSize = -1
282+
234283
if !m.pingTimeout.Stop() {
235284
select {
236285
case <-m.pingTimeout.C:

peer/ping_manager_test.go

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package peer
22

33
import (
4+
"sync"
45
"testing"
56
"time"
67

@@ -23,19 +24,19 @@ func TestPingManager(t *testing.T) {
2324
result bool
2425
}{
2526
{
26-
name: "Happy Path",
27+
name: "happy Path",
2728
delay: 0,
2829
pongSize: 4,
2930
result: true,
3031
},
3132
{
32-
name: "Bad Pong",
33+
name: "bad Pong",
3334
delay: 0,
3435
pongSize: 3,
3536
result: false,
3637
},
3738
{
38-
name: "Timeout",
39+
name: "timeout",
3940
delay: 2,
4041
pongSize: 4,
4142
result: false,
@@ -44,45 +45,56 @@ func TestPingManager(t *testing.T) {
4445

4546
payload := make([]byte, 4)
4647
for _, test := range testCases {
47-
// Set up PingManager.
48-
pingSent := make(chan struct{})
49-
disconnected := make(chan struct{})
50-
mgr := NewPingManager(&PingManagerConfig{
51-
NewPingPayload: func() []byte {
52-
return payload
53-
},
54-
NewPongSize: func() uint16 {
55-
return 4
56-
},
57-
IntervalDuration: time.Second * 2,
58-
TimeoutDuration: time.Second,
59-
SendPing: func(ping *lnwire.Ping) {
60-
close(pingSent)
61-
},
62-
OnPongFailure: func(err error) {
63-
close(disconnected)
64-
},
65-
})
66-
require.NoError(t, mgr.Start(), "Could not start pingManager")
48+
t.Run(test.name, func(t *testing.T) {
49+
// Set up PingManager.
50+
var pingOnce sync.Once
51+
pingSent := make(chan struct{})
52+
disconnected := make(chan struct{})
53+
mgr := NewPingManager(&PingManagerConfig{
54+
NewPingPayload: func() []byte {
55+
return payload
56+
},
57+
NewPongSize: func() uint16 {
58+
return 4
59+
},
60+
IntervalDuration: time.Second * 2,
61+
TimeoutDuration: time.Second,
62+
SendPing: func(ping *lnwire.Ping) {
63+
pingOnce.Do(func() {
64+
close(pingSent)
65+
})
66+
},
67+
OnPongFailure: func(err error,
68+
_ time.Duration, _ time.Duration) {
69+
70+
close(disconnected)
71+
},
72+
})
73+
require.NoError(
74+
t, mgr.Start(), "Could not start pingManager",
75+
)
6776

68-
// Wait for initial Ping.
69-
<-pingSent
77+
// Wait for initial Ping.
78+
<-pingSent
7079

71-
// Wait for pre-determined time before sending Pong response.
72-
time.Sleep(time.Duration(test.delay) * time.Second)
80+
// Wait for pre-determined time before sending Pong
81+
// response.
82+
time.Sleep(time.Duration(test.delay) * time.Second)
7383

74-
// Send Pong back.
75-
res := lnwire.Pong{PongBytes: make([]byte, test.pongSize)}
76-
mgr.ReceivedPong(&res)
84+
// Send Pong back.
85+
res := lnwire.Pong{
86+
PongBytes: make([]byte, test.pongSize),
87+
}
88+
mgr.ReceivedPong(&res)
7789

78-
// Evaluate result
79-
select {
80-
case <-time.NewTimer(time.Second / 2).C:
81-
require.True(t, test.result)
82-
case <-disconnected:
83-
require.False(t, test.result)
84-
}
90+
select {
91+
case <-time.NewTimer(time.Second / 2).C:
92+
require.True(t, test.result)
93+
case <-disconnected:
94+
require.False(t, test.result)
95+
}
8596

86-
mgr.Stop()
97+
mgr.Stop()
98+
})
8799
}
88100
}

0 commit comments

Comments
 (0)