Skip to content
This repository was archived by the owner on Jan 13, 2023. It is now read-only.

Commit 8946574

Browse files
authored
Merge pull request #53 from itzmeanjan/develop
Improve missing block finder & retry queue manager
2 parents f06e0fc + da0c3df commit 8946574

File tree

11 files changed

+223
-51
lines changed

11 files changed

+223
-51
lines changed

app/app.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
7373
func Run(configFile, subscriptionPlansFile string) {
7474
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
7575
_redisInfo := d.RedisInfo{
76-
Client: _redisClient,
77-
BlockRetryQueueName: "blocks_in_retry_queue",
78-
UnfinalizedBlocksQueueName: "unfinalized_blocks",
76+
Client: _redisClient,
77+
BlockRetryQueue: "blocks_in_retry_queue",
78+
BlockRetryCountTable: "attempt_count_tracker_table",
79+
UnfinalizedBlocksQueue: "unfinalized_blocks",
7980
}
8081

8182
// Attempting to listen to Ctrl+C signal

app/block/block.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
2525
}
2626

2727
// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
28-
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) {
28+
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool {
2929

3030
// Closure managing publishing whole block data i.e. block header, txn(s), event logs
3131
// on redis pubsub channel
@@ -60,7 +60,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
6060
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
6161
status.IncrementBlocksProcessed()
6262

63-
return
63+
return true
6464

6565
}
6666

@@ -71,7 +71,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
7171
// Pushing into unfinalized block queue, to be picked up only when
7272
// finality for this block has been achieved
7373
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
74-
return
74+
return true
7575

7676
}
7777

@@ -82,15 +82,15 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
8282

8383
// If failed to persist, we'll put it in retry queue
8484
PushBlockIntoRetryQueue(redis, block.Number().String())
85-
return
85+
return false
8686

8787
}
8888

8989
// Successfully processed block
9090
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
9191
status.IncrementBlocksProcessed()
9292

93-
return
93+
return true
9494

9595
}
9696

@@ -165,7 +165,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
165165
if !(result.Failure == 0) {
166166

167167
PushBlockIntoRetryQueue(redis, block.Number().String())
168-
return
168+
return false
169169

170170
}
171171

@@ -183,7 +183,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
183183
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
184184
status.IncrementBlocksProcessed()
185185

186-
return
186+
return true
187187

188188
}
189189

@@ -194,7 +194,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
194194
// Pushing into unfinalized block queue, to be picked up only when
195195
// finality for this block has been achieved
196196
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
197-
return
197+
return true
198198

199199
}
200200

@@ -205,12 +205,14 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
205205

206206
// If failed to persist, we'll put it in retry queue
207207
PushBlockIntoRetryQueue(redis, block.Number().String())
208-
return
208+
return false
209209

210210
}
211211

212212
// Successfully processed block
213213
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
214+
214215
status.IncrementBlocksProcessed()
216+
return true
215217

216218
}

app/block/fetch.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,14 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
5353
return
5454
}
5555

56-
ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt)
56+
// If attempt to process block by number went successful
57+
// we can consider removing this block number's entry from
58+
// attempt count tracker table
59+
if ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt) {
60+
61+
RemoveBlockFromAttemptCountTrackerTable(redis, fmt.Sprintf("%d", number))
62+
63+
}
5764

5865
}
5966

app/block/listener.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package block
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"runtime"
78

@@ -87,7 +88,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
8788
//
8889
// Though it'll be picked up sometime in future ( by missing block finder ), but it can be safely handled now
8990
// so that it gets processed immediately
90-
func(blockHash common.Hash, blockNumber string) {
91+
func(blockHash common.Hash, blockNumber uint64) {
9192

9293
// When only processing blocks in real-time mode
9394
// no need to check what's present in unfinalized block number queue
@@ -132,9 +133,15 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
132133
}(oldest)
133134

134135
} else {
135-
// If oldest block is not finalized, no meaning
136-
// staying here, we'll revisit it some time in future
137-
break
136+
137+
// If left most block is not yet finalized, it'll attempt to
138+
// reorganize that queue so that other blocks waiting to be processed
139+
// can get that opportunity
140+
//
141+
// This situation generally occurs due to concurrent pattern implemented
142+
// in block processor
143+
MoveUnfinalizedOldestBlockToEnd(redis)
144+
138145
}
139146

