Skip to content

Commit d5ac05c

Browse files
authored
Merge pull request #9501 from yyforyongyu/getinfo-blockheight
rpcserver: check `blockbeatDispatcher` when deciding `isSynced`
2 parents 9c2c95d + 759dc20 commit d5ac05c

File tree

4 files changed

+206
-28
lines changed

4 files changed

+206
-28
lines changed

chainio/dispatcher.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,35 @@ type BlockbeatDispatcher struct {
4949

5050
// quit is used to signal the BlockbeatDispatcher to stop.
5151
quit chan struct{}
52+
53+
// queryHeightChan is used to receive queries on the current height of
54+
// the dispatcher.
55+
queryHeightChan chan *query
56+
}
57+
58+
// query is used to fetch the internal state of the dispatcher.
59+
type query struct {
60+
// respChan is used to send back the current height back to the caller.
61+
//
62+
// NOTE: This channel must be buffered.
63+
respChan chan int32
64+
}
65+
66+
// newQuery creates a query to be used to fetch the internal state of the
67+
// dispatcher.
68+
func newQuery() *query {
69+
return &query{
70+
respChan: make(chan int32, 1),
71+
}
5272
}
5373

5474
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
5575
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
5676
return &BlockbeatDispatcher{
57-
notifier: n,
58-
quit: make(chan struct{}),
59-
consumerQueues: make(map[uint32][]Consumer),
77+
notifier: n,
78+
quit: make(chan struct{}),
79+
consumerQueues: make(map[uint32][]Consumer),
80+
queryHeightChan: make(chan *query, 1),
6081
}
6182
}
6283

@@ -161,6 +182,18 @@ func (b *BlockbeatDispatcher) dispatchBlocks(
161182
b.log().Infof("Notified all consumers on new block "+
162183
"in %v", time.Since(start))
163184

185+
// A query has been made to fetch the current height, we now
186+
// send the height from its current beat.
187+
case query := <-b.queryHeightChan:
188+
// The beat may not be set yet, e.g., during the startup
189+
// the query is made before the block epoch being sent.
190+
height := int32(0)
191+
if b.beat != nil {
192+
height = b.beat.Height()
193+
}
194+
195+
query.respChan <- height
196+
164197
case <-b.quit:
165198
b.log().Debugf("BlockbeatDispatcher quit signal " +
166199
"received")
@@ -170,6 +203,30 @@ func (b *BlockbeatDispatcher) dispatchBlocks(
170203
}
171204
}
172205

