Skip to content
This repository was archived by the owner on Jan 13, 2023. It is now read-only.

Commit 769db23

Browse files
authored
Merge pull request #47 from itzmeanjan/develop
Delayed block processing queue [ addressing Chain Reorganisation issue ]
2 parents 8da500e + dc4834f commit 769db23

21 files changed

+462
-229
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ And that's `ette`
6161

6262
> Note : Redis **v6.0.6** is required
6363
64+
> Note : Setting password in Redis instance has been made optional from now on, though it's recommended.
65+
6466
- Blockchain Node's both **HTTP & Websocket** connection URL required, because we'll be querying block, transaction, event log related data using HTTP interface & listening for block mining events in real time over Websocket.
6567

6668
## Installation 🛠
@@ -98,6 +100,12 @@ cd ette
98100
- For processing block(s)/ tx(s) concurrently, it'll create `ConcurrencyFactor * #-of CPUs on machine` workers, who will pick up jobs submitted to them.
99101
- If nothing is specified, it defaults to 1 & assuming you're running `ette` on machine with 4 CPUs, it'll spawn worker pool of size 4. But more number of jobs can be submitted, only 4 can be running at max.
100102
- 👆 being done for controlling concurrency level, by putting more control on user's hand.
103+
- If you want to persist blocks in delayed fashion, you might consider setting `BlockConfirmations` to some _number > 0_.
104+
- That will make `ette` think you're asking it 80 is latest block, which can be persisted in final data store, when latest mined block number is 100 & `BlockConfirmations` is set to 20.
105+
- This option is **recommended** to be used, at least in production.
106+
- Skipping `RedisPassword` is absolutely fine, if you don't want to use any password in Redis instance. [ **Not recommended** ]
107+
- For range based queries `BlockRange` can be set to limit how many blocks can be queried by client in a single go. Default value 100.
108+
- For time span based queries `TimeRange` can be set to put limit on max time span _( in terms of second )_, can be used by clients. Default value 3600 i.e. 1 hour.
101109

102110
```
103111
RPCUrl=https://<domain-name>
@@ -117,6 +125,9 @@ Production=yes
117125
EtteMode=3
118126
EtteGraphQLPlayGround=yes
119127
ConcurrencyFactor=2
128+
BlockConfirmations=20
129+
BlockRange=1000
130+
TimeRange=21600
120131
```
121132
122133
- Create another file in same directory, named `.plans.json`, whose content will look like 👇.

app/app.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
3535
Websocket: getClient(false),
3636
}
3737

38-
_redisClient := getPubSubClient()
38+
_redisClient := getRedisClient()
39+
40+
if _redisClient == nil {
41+
log.Fatalf("[!] Failed to connect to Redis Server\n")
42+
}
3943