140147
}
@@ -145,14 +152,14 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
145152

146153
FetchBlockByHash(connection.RPC,
147154
blockHash,
148-
blockNumber,
155+
fmt.Sprintf("%d", blockNumber),
149156
_db,
150157
redis,
151158
status)
152159

153160
})
154161

155-
}(header.Hash(), header.Number.String())
162+
}(header.Hash(), header.Number.Uint64())
156163

157164
}
158165
}

app/block/retry.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,19 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
3737
sleep()
3838

3939
// Popping oldest element from Redis queue
40-
blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result()
40+
blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueue).Result()
4141
if err != nil {
4242
continue
4343
}
4444

45+
attemptCount := GetAttemptCountFromTable(redis, blockNumber)
46+
if attemptCount != 0 && attemptCount%3 != 0 {
47+
48+
PushBlockIntoRetryQueue(redis, blockNumber)
49+
continue
50+
51+
}
52+
4553
// Parsing string blockNumber to uint64
4654
parsedBlockNumber, err := strconv.ParseUint(blockNumber, 10, 64)
4755
if err != nil {
@@ -82,11 +90,75 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
8290
// Checking presence first & then deciding whether to add it or not
8391
if !CheckBlockInRetryQueue(redis, blockNumber) {
8492

85-
if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil {
93+
if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueue, blockNumber).Result(); err != nil {
8694
log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error()))
8795
}
8896

97+
IncrementAttemptCountOfBlockNumber(redis, blockNumber)
98+
99+
}
100+
}
101+
102+
// IncrementAttemptCountOfBlockNumber - Given block number, increments failed attempt count
103+
// of processing this block
104+
//
105+
// If block doesn't yet exist in tracker table, it'll be inserted first time & counter to be set to 1
106+
//
107+
// It'll be wrapped back to 0 as soon as it reaches 101
108+
func IncrementAttemptCountOfBlockNumber(redis *data.RedisInfo, blockNumber string) {
109+
110+
wrappedAttemptCount := (GetAttemptCountFromTable(redis, blockNumber) + 1) % 101
111+
112+
if _, err := redis.Client.HSet(context.Background(), redis.BlockRetryCountTable, blockNumber, wrappedAttemptCount).Result(); err != nil {
113+
log.Print(color.Red.Sprintf("[!] Failed to increment attempt count of block %s : %s", blockNumber, err.Error()))
89114
}
115+
116+
}
117+
118+
// CheckBlockInAttemptCounterTable - Checks whether given block number already exist in
119+
// attempt count tracker table
120+
func CheckBlockInAttemptCounterTable(redis *data.RedisInfo, blockNumber string) bool {
121+
122+
if _, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
123+
return false
124+
}
125+
126+
return true
127+
128+
}
129+
130+
// GetAttemptCountFromTable - Returns current attempt counter from table
131+
// for given block number
132+
func GetAttemptCountFromTable(redis *data.RedisInfo, blockNumber string) uint64 {
133+
134+
count, err := redis.Client.HGet(context.Background(), redis.BlockRetryCountTable, blockNumber).Result()
135+
if err != nil {
136+
return 0
137+
}
138+
139+
parsedCount, err := strconv.ParseUint(count, 10, 64)
140+
if err != nil {
141+
return 0
142+
}
143+
144+
return parsedCount
145+
146+
}
147+
148+
// RemoveBlockFromAttemptCountTrackerTable - Attempt to delete block number's
149+
// associated attempt count, given it already exists in table
150+
//
151+
// This is supposed to be invoked when a block is considered to be successfully processed
152+
func RemoveBlockFromAttemptCountTrackerTable(redis *data.RedisInfo, blockNumber string) {
153+
154+
if CheckBlockInAttemptCounterTable(redis, blockNumber) {
155+
156+
if _, err := redis.Client.HDel(context.Background(), redis.BlockRetryCountTable, blockNumber).Result(); err != nil {
157+
log.Print(color.Red.Sprintf("[!] Failed to delete attempt count of successful block %s : %s", blockNumber, err.Error()))
158+
}
159+
160+
}
161+
90162
}
91163

