Skip to content

Commit 89c4a8d

Browse files
committed
chainio: add method CurrentHeight
Add a new method `CurrentHeight` to query the current best height of the dispatcher.
1 parent fbc668c commit 89c4a8d

File tree

2 files changed

+117
-3
lines changed

2 files changed

+117
-3
lines changed

chainio/dispatcher.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,35 @@ type BlockbeatDispatcher struct {
4949

5050
// quit is used to signal the BlockbeatDispatcher to stop.
5151
quit chan struct{}
52+
53+
// queryHeightChan is used to receive queries on the current height of
54+
// the dispatcher.
55+
queryHeightChan chan *query
56+
}
57+
58+
// query is used to fetch the internal state of the dispatcher.
59+
type query struct {
60+
// respChan is used to send back the current height back to the caller.
61+
//
62+
// NOTE: This channel must be buffered.
63+
respChan chan int32
64+
}
65+
66+
// newQuery creates a query to be used to fetch the internal state of the
67+
// dispatcher.
68+
func newQuery() *query {
69+
return &query{
70+
respChan: make(chan int32, 1),
71+
}
5272
}
5373

5474
// NewBlockbeatDispatcher returns a new blockbeat dispatcher instance.
5575
func NewBlockbeatDispatcher(n chainntnfs.ChainNotifier) *BlockbeatDispatcher {
5676
return &BlockbeatDispatcher{
57-
notifier: n,
58-
quit: make(chan struct{}),
59-
consumerQueues: make(map[uint32][]Consumer),
77+
notifier: n,
78+
quit: make(chan struct{}),
79+
consumerQueues: make(map[uint32][]Consumer),
80+
queryHeightChan: make(chan *query, 1),
6081
}
6182
}
6283

@@ -161,6 +182,18 @@ func (b *BlockbeatDispatcher) dispatchBlocks(
161182
b.log().Infof("Notified all consumers on new block "+
162183
"in %v", time.Since(start))
163184

185+
// A query has been made to fetch the current height, we now
186+
// send the height from its current beat.
187+
case query := <-b.queryHeightChan:
188+
// The beat may not be set yet, e.g., during the startup
189+
// the query is made before the block epoch being sent.
190+
height := int32(0)
191+
if b.beat != nil {
192+
height = b.beat.Height()
193+
}
194+
195+
query.respChan <- height
196+
164197
case <-b.quit:
165198
b.log().Debugf("BlockbeatDispatcher quit signal " +
166199
"received")
@@ -170,6 +203,30 @@ func (b *BlockbeatDispatcher) dispatchBlocks(
170203
}
171204
}
172205

206+
// CurrentHeight returns the current best height known to the dispatcher. 0 is
207+
// returned if the dispatcher is shutting down.
208+
func (b *BlockbeatDispatcher) CurrentHeight() int32 {
209+
query := newQuery()
210+
211+
select {
212+
case b.queryHeightChan <- query:
213+
214+
case <-b.quit:
215+
clog.Debugf("BlockbeatDispatcher quit before query")
216+
return 0
217+
}
218+
219+
select {
220+
case height := <-query.respChan:
221+
clog.Debugf("Responded current height: %v", height)
222+
return height
223+
224+
case <-b.quit:
225+
clog.Debugf("BlockbeatDispatcher quit before response")
226+
return 0
227+
}
228+
}
229+
173230
// notifyQueues notifies each queue concurrently about the latest block epoch.
174231
func (b *BlockbeatDispatcher) notifyQueues() error {
175232
// errChans is a map of channels that will be used to receive errors

chainio/dispatcher_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,3 +381,60 @@ func TestNotifyQueuesError(t *testing.T) {
381381
err := b.notifyQueues()
382382
require.ErrorIs(t, err, errDummy)
383383
}
384+
385+
// TestCurrentHeight asserts `CurrentHeight` returns the expected block height.
386+
func TestCurrentHeight(t *testing.T) {
387+
t.Parallel()
388+
389+
testHeight := int32(1000)
390+
391+
// Create a mock chain notifier.
392+
mockNotifier := &chainntnfs.MockChainNotifier{}
393+
defer mockNotifier.AssertExpectations(t)
394+
395+
// Create a mock beat.
396+
mockBeat := &MockBlockbeat{}
397+
defer mockBeat.AssertExpectations(t)
398+
mockBeat.On("logger").Return(clog)
399+
mockBeat.On("Height").Return(testHeight).Once()
400+
401+
// Create a mock consumer.
402+
consumer := &MockConsumer{}
403+
defer consumer.AssertExpectations(t)
404+
consumer.On("Name").Return("mocker1")
405+
406+
// Create one queue.
407+
queue := []Consumer{consumer}
408+
409+
// Create a new dispatcher.
410+
b := NewBlockbeatDispatcher(mockNotifier)
411+
412+
// Register the queues.
413+
b.RegisterQueue(queue)
414+
415+
// Attach the blockbeat.
416+
b.beat = mockBeat
417+
418+
// Mock the chain notifier to return a valid notifier.
419+
blockEpochs := &chainntnfs.BlockEpochEvent{
420+
Cancel: func() {},
421+
}
422+
mockNotifier.On("RegisterBlockEpochNtfn",
423+
mock.Anything).Return(blockEpochs, nil).Once()
424+
425+
// Start the dispatcher now should not return an error.
426+
err := b.Start()
427+
require.NoError(t, err)
428+
429+
// Make a query on the current height and assert it equals to
430+
// testHeight.
431+
height := b.CurrentHeight()
432+
require.Equal(t, testHeight, height)
433+
434+
// Stop the dispatcher.
435+
b.Stop()
436+
437+
// Make a query on the current height and assert it equals to 0.
438+
height = b.CurrentHeight()
439+
require.Zero(t, height)
440+
}

0 commit comments

Comments
 (0)