4044
if err := _redisClient.FlushAll(context.Background()).Err(); err != nil {
4145
log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error())
@@ -63,8 +67,9 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
6367
func Run(configFile, subscriptionPlansFile string) {
6468
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
6569
_redisInfo := d.RedisInfo{
66-
Client: _redisClient,
67-
QueueName: "blocks",
70+
Client: _redisClient,
71+
BlockRetryQueueName: "blocks_in_retry_queue",
72+
UnfinalizedBlocksQueueName: "unfinalized_blocks",
6873
}
6974

7075
// Attempting to listen to Ctrl+C signal

app/block/block.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package block
22

33
import (
4+
"fmt"
45
"log"
56
"runtime"
67

@@ -14,6 +15,14 @@ import (
1415
"gorm.io/gorm"
1516
)
1617

18+
// HasBlockFinalized - Checking whether block under processing i.e. `number`
19+
// has `N` confirmations on top of it or not
20+
func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
21+
22+
return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number
23+
24+
}
25+
1726
// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
1827
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder) {
1928

@@ -40,11 +49,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
4049
// This is what we just published on pubsub channel
4150
packedBlock := pubsubWorker(nil)
4251

52+
if !HasBlockFinalized(status, packedBlock.Block.Number) {
53+
54+
log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
55+
56+
// Pushing into unfinalized block queue, to be picked up only when
57+
// finality for this block has been achieved
58+
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
59+
return
60+
61+
}
62+
4363
// If block doesn't contain any tx, we'll attempt to persist only block
4464
if err := db.StoreBlock(_db, packedBlock, status); err != nil {
4565

66+
log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s", block.NumberU64(), err.Error()))
67+
4668
// If failed to persist, we'll put it in retry queue
47-
pushBlockHashIntoRedisQueue(redis, block.Number().String())
69+
PushBlockIntoRetryQueue(redis, block.Number().String())
4870
return
4971

5072
}
@@ -127,8 +149,7 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
127149
// we're exiting from this context, while putting this block number in retry queue
128150
if !(result.Failure == 0) {
129151

130-
// If failed to persist, we'll put it in retry queue
131-
pushBlockHashIntoRedisQueue(redis, block.Number().String())
152+
PushBlockIntoRetryQueue(redis, block.Number().String())
132153
return
133154

134155
}
@@ -138,11 +159,24 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
138159
// This is what we just published on pubsub channel
139160
packedBlock := pubsubWorker(packedTxs)
140161

162+
if !HasBlockFinalized(status, packedBlock.Block.Number) {
163+
164+
log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
165+
166+
// Pushing into unfinalized block queue, to be picked up only when
167+
// finality for this block has been achieved
168+
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
169+
return
170+
171+
}
172+
141173
// If block doesn't contain any tx, we'll attempt to persist only block
142174
if err := db.StoreBlock(_db, packedBlock, status); err != nil {
143175

176+
log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s", block.NumberU64(), block.Transactions().Len(), err.Error()))
177+
144178
// If failed to persist, we'll put it in retry queue
145-
pushBlockHashIntoRedisQueue(redis, block.Number().String())
179+
PushBlockIntoRetryQueue(redis, block.Number().String())
146180
return
147181

148182
}

app/block/fetch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,
2020
block, err := client.BlockByHash(context.Background(), hash)
2121
if err != nil {
2222
// Pushing block number into Redis queue for retrying later
23-
pushBlockHashIntoRedisQueue(redis, number)
23+
PushBlockIntoRetryQueue(redis, number)
2424

2525
log.Print(color.Red.Sprintf("[!] Failed to fetch block by hash [ block : %s] : %s", number, err.Error()))
2626
return
@@ -37,7 +37,7 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
3737
block, err := client.BlockByNumber(context.Background(), _num)
3838
if err != nil {
3939
// Pushing block number into Redis queue for retrying later
40-
pushBlockHashIntoRedisQueue(redis, fmt.Sprintf("%d", number))
40+
PushBlockIntoRetryQueue(redis, fmt.Sprintf("%d", number))
4141

4242
log.Print(color.Red.Sprintf("[!] Failed to fetch block by number [ block : %d ] : %s", number, err))
4343
return

app/block/listener.go

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
4848
log.Fatal(color.Red.Sprintf("[!] Listener stopped : %s", err.Error()))
4949
break
5050
case header := <-headerChan:
51+
52+
// Latest block number seen, is getting safely updated, as
53+
// soon as new block mined data gets propagated to network
54+
status.SetLatestBlockNumber(header.Number.Uint64())
55+
5156
if first {
5257

5358
// Starting now, to be used for calculating system performance, uptime etc.
@@ -60,19 +65,13 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
6065
// blocks from highest block number it fetched last time to current network block number
6166
// i.e. trying to fill up gap, which was caused when `ette` was offline
6267
//
63-
// But in reverse direction i.e. from 100 to 50, where `ette` fetched upto 50 last time & 100
64-
// is latest block, got mined in network
65-
//
66-
// Yes it's going refetch 50, due to the fact, some portions of 50 might be missed in last try
67-
// So, it'll check & decide whether persisting again is required or not
68-
//
69-
// This backward traversal mechanism gives us more recent blockchain happenings to cover
68+
// Backward traversal mechanism gives us more recent blockchain happenings to cover
7069
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status)
7170

7271
// Starting go routine for fetching blocks `ette` failed to process in previous attempt
7372
//
7473
// Uses Redis backed queue for fetching pending block hash & retries
75-
go retryBlockFetching(connection.RPC, _db, redis, status)
74+
go RetryQueueManager(connection.RPC, _db, redis, status)
7675

7776
// Making sure on when next latest block header is received, it'll not
7877
// start another syncer
@@ -93,6 +92,41 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
9392
// so that it gets processed immediately
9493
func(blockHash common.Hash, blockNumber string) {
9594

95+
// Attempting to submit all blocks to job processor queue
96+
// if more blocks are present in non-final queue, than actually
97+
// should be
98+
for GetUnfinalizedQueueLength(redis) > int64(cfg.GetBlockConfirmations()) {
99+
100+
// Before submitting new block processing job
101+
// checking whether there exists any block in unfinalized
102+
// block queue or not
103+
//
104+
// If yes, we're attempting to process it, because it has now
105+
// achieved enough confirmations
106+
if CheckIfOldestBlockIsConfirmed(redis, status) {
107+
108+
oldest := PopOldestBlockFromUnfinalizedQueue(redis)
109+
110+
log.Print(color.Yellow.Sprintf("[*] Attempting to process finalised block %d [ Latest Block : %d | In Queue : %d ]", oldest, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
111+
112+
wp.Submit(func() {
113+
114+
FetchBlockByNumber(connection.RPC,
115+
oldest,
116+
_db,
117+
redis,
118+
status)
119+
120+
})
121+
122+
} else {
123+
// If oldest block is not finalized, no meaning
124+
// staying here, we'll revisit it some time in future
125+
break
126+
}
127+
128+
}
129+
96130
wp.Submit(func() {
97131

98132
FetchBlockByHash(connection.RPC,

app/block/publish.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ import (
1212
// PublishBlock - Attempts to publish block data to Redis pubsub channel
1313
func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) {
1414

15+
if block == nil {
16+
return
17+
}
18+
1519
if err := redis.Client.Publish(context.Background(), "block", &d.Block{
1620
Hash: block.Block.Hash,
1721
Number: block.Block.Number,

app/block/retry.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ import (
1717
"gorm.io/gorm"
1818
)
1919

20-
// Pop oldest block number from Redis queue & try to fetch it in different go routine
20+
// RetryQueueManager - Pop oldest block number from Redis backed retry
21+
// queue & try to fetch it in different go routine
2122
//
2223
// Sleeps for 1000 milliseconds
2324
//
2425
// Keeps repeating
25-
func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) {
26+
func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.RedisInfo, status *d.StatusHolder) {
2627
sleep := func() {
27-
time.Sleep(time.Duration(500) * time.Millisecond)
28+
time.Sleep(time.Duration(1000) * time.Millisecond)
2829
}
2930

3031
// Creating worker pool and submitting jobs as soon as it's determined
@@ -36,7 +37,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
3637
sleep()
3738

3839
// Popping oldest element from Redis queue
39-
blockNumber, err := redis.Client.LPop(context.Background(), redis.QueueName).Result()
40+
blockNumber, err := redis.Client.LPop(context.Background(), redis.BlockRetryQueueName).Result()
4041
if err != nil {
4142
continue
4243
}
@@ -47,7 +48,7 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
4748
continue
4849
}
4950

50-
log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, getRetryQueueLength(redis)))
51+
log.Print(color.Cyan.Sprintf("[~] Retrying block : %d [ In Queue : %d ]", parsedBlockNumber, GetRetryQueueLength(redis)))
5152

5253
// Submitting block processor job into pool
5354
// which will be picked up & processed
@@ -67,39 +68,40 @@ func retryBlockFetching(client *ethclient.Client, _db *gorm.DB, redis *data.Redi
6768
}
6869
}
6970

70-
// Pushes failed to fetch block number at end of Redis queue
71+
// PushBlockIntoRetryQueue - Pushes failed to fetch block number at end of Redis queue
7172
// given it has not already been added
72-
func pushBlockHashIntoRedisQueue(redis *data.RedisInfo, blockNumber string) {
73+
func PushBlockIntoRetryQueue(redis *data.RedisInfo, blockNumber string) {
7374
// Checking presence first & then deciding whether to add it or not
74-
if !checkExistenceOfBlockNumberInRedisQueue(redis, blockNumber) {
75+
if !CheckBlockInRetryQueue(redis, blockNumber) {
7576

76-
if err := redis.Client.RPush(context.Background(), redis.QueueName, blockNumber).Err(); err != nil {
77-
log.Print(color.Red.Sprintf("[!] Failed to push block %s : %s", blockNumber, err.Error()))
77+
if _, err := redis.Client.RPush(context.Background(), redis.BlockRetryQueueName, blockNumber).Result(); err != nil {
78+
log.Print(color.Red.Sprintf("[!] Failed to push block %s into retry queue : %s", blockNumber, err.Error()))
7879
}
7980

8081
}
8182
}
8283

83-
// Checks whether block number is already added in Redis backed retry queue or not
84+
// CheckBlockInRetryQueue - Checks whether block number is already added in
85+
// Redis backed retry queue or not
8486
//
8587
// If yes, it'll not be added again
8688
//
8789
// Note: this feature of checking index of value in redis queue,
8890
// was added in Redis v6.0.6 : https://redis.io/commands/lpos
89-
func checkExistenceOfBlockNumberInRedisQueue(redis *data.RedisInfo, blockNumber string) bool {
90-
if _, err := redis.Client.LPos(context.Background(), redis.QueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
91+
func CheckBlockInRetryQueue(redis *data.RedisInfo, blockNumber string) bool {
92+
if _, err := redis.Client.LPos(context.Background(), redis.BlockRetryQueueName, blockNumber, _redis.LPosArgs{}).Result(); err != nil {
9193
return false
9294
}
9395

9496
return true
9597
}
9698

97-
// Returns redis backed retry queue length
98-
func getRetryQueueLength(redis *data.RedisInfo) int64 {
99+
// GetRetryQueueLength - Returns redis backed retry queue length
100+
func GetRetryQueueLength(redis *data.RedisInfo) int64 {
99101

100-
blockCount, err := redis.Client.LLen(context.Background(), redis.QueueName).Result()
102+
blockCount, err := redis.Client.LLen(context.Background(), redis.BlockRetryQueueName).Result()
101103
if err != nil {
102-
log.Printf(color.Red.Sprintf("[!] Failed to determine Redis queue length : %s", err.Error()))
104+
log.Printf(color.Red.Sprintf("[!] Failed to determine retry queue length : %s", err.Error()))
103105
}
104106

105107
return blockCount

app/block/syncer.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,20 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
6969
//
7070
// Job specification is provided in `Job` struct
7171
job := func(wp *workerpool.WorkerPool, j *d.Job) {
72+
7273
wp.Submit(func() {
7374

75+
if !HasBlockFinalized(status, j.Block) {
76+
77+
log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
78+
79+
// Pushing into unfinalized block queue, to be picked up only when
80+
// finality for this block has been achieved
81+
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block))
82+
return
83+
84+
}
85+
7486
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
7587

7688
})
@@ -124,11 +136,23 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R
124136
//
125137
// Job specification is provided in `Job` struct
126138
job := func(wp *workerpool.WorkerPool, j *d.Job) {
139+
127140
wp.Submit(func() {
128141

142+
if !HasBlockFinalized(status, j.Block) {
143+
144+
log.Print(color.LightRed.Sprintf("[x] Non-final block %d [ Latest Block : %d | In Queue : %d ]", j.Block, status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
145+
146+
// Pushing into unfinalized block queue, to be picked up only when
147+
// finality for this block has been achieved
148+
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", j.Block))
149+
return
150+
151+
}
152+
129153
// Worker fetches block by number from local storage
130154
block := db.GetBlock(j.DB, j.Block)
131-
if block == nil && !checkExistenceOfBlockNumberInRedisQueue(redis, fmt.Sprintf("%d", j.Block)) {
155+
if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) {
132156
// If not found, block fetching cycle is run, for this block
133157
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
134158
}

0 commit comments

Comments
 (0)