92164
// CheckBlockInRetryQueue - Checks whether block number is already added in
@@ -97,17 +169,19 @@ func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
97169
// Note: this feature of checking index of value in redis queue,
98170
// was added in Redis v6.0.6 : https://redis.io/commands/lpos
99171
func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool {
100-
if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
172+
173+
if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueue, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
101174
return false
102175
}
103176

104177
return true
178+
105179
}
106180

107181
// GetRetryQueueLength - Returns redis backed retry queue length
108182
func GetRetryQueueLength(redis *data.RedisInfo) int64 {
109183

110-
blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result()
184+
blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueue).Result()
111185
if err != nil {
112186
log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error()))
113187
}

app/block/syncer.go

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"log"
66
"runtime"
7+
"sort"
78
"time"
89

910
"github.com/ethereum/go-ethereum/ethclient"
@@ -16,8 +17,32 @@ import (
1617
"gorm.io/gorm"
1718
)
1819

20+
// FindMissingBlocksInRange - Given ascending ordered block numbers read from DB
21+
// attempts to find out which numbers are missing in [from, to] range
22+
// where both ends are inclusive
23+
func FindMissingBlocksInRange(found []uint64, from uint64, to uint64) []uint64 {
24+
25+
// creating slice with backing array of larger size
26+
// to avoid potential memory allocation during iteration
27+
// over loop
28+
absent := make([]uint64, 0, to-from+1)
29+
30+
for b := from; b <= to; b++ {
31+
32+
idx := sort.Search(len(found), func(j int) bool { return found[j] >= b })
33+
34+
if !(idx < len(found) && found[idx] == b) {
35+
absent = append(absent, b)
36+
}
37+
38+
}
39+
40+
return absent
41+
42+
}
43+
1944
// Syncer - Given ascending block number range i.e. fromBlock <= toBlock
20-
// fetches blocks in order {fromBlock, toBlock, fromBlock + 1, toBlock - 1, fromBlock + 2, toBlock - 2 ...}
45+
// attempts to fetch missing blocks in that range
2146
// while running n workers concurrently, where n = number of cores this machine has
2247
//
2348
// Waits for all of them to complete
@@ -28,8 +53,6 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB
2853
}
2954

3055
wp := workerpool.New(runtime.NumCPU() * int(cfg.GetConcurrencyFactor()))
31-
i := fromBlock
32-
j := toBlock
3356

3457
// Jobs need to be submitted using this interface, while
3558
// just mentioning which block needs to be fetched
@@ -43,17 +66,44 @@ func Syncer(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, fromB
4366
})
4467
}
4568

46-
for i <= j {
47-
// This condition to be arrived at when range has odd number of elements
48-
if i == j {
49-
job(i)
50-
} else {
51-
job(i)
52-
job(j)
69+
// attempting to fetch X blocks ( max ) at a time, by range
70+
//
71+
// @note This can be improved
72+
var step uint64 = 10000
73+
74+
for i := fromBlock; i <= toBlock; i += step {
75+
76+
toShouldbe := i + step - 1
77+
if toShouldbe > toBlock {
78+
toShouldbe = toBlock
79+
}
80+
81+
blocks := db.GetAllBlockNumbersInRange(_db, i, toShouldbe)
82+
83+
// No blocks present in DB, in queried range
84+
if blocks == nil || len(blocks) == 0 {
85+
86+
// So submitting all of them to job processor queue
87+
for j := i; j <= toShouldbe; j++ {
88+
89+
job(j)
90+
91+
}
92+
continue
93+
94+
}
95+
96+
// All blocks in range present in DB ✅
97+
if toShouldbe-i+1 == uint64(len(blocks)) {
98+
continue
99+
}
100+
101+
// Some blocks are missing in range, attempting to find them
102+
// and pushing their processing request to job queue
103+
for _, v := range FindMissingBlocksInRange(blocks, i, toShouldbe) {
104+
job(v)
53105
}
54106

55-
i++
56-
j--
57107
}
58108

59109
wp.StopWait()

0 commit comments

Comments
 (0)