Skip to content
This repository was archived by the owner on Jan 13, 2023. It is now read-only.

Commit 71754fd

Browse files
authored
Merge pull request #56 from itzmeanjan/develop
Block Processor Queue to pick next block ( failed before ) efficiently
2 parents 1a87cac + e7e9257 commit 71754fd

23 files changed

+1343
-915
lines changed

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
SHELL:=/bin/bash
2+
13
proto_clean:
24
rm -rfv app/pb
35

@@ -6,9 +8,7 @@ proto_gen:
68
protoc -I app/proto/ --go_out=paths=source_relative:app/pb app/proto/*.proto
79

810
graphql_gen:
9-
pushd app/rest
10-
gqlgen generate
11-
popd
11+
pushd app/rest; gqlgen generate; popd
1212

1313
build:
1414
go build -o ette

app/app.go

Lines changed: 28 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,110 +5,62 @@ import (
55
"log"
66
"os"
77
"os/signal"
8-
"sync"
98
"syscall"
109
"time"
1110

12-
"github.com/go-redis/redis/v8"
1311
"github.com/gookit/color"
1412
blk "github.com/itzmeanjan/ette/app/block"
1513
cfg "github.com/itzmeanjan/ette/app/config"
16-
d "github.com/itzmeanjan/ette/app/data"
1714
"github.com/itzmeanjan/ette/app/db"
15+
1816
"github.com/itzmeanjan/ette/app/rest"
19-
"github.com/itzmeanjan/ette/app/rest/graph"
20-
srv "github.com/itzmeanjan/ette/app/services"
2117
ss "github.com/itzmeanjan/ette/app/snapshot"
22-
"gorm.io/gorm"
2318
)
2419

25-
// Setting ground up
26-
func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConnection, *redis.Client, *gorm.DB, *d.StatusHolder) {
27-
err := cfg.Read(configFile)
28-
if err != nil {
29-
log.Fatalf("[!] Failed to read `.env` : %s\n", err.Error())
30-
}
31-
32-
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3" || cfg.Get("EtteMode") == "4" || cfg.Get("EtteMode") == "5") {
33-
log.Fatalf("[!] Failed to find `EtteMode` in configuration file\n")
34-
}
35-
36-
// Maintaining both HTTP & Websocket based connection to blockchain
37-
_connection := &d.BlockChainNodeConnection{
38-
RPC: getClient(true),
39-
Websocket: getClient(false),
40-
}
41-
42-
_redisClient := getRedisClient()
43-
44-
if _redisClient == nil {
45-
log.Fatalf("[!] Failed to connect to Redis Server\n")
46-
}
47-
48-
if err := _redisClient.FlushAll(context.Background()).Err(); err != nil {
49-
log.Printf("[!] Failed to flush all keys from redis : %s\n", err.Error())
50-
}
51-
52-
_db := db.Connect()
53-
54-
// Populating subscription plans from `.plans.json` into
55-
// database table, at application start up
56-
db.PersistAllSubscriptionPlans(_db, subscriptionPlansFile)
57-
58-
// Passing db handle, to graph package, so that it can be used
59-
// for resolving graphQL queries
60-
graph.GetDatabaseConnection(_db)
61-
62-
_status := &d.StatusHolder{
63-
State: &d.SyncState{
64-
BlockCountAtStartUp: db.GetBlockCount(_db),
65-
MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db),
66-
},
67-
Mutex: &sync.RWMutex{},
68-
}
69-
70-
return _connection, _redisClient, _db, _status
71-
}
72-
7320
// Run - Application to be invoked from main runner using this function
7421
func Run(configFile, subscriptionPlansFile string) {
75-
_connection, _redisClient, _db, _status := bootstrap(configFile, subscriptionPlansFile)
76-
_redisInfo := d.RedisInfo{
77-
Client: _redisClient,
78-
BlockRetryQueue: "blocks_in_retry_queue",
79-
BlockRetryCountTable: "attempt_count_tracker_table",
80-
UnfinalizedBlocksQueue: "unfinalized_blocks",
81-
}
22+
23+
ctx, cancel := context.WithCancel(context.Background())
24+
_connection, _redisClient, _redisInfo, _db, _status, _queue := bootstrap(configFile, subscriptionPlansFile)
8225

8326
// Attempting to listen to Ctrl+C signal
8427
// and when received gracefully shutting down `ette`
8528
interruptChan := make(chan os.Signal, 1)
86-
signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
29+
signal.Notify(interruptChan, syscall.SIGTERM, syscall.SIGINT)
8730

8831
// All resources being used gets cleaned up
8932
// when we're returning from this function scope
9033
go func() {
9134

9235
<-interruptChan
9336

37+
// This call should be received in all places
38+
// where root context is passed along
39+
//
40+
// But only it's being used in block processor queue
41+
// go routine, as of now
42+
//
43+
// @note This can ( needs to ) be improved
44+
cancel()
45+
9446
sql, err := _db.DB()
9547
if err != nil {
96-
log.Printf(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error()))
48+
log.Print(color.Red.Sprintf("[!] Failed to get underlying DB connection : %s", err.Error()))
9749
return
9850
}
9951

10052
if err := sql.Close(); err != nil {
101-
log.Printf(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error()))
53+
log.Print(color.Red.Sprintf("[!] Failed to close underlying DB connection : %s", err.Error()))
10254
return
10355
}
10456

10557
if err := _redisInfo.Client.Close(); err != nil {
106-
log.Printf(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error()))
58+
log.Print(color.Red.Sprintf("[!] Failed to close connection to Redis : %s", err.Error()))
10759
return
10860
}
10961

11062
// Stopping process
111-
log.Printf(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`"))
63+
log.Print(color.Magenta.Sprintf("\n[+] Gracefully shut down `ette`"))
11264
os.Exit(0)
11365

11466
}()
@@ -131,9 +83,9 @@ func Run(configFile, subscriptionPlansFile string) {
13183
// taking snapshot, this might take some time
13284
_ret := ss.TakeSnapshot(_db, _snapshotFile, db.GetCurrentOldestBlockNumber(_db), db.GetCurrentBlockNumber(_db), _status.BlockCountInDB())
13385
if _ret {
134-
log.Printf(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB()))
86+
log.Print(color.Green.Sprintf("[+] Snapshotted in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _status.BlockCountInDB()))
13587
} else {
136-
log.Printf(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start)))
88+
log.Print(color.Red.Sprintf("[!] Snapshotting failed in : %s", time.Now().UTC().Sub(_start)))
13789
}
13890

13991
return
@@ -152,19 +104,24 @@ func Run(configFile, subscriptionPlansFile string) {
152104

153105
_, _count := ss.RestoreFromSnapshot(_db, _snapshotFile)
154106

155-
log.Printf(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count))
107+
log.Print(color.Green.Sprintf("[+] Restored from snapshot in : %s [ Count : %d ]", time.Now().UTC().Sub(_start), _count))
156108

157109
return
158110

159111
}
160112

113+
go _queue.Start(ctx)
114+
161115
// Pushing block header propagation listener to another thread of execution
162-
go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo)
116+
go blk.SubscribeToNewBlocks(_connection, _db, _status, _redisInfo, _queue)
163117

164118
// Periodic clean up job being started, to be run every 24 hours to clean up
165119
// delivery history data, older than 24 hours
166-
go srv.DeliveryHistoryCleanUpService(_db)
120+
//
121+
// @note Need to be diagnosed, why it doesn't work
122+
// go srv.DeliveryHistoryCleanUpService(_db)
167123

168124
// Starting http server on main thread
169125
rest.RunHTTPServer(_db, _status, _redisClient)
126+
170127
}

app/block/block.go

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,55 @@
11
package block
22

33
import (
4-
"fmt"
54
"log"
65
"runtime"
76
"time"
87

98
"github.com/ethereum/go-ethereum/core/types"
109
"github.com/ethereum/go-ethereum/ethclient"
1110
"github.com/gammazero/workerpool"
12-
"github.com/gookit/color"
1311
cfg "github.com/itzmeanjan/ette/app/config"
1412
d "github.com/itzmeanjan/ette/app/data"
1513
"github.com/itzmeanjan/ette/app/db"
14+
q "github.com/itzmeanjan/ette/app/queue"
1615
"gorm.io/gorm"
1716
)
1817

19-
// HasBlockFinalized - Checking whether block under processing i.e. `number`
20-
// has `N` confirmations on top of it or not
21-
func HasBlockFinalized(status *d.StatusHolder, number uint64) bool {
22-
23-
return status.GetLatestBlockNumber()-cfg.GetBlockConfirmations() >= number
24-
25-
}
26-
2718
// ProcessBlockContent - Processes everything inside this block i.e. block data, tx data, event data
28-
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, status *d.StatusHolder, startingAt time.Time) bool {
19+
func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm.DB, redis *d.RedisInfo, publishable bool, queue *q.BlockProcessorQueue, status *d.StatusHolder, startingAt time.Time) bool {
2920

3021
// Closure managing publishing whole block data i.e. block header, txn(s), event logs
3122
// on redis pubsub channel
32-
pubsubWorker := func(txns []*db.PackedTransaction) *db.PackedBlock {
23+
pubsubWorker := func(txns []*db.PackedTransaction) (*db.PackedBlock, bool) {
3324

3425
// Constructing block data to published & persisted
3526
packedBlock := BuildPackedBlock(block, txns)
3627

28+
// -- 3 step pub/sub attempt
29+
//
3730
// Attempting to publish whole block data to redis pubsub channel
3831
// when eligible `EtteMode` is set
3932
if publishable && (cfg.Get("EtteMode") == "2" || cfg.Get("EtteMode") == "3") {
40-
PublishBlock(packedBlock, redis)
33+
34+
// 1. Asking queue whether we need to publish block or not
35+
if !queue.CanPublish(block.NumberU64()) {
36+
return packedBlock, true
37+
}
38+
39+
// 2. Attempting to publish block on Pub/Sub topic
40+
if !PublishBlock(packedBlock, redis) {
41+
return nil, false
42+
}
43+
44+
// 3. Marking this block as published
45+
if !queue.Published(block.NumberU64()) {
46+
return nil, false
47+
}
48+
4149
}
50+
// -- done, with publishing on Pub/Sub topic
4251

43-
return packedBlock
52+
return packedBlock, true
4453

4554
}
4655

@@ -49,45 +58,34 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
4958
// Constructing block data to be persisted
5059
//
5160
// This is what we just published on pubsub channel
52-
packedBlock := pubsubWorker(nil)
61+
packedBlock, ok := pubsubWorker(nil)
62+
if !ok {
63+
return false
64+
}
5365

5466
// If `ette` being run in mode, for only publishing data to
5567
// pubsub channel, no need to persist data
5668
//
5769
// We simply publish & return from execution scope
5870
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
5971

60-
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
72+
log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt))
6173
status.IncrementBlocksProcessed()
6274

6375
return true
6476

6577
}
6678

6779
// If block doesn't contain any tx, we'll attempt to persist only block
68-
if err := db.StoreBlock(_db, packedBlock, status); err != nil {
69-
70-
log.Print(color.Red.Sprintf("[+] Failed to process block %d with 0 tx(s) : %s [ Took : %s ]", block.NumberU64(), err.Error(), time.Now().UTC().Sub(startingAt)))
80+
if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil {
7181

72-
// If failed to persist, we'll put it in retry queue
73-
PushBlockIntoRetryQueue(redis, block.Number().String())
82+
log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error())
7483
return false
7584

7685
}
7786

78-
if !HasBlockFinalized(status, packedBlock.Block.Number) {
79-
80-
log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
81-
82-
// Pushing into unfinalized block queue, to be picked up only when
83-
// finality for this block has been achieved
84-
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
85-
return true
86-
87-
}
88-
8987
// Successfully processed block
90-
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
88+
log.Printf("✅ Block %d with 0 tx(s) [ Took : %s ]\n", block.NumberU64(), time.Now().UTC().Sub(startingAt))
9189
status.IncrementBlocksProcessed()
9290

9391
return true
@@ -160,57 +158,41 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
160158
wp.Stop()
161159
// -- Tx processing ending
162160

163-
// When all tx(s) aren't successfully processed ( as they have informed us over go channel ),
164-
// we're exiting from this context, while putting this block number in retry queue
165161
if !(result.Failure == 0) {
166-
167-
PushBlockIntoRetryQueue(redis, block.Number().String())
168162
return false
169-
170163
}
171164

172165
// Constructing block data to be persisted
173166
//
174167
// This is what we just published on pubsub channel
175-
packedBlock := pubsubWorker(packedTxs)
168+
packedBlock, ok := pubsubWorker(packedTxs)
169+
if !ok {
170+
return false
171+
}
176172

177173
// If `ette` being run in mode, for only publishing data to
178174
// pubsub channel, no need to persist data
179175
//
180176
// We simply publish & return from execution scope
181177
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
182178

183-
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
179+
log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))
184180
status.IncrementBlocksProcessed()
185181

186182
return true
187183

188184
}
189185

190186
// If block doesn't contain any tx, we'll attempt to persist only block
191-
if err := db.StoreBlock(_db, packedBlock, status); err != nil {
187+
if err := db.StoreBlock(_db, packedBlock, status, queue); err != nil {
192188

193-
log.Print(color.Red.Sprintf("[+] Failed to process block %d with %d tx(s) : %s [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), err.Error(), time.Now().UTC().Sub(startingAt)))
194-
195-
// If failed to persist, we'll put it in retry queue
196-
PushBlockIntoRetryQueue(redis, block.Number().String())
189+
log.Printf("❗️ Failed to process block %d : %s\n", block.NumberU64(), err.Error())
197190
return false
198191

199192
}
200193

201-
if !HasBlockFinalized(status, packedBlock.Block.Number) {
202-
203-
log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))
204-
205-
// Pushing into unfinalized block queue, to be picked up only when
206-
// finality for this block has been achieved
207-
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
208-
return true
209-
210-
}
211-
212194
// Successfully processed block
213-
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))
195+
log.Printf("✅ Block %d with %d tx(s) [ Took : %s ]\n", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt))
214196

215197
status.IncrementBlocksProcessed()
216198
return true

0 commit comments

Comments
 (0)