@@ -44,11 +44,15 @@ type blockUpdateConsumer struct {
44
44
// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
45
45
// 2) To feed new block information to any registered consumers
46
46
type blockListener struct {
47
- ctx context.Context
48
- c * ethConnector
49
- backend rpcbackend.RPC
50
- wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
51
- listenLoopDone chan struct {}
47
+ ctx context.Context
48
+ c * ethConnector
49
+ backend rpcbackend.RPC
50
+ wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
51
+ listenLoopDone chan struct {}
52
+
53
+ isStarted bool
54
+ startDone chan struct {}
55
+
52
56
initialBlockHeightObtained chan struct {}
53
57
newHeadsTap chan struct {}
54
58
newHeadsSub rpcbackend.Subscription
@@ -73,6 +77,8 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
73
77
ctx : log .WithLogField (ctx , "role" , "blocklistener" ),
74
78
c : c ,
75
79
backend : c .backend , // use the HTTP backend - might get overwritten by a connected websocket later
80
+ isStarted : false ,
81
+ startDone : make (chan struct {}),
76
82
initialBlockHeightObtained : make (chan struct {}),
77
83
newHeadsTap : make (chan struct {}),
78
84
highestBlock : - 1 ,
@@ -92,6 +98,22 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
92
98
return bl , nil
93
99
}
94
100
101
+ // setting block filter status updates that new block filter has been created
102
+ func (bl * blockListener ) markStarted () {
103
+ if ! bl .isStarted {
104
+ bl .isStarted = true
105
+ close (bl .startDone )
106
+ }
107
+ }
108
+
109
+ func (bl * blockListener ) waitUntilStarted (ctx context.Context ) {
110
+ select {
111
+ case <- bl .startDone :
112
+ case <- bl .ctx .Done ():
113
+ case <- ctx .Done ():
114
+ }
115
+ }
116
+
95
117
func (bl * blockListener ) newHeadsSubListener () {
96
118
for range bl .newHeadsSub .Notifications () {
97
119
select {
@@ -106,7 +128,7 @@ func (bl *blockListener) newHeadsSubListener() {
106
128
// getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful
107
129
func (bl * blockListener ) establishBlockHeightWithRetry () error {
108
130
wsConnected := false
109
- return bl .c .retry .Do (bl .ctx , "get initial block height" , func (attempt int ) (retry bool , err error ) {
131
+ return bl .c .retry .Do (bl .ctx , "get initial block height" , func (_ int ) (retry bool , err error ) {
110
132
111
133
// If we have a WebSocket backend, then we connect it and switch over to using it
112
134
// (we accept an un-locked update here to backend, as the most important routine that's
@@ -136,16 +158,18 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
136
158
bl .backend = bl .wsBackend
137
159
}
138
160
139
- // Now get the block heiht
161
+ // Now get the block height
140
162
var hexBlockHeight ethtypes.HexInteger
141
163
rpcErr := bl .backend .CallRPC (bl .ctx , & hexBlockHeight , "eth_blockNumber" )
142
164
if rpcErr != nil {
143
165
log .L (bl .ctx ).Warnf ("Block height could not be obtained: %s" , rpcErr .Message )
144
166
return true , rpcErr .Error ()
145
167
}
168
+
146
169
bl .mux .Lock ()
147
170
bl .highestBlock = hexBlockHeight .BigInt ().Int64 ()
148
171
bl .mux .Unlock ()
172
+
149
173
return false , nil
150
174
})
151
175
}
@@ -162,6 +186,7 @@ func (bl *blockListener) listenLoop() {
162
186
var filter string
163
187
failCount := 0
164
188
gapPotential := true
189
+ firstIteration := true
165
190
for {
166
191
if failCount > 0 {
167
192
if bl .c .doFailureDelay (bl .ctx , failCount ) {
@@ -170,12 +195,16 @@ func (bl *blockListener) listenLoop() {
170
195
}
171
196
} else {
172
197
// Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
173
- select {
174
- case <- time .After (bl .blockPollingInterval ):
175
- case <- bl .newHeadsTap :
176
- case <- bl .ctx .Done ():
177
- log .L (bl .ctx ).Debugf ("Block listener loop stopping" )
178
- return
198
+ if ! firstIteration {
199
+ select {
200
+ case <- time .After (bl .blockPollingInterval ):
201
+ case <- bl .newHeadsTap :
202
+ case <- bl .ctx .Done ():
203
+ log .L (bl .ctx ).Debugf ("Block listener loop stopping" )
204
+ return
205
+ }
206
+ } else {
207
+ firstIteration = false
179
208
}
180
209
}
181
210
@@ -186,6 +215,7 @@ func (bl *blockListener) listenLoop() {
186
215
failCount ++
187
216
continue
188
217
}
218
+ bl .markStarted ()
189
219
}
190
220
191
221
var blockHashes []ethtypes.HexBytes0xPrefix
@@ -374,7 +404,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
374
404
for {
375
405
var bi * blockInfoJSONRPC
376
406
var reason ffcapi.ErrorReason
377
- err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (attempt int ) (retry bool , err error ) {
407
+ err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (_ int ) (retry bool , err error ) {
378
408
bi , reason , err = bl .getBlockInfoByNumber (bl .ctx , nextBlockNumber , false , "" )
379
409
return reason != ffcapi .ErrorReasonNotFound , err
380
410
})
@@ -428,7 +458,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
428
458
currentViewBlock := lastElem .Value .(* minimalBlockInfo )
429
459
var freshBlockInfo * blockInfoJSONRPC
430
460
var reason ffcapi.ErrorReason
431
- err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (attempt int ) (retry bool , err error ) {
461
+ err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (_ int ) (retry bool , err error ) {
432
462
freshBlockInfo , reason , err = bl .getBlockInfoByNumber (bl .ctx , currentViewBlock .number , false , "" )
433
463
return reason != ffcapi .ErrorReasonNotFound , err
434
464
})
@@ -471,23 +501,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
471
501
}
472
502
}
473
503
474
- func (bl * blockListener ) checkStartedLocked () {
504
+ func (bl * blockListener ) checkAndStartListenerLoop () {
505
+ bl .mux .Lock ()
506
+ defer bl .mux .Unlock ()
475
507
if bl .listenLoopDone == nil {
476
508
bl .listenLoopDone = make (chan struct {})
477
509
go bl .listenLoop ()
478
510
}
479
511
}
480
512
481
- func (bl * blockListener ) addConsumer (c * blockUpdateConsumer ) {
513
+ func (bl * blockListener ) addConsumer (ctx context.Context , c * blockUpdateConsumer ) {
514
+ bl .checkAndStartListenerLoop ()
515
+ bl .waitUntilStarted (ctx ) // need to make sure the listener is started before adding any consumers
482
516
bl .mux .Lock ()
483
517
defer bl .mux .Unlock ()
484
- bl .checkStartedLocked ()
485
518
bl .consumers [* c .id ] = c
486
519
}
487
520
488
521
func (bl * blockListener ) getHighestBlock (ctx context.Context ) (int64 , bool ) {
522
+ bl .checkAndStartListenerLoop ()
523
+ // block height will be established as the first step of listener startup process
524
+ // so we don't need to wait for the entire startup process to finish to return the result
489
525
bl .mux .Lock ()
490
- bl .checkStartedLocked ()
491
526
highestBlock := bl .highestBlock
492
527
bl .mux .Unlock ()
493
528
// if not yet initialized, wait to be initialized
@@ -515,6 +550,9 @@ func (bl *blockListener) waitClosed() {
515
550
bl .wsBackend .Close ()
516
551
}
517
552
if listenLoopDone != nil {
518
- <- listenLoopDone
553
+ select {
554
+ case <- listenLoopDone :
555
+ case <- bl .ctx .Done ():
556
+ }
519
557
}
520
558
}
0 commit comments