206+
// CurrentHeight returns the current best height known to the dispatcher. 0 is
207+
// returned if the dispatcher is shutting down.
208+
func (b *BlockbeatDispatcher) CurrentHeight() int32 {
209+
query := newQuery()
210+
211+
select {
212+
case b.queryHeightChan <- query:
213+
214+
case <-b.quit:
215+
clog.Debugf("BlockbeatDispatcher quit before query")
216+
return 0
217+
}
218+
219+
select {
220+
case height := <-query.respChan:
221+
clog.Debugf("Responded current height: %v", height)
222+
return height
223+
224+
case <-b.quit:
225+
clog.Debugf("BlockbeatDispatcher quit before response")
226+
return 0
227+
}
228+
}
229+
173230
// notifyQueues notifies each queue concurrently about the latest block epoch.
174231
func (b *BlockbeatDispatcher) notifyQueues() error {
175232
// errChans is a map of channels that will be used to receive errors

chainio/dispatcher_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,3 +381,60 @@ func TestNotifyQueuesError(t *testing.T) {
381381
err := b.notifyQueues()
382382
require.ErrorIs(t, err, errDummy)
383383
}
384+
385+
// TestCurrentHeight asserts `CurrentHeight` returns the expected block height.
386+
func TestCurrentHeight(t *testing.T) {
387+
t.Parallel()
388+
389+
testHeight := int32(1000)
390+
391+
// Create a mock chain notifier.
392+
mockNotifier := &chainntnfs.MockChainNotifier{}
393+
defer mockNotifier.AssertExpectations(t)
394+
395+
// Create a mock beat.
396+
mockBeat := &MockBlockbeat{}
397+
defer mockBeat.AssertExpectations(t)
398+
mockBeat.On("logger").Return(clog)
399+
mockBeat.On("Height").Return(testHeight).Once()
400+
401+
// Create a mock consumer.
402+
consumer := &MockConsumer{}
403+
defer consumer.AssertExpectations(t)
404+
consumer.On("Name").Return("mocker1")
405+
406+
// Create one queue.
407+
queue := []Consumer{consumer}
408+
409+
// Create a new dispatcher.
410+
b := NewBlockbeatDispatcher(mockNotifier)
411+
412+
// Register the queues.
413+
b.RegisterQueue(queue)
414+
415+
// Attach the blockbeat.
416+
b.beat = mockBeat
417+
418+
// Mock the chain notifier to return a valid notifier.
419+
blockEpochs := &chainntnfs.BlockEpochEvent{
420+
Cancel: func() {},
421+
}
422+
mockNotifier.On("RegisterBlockEpochNtfn",
423+
mock.Anything).Return(blockEpochs, nil).Once()
424+
425+
// Start the dispatcher now should not return an error.
426+
err := b.Start()
427+
require.NoError(t, err)
428+
429+
// Make a query on the current height and assert it equals to
430+
// testHeight.
431+
height := b.CurrentHeight()
432+
require.Equal(t, testHeight, height)
433+
434+
// Stop the dispatcher.
435+
b.Stop()
436+
437+
// Make a query on the current height and assert it equals to 0.
438+
height = b.CurrentHeight()
439+
require.Zero(t, height)
440+
}

docs/release-notes/release-notes-0.19.0.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@
170170
use the configured budget values for HTLCs (first level sweep) in parcticular
171171
`--sweeper.budget.deadlinehtlcratio` and `--sweeper.budget.deadlinehtlc`.
172172

173+
* When deciding whether `lnd` is synced to chain, the current height from the
174+
blockbeat dispatcher is now also [taken into
175+
consideration](https://github.com/lightningnetwork/lnd/pull/9501).
176+
173177
## RPC Updates
174178

175179
* Some RPCs that previously just returned an empty response message now at least

rpcserver.go

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3225,28 +3225,10 @@ func (r *rpcServer) GetInfo(_ context.Context,
32253225
idPub := r.server.identityECDH.PubKey().SerializeCompressed()
32263226
encodedIDPub := hex.EncodeToString(idPub)
32273227

3228-
bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock()
3229-
if err != nil {
3230-
return nil, fmt.Errorf("unable to get best block info: %w", err)
3231-
}
3232-
3233-
isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced()
3228+
// Get the system's chain sync info.
3229+
syncInfo, err := r.getChainSyncInfo()
32343230
if err != nil {
3235-
return nil, fmt.Errorf("unable to sync PoV of the wallet "+
3236-
"with current best block in the main chain: %v", err)
3237-
}
3238-
3239-
// If the router does full channel validation, it has a lot of work to
3240-
// do for each block. So it might be possible that it isn't yet up to
3241-
// date with the most recent block, even if the wallet is. This can
3242-
// happen in environments with high CPU load (such as parallel itests).
3243-
// Since the `synced_to_chain` flag in the response of this call is used
3244-
// by many wallets (and also our itests) to make sure everything's up to
3245-
// date, we add the router's state to it. So the flag will only toggle
3246-
// to true once the router was also able to catch up.
3247-
if !r.cfg.Routing.AssumeChannelValid {
3248-
routerHeight := r.server.graphBuilder.SyncedHeight()
3249-
isSynced = isSynced && uint32(bestHeight) == routerHeight
3231+
return nil, err
32503232
}
32513233

32523234
network := lncfg.NormalizeNetwork(r.cfg.ActiveNetParams.Name)
@@ -3297,15 +3279,15 @@ func (r *rpcServer) GetInfo(_ context.Context,
32973279
NumActiveChannels: activeChannels,
32983280
NumInactiveChannels: inactiveChannels,
32993281
NumPeers: uint32(len(serverPeers)),
3300-
BlockHeight: uint32(bestHeight),
3301-
BlockHash: bestHash.String(),
3302-
SyncedToChain: isSynced,
3282+
BlockHeight: uint32(syncInfo.bestHeight),
3283+
BlockHash: syncInfo.blockHash.String(),
3284+
SyncedToChain: syncInfo.isSynced,
33033285
Testnet: isTestNet,
33043286
Chains: activeChains,
33053287
Uris: uris,
33063288
Alias: nodeAnn.Alias.String(),
33073289
Color: nodeColor,
3308-
BestHeaderTimestamp: bestHeaderTimestamp,
3290+
BestHeaderTimestamp: syncInfo.timestamp,
33093291
Version: version,
33103292
CommitHash: build.CommitHash,
33113293
SyncedToGraph: isGraphSynced,
@@ -8929,3 +8911,81 @@ func rpcInitiator(isInitiator bool) lnrpc.Initiator {
89298911

89308912
return lnrpc.Initiator_INITIATOR_REMOTE
89318913
}
8914+
8915+
// chainSyncInfo wraps info about the best block and whether the system is
8916+
// synced to that block.
8917+
type chainSyncInfo struct {
8918+
// isSynced specifies whether the whole system is considered synced.
8919+
// When true, it means the following subsystems are at the best height
8920+
// reported by the chain backend,
8921+
// - wallet.
8922+
// - channel graph.
8923+
// - blockbeat dispatcher.
8924+
isSynced bool
8925+
8926+
// bestHeight is the current height known to the chain backend.
8927+
bestHeight int32
8928+
8929+
// blockHash is the hash of the current block known to the chain
8930+
// backend.
8931+
blockHash chainhash.Hash
8932+
8933+
// timestamp is the block's timestamp the wallet has synced to.
8934+
timestamp int64
8935+
}
8936+
8937+
// getChainSyncInfo queries the chain backend, the wallet, the channel router
8938+
// and the blockbeat dispatcher to determine the best block and whether the
8939+
// system is considered synced.
8940+
func (r *rpcServer) getChainSyncInfo() (*chainSyncInfo, error) {
8941+
bestHash, bestHeight, err := r.server.cc.ChainIO.GetBestBlock()
8942+
if err != nil {
8943+
return nil, fmt.Errorf("unable to get best block info: %w", err)
8944+
}
8945+
8946+
isSynced, bestHeaderTimestamp, err := r.server.cc.Wallet.IsSynced()
8947+
if err != nil {
8948+
return nil, fmt.Errorf("unable to sync PoV of the wallet "+
8949+
"with current best block in the main chain: %v", err)
8950+
}
8951+
8952+
// Create an info to be returned.
8953+
info := &chainSyncInfo{
8954+
isSynced: isSynced,
8955+
bestHeight: bestHeight,
8956+
blockHash: *bestHash,
8957+
timestamp: bestHeaderTimestamp,
8958+
}
8959+
8960+
// Exit early if the wallet is not synced.
8961+
if !isSynced {
8962+
return info, nil
8963+
}
8964+
8965+
// If the router does full channel validation, it has a lot of work to
8966+
// do for each block. So it might be possible that it isn't yet up to
8967+
// date with the most recent block, even if the wallet is. This can
8968+
// happen in environments with high CPU load (such as parallel itests).
8969+
// Since the `synced_to_chain` flag in the response of this call is used
8970+
// by many wallets (and also our itests) to make sure everything's up to
8971+
// date, we add the router's state to it. So the flag will only toggle
8972+
// to true once the router was also able to catch up.
8973+
if !r.cfg.Routing.AssumeChannelValid {
8974+
routerHeight := r.server.graphBuilder.SyncedHeight()
8975+
isSynced = uint32(bestHeight) == routerHeight
8976+
}
8977+
8978+
// Exit early if the channel graph is not synced.
8979+
if !isSynced {
8980+
return info, nil
8981+
}
8982+
8983+
// Given the wallet and the channel router are synced, we now check
8984+
// whether the blockbeat dispatcher is synced.
8985+
height := r.server.blockbeatDispatcher.CurrentHeight()
8986+
8987+
// Overwrite isSynced and return.
8988+
info.isSynced = height == bestHeight
8989+
8990+
return info, nil
8991+
}

0 commit comments

Comments
 (0)