Skip to content
This repository was archived by the owner on Jan 13, 2023. It is now read-only.

Commit 5fe5c25

Browse files
authored
Merge pull request #52 from itzmeanjan/develop
Improved pubsub with always publish attempt
2 parents 2d61a8b + 2937977 commit 5fe5c25

File tree

16 files changed

+75
-62
lines changed

16 files changed

+75
-62
lines changed

Makefile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,5 @@ graphql_gen:
1313
build:
1414
go build -o ette
1515

16-
run:
17-
go build -o ette
16+
run: build
1817
./ette

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ And that's `ette`
7272

7373
- Make sure you've Go _( >= 1.15 )_ installed
7474
- You need to also install & set up PostgreSQL. I found [this](https://www.digitalocean.com/community/tutorials/how-to-install-and-use-postgresql-on-ubuntu-20-04) guide helpful.
75+
76+
> Make sure you've `pgcrypto` extension enabled on PostgreSQL Database.
77+
78+
> Check existing extensions using : `\dx`
79+
80+
> Create extension using : `create extension pgcrypto;`
81+
7582
- Redis needs to be installed too. Consider following [this](https://www.digitalocean.com/community/tutorials/how-to-install-and-secure-redis-on-ubuntu-20-04) guide.
7683

7784
> Note : Redis **v6.0.6** is required
@@ -101,7 +108,7 @@ cd ette
101108
- Skipping `RedisPassword` is absolutely fine, if you don't want to use any password in Redis instance. [ **Not recommended** ]
102109
- Replace `Domain` with your domain name i.e. `ette.company.com`
103110
- Set `Production` to `yes` before running it in production; otherwise you can simply skip it
104-
- `ette` can be run in any of 5 possible modes, which can be set by `EtteMode`
111+
- `ette` can be run in any of 👇 5 possible modes, which can be set by `EtteMode`
105112

106113
```json
107114
{
@@ -532,6 +539,8 @@ type Event {
532539

533540
### Real time notification for mined blocks ⛏
534541

542+
![pubsub-ette](./sc/pubsub-ette.png)
543+
535544
For listening to blocks getting mined, connect to `/v1/ws` endpoint using websocket client library & once connected, you need to send **subscription** request with 👇 payload _( JSON encoded )_
536545

537546
```json

app/app.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
5959
graph.GetDatabaseConnection(_db)
6060

6161
_status := &d.StatusHolder{
62-
State: &d.SyncState{BlockCountAtStartUp: db.GetBlockCount(_db)},
62+
State: &d.SyncState{
63+
BlockCountAtStartUp: db.GetBlockCount(_db),
64+
MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db),
65+
},
6366
Mutex: &sync.RWMutex{},
6467
}
6568

app/block/block.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
164164
// we're exiting from this context, while putting this block number in retry queue
165165
if !(result.Failure == 0) {
166166

167-
// If it's being run in mode 2, no need to put it in retry queue
168-
//
169-
// We can miss blocks, but will not be able deliver it over websocket channel
170-
// to subscribed clients
171-
//
172-
// @todo This needs to be improved, so that even if we miss a block now
173-
// we get to process & publish it over websocket based channel, where
174-
// clients subscribe for real-time data
175-
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
176-
return
177-
}
178-
179167
PushBlockIntoRetryQueue(redis, block.Number().String())
180168
return
181169

app/block/fetch.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/ethereum/go-ethereum/core/types"
1212
"github.com/ethereum/go-ethereum/ethclient"
1313
"github.com/gookit/color"
14-
cfg "github.com/itzmeanjan/ette/app/config"
1514
d "github.com/itzmeanjan/ette/app/data"
1615
"github.com/itzmeanjan/ette/app/db"
1716
"gorm.io/gorm"
@@ -25,19 +24,6 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,
2524

2625
block, err := client.BlockByHash(context.Background(), hash)
2726
if err != nil {
28-
29-
// If it's being run in mode 2, no need to put it in retry queue
30-
//
31-
// We can miss blocks, but will not be able deliver it over websocket channel
32-
// to subscribed clients
33-
//
34-
// @todo This needs to be improved, so that even if we miss a block now
35-
// we get to process & publish it over websocket based channel, where
36-
// clients subscribe for real-time data
37-
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
38-
return
39-
}
40-
4127
// Pushing block number into Redis queue for retrying later
4228
PushBlockIntoRetryQueue(redis, number)
4329

@@ -50,7 +36,7 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,
5036
}
5137

5238
// FetchBlockByNumber - Fetching block content using block number
53-
func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, _status *d.StatusHolder) {
39+
func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, publishable bool, _status *d.StatusHolder) {
5440

5541
// Starting block processing at
5642
startingAt := time.Now().UTC()
@@ -67,7 +53,7 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
6753
return
6854
}
6955

70-
ProcessBlockContent(client, block, _db, redis, false, _status, startingAt)
56+
ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt)
7157

7258
}
7359

app/block/listener.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/gookit/color"
1212
cfg "github.com/itzmeanjan/ette/app/config"
1313
d "github.com/itzmeanjan/ette/app/data"
14-
"github.com/itzmeanjan/ette/app/db"
1514
"gorm.io/gorm"
1615
)
1716

@@ -28,9 +27,6 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
2827
// Scheduling unsubscribe, to be executed when end of this execution scope is reached
2928
defer subs.Unsubscribe()
3029

31-
// Last time `ette` stopped syncing here
32-
currentHighestBlockNumber := db.GetCurrentBlockNumber(_db)
33-
3430
// Flag to check for whether this is first time block header being received or not
3531
//
3632
// If yes, we'll start syncer to fetch all block in range (last block processed, latest block)
@@ -58,24 +54,25 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
5854
// Starting now, to be used for calculating system performance, uptime etc.
5955
status.SetStartedAt()
6056

57+
// Starting go routine for fetching blocks `ette` failed to process in previous attempt
58+
//
59+
// Uses Redis backed queue for fetching pending block hash & retries
60+
go RetryQueueManager(connection.RPC, _db, redis, status)
61+
6162
// If historical data query features are enabled
6263
// only then we need to sync to latest state of block chain
6364
if cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3" {
65+
6466
// Starting syncer in another thread, where it'll keep fetching
6567
// blocks from highest block number it fetched last time to current network block number
6668
// i.e. trying to fill up gap, which was caused when `ette` was offline
6769
//
6870
// Backward traversal mechanism gives us more recent blockchain happenings to cover
69-
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status)
70-
71-
// Starting go routine for fetching blocks `ette` failed to process in previous attempt
72-
//
73-
// Uses Redis backed queue for fetching pending block hash & retries
74-
go RetryQueueManager(connection.RPC, _db, redis, status)
71+
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, status.MaxBlockNumberAtStartUp(), status)
7572

76-
// Making sure on when next latest block header is received, it'll not
77-
// start another syncer
7873
}
74+
// Making sure that when next latest block header is received, it'll not
75+
// start another syncer
7976
first = false
8077

8178
}
@@ -127,6 +124,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
127124
_oldestBlock,
128125
_db,
129126
redis,
127+
false,
130128
status)
131129

132130
})

app/block/publish.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) {
3434
return
3535
}
3636

37+
log.Printf(color.LightMagenta.Sprintf("[*] Published block %d", block.Block.Number))
38+
3739
PublishTxs(block.Block.Number, block.Transactions, redis)
3840

3941
}
@@ -46,10 +48,18 @@ func PublishTxs(blockNumber uint64, txs []*db.PackedTransaction, redis *d.RedisI
4648
return
4749
}
4850

51+
var eventCount uint64
52+
4953
for _, t := range txs {
5054
PublishTx(blockNumber, t, redis)
55+
56+
// how many events are present in this block, in total
57+
eventCount += uint64(len(t.Events))
5158
}
5259

60+
log.Printf(color.LightMagenta.Sprintf("[*] Published %d transactions of block %d", len(txs), blockNumber))
61+
log.Printf(color.LightMagenta.Sprintf("[*] Published %d events of block %d", eventCount, blockNumber))
62+
5363
}
5464

5565
// PublishTx - Publishes tx & events in tx, related data to respective

app/block/retry.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,24 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
5454
// which will be picked up & processed
5555
//
5656
// This will stop us from blindly creating too many go routines
57-
func(blockNumber uint64) {
57+
func(_blockNumber uint64) {
58+
5859
wp.Submit(func() {
5960

60-
FetchBlockByNumber(client,
61-
parsedBlockNumber,
62-
_db,
63-
redis,
64-
status)
61+
// This check helps us in determining whether we should
62+
// consider sending notification over pubsub channel for this block
63+
// whose processing failed due to some reasons in last attempt
64+
if status.MaxBlockNumberAtStartUp() <= _blockNumber {
65+
66+
FetchBlockByNumber(client, _blockNumber, _db, redis, true, status)
67+
return
68+
69+
}
70+
71+
FetchBlockByNumber(client, _blockNumber, _db, redis, false, status)
6572

6673
})
74+
6775
}(parsedBlockNumber)
6876
}
6977
}

app/block/syncer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
8383

8484
}
8585

86-
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
86+
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status)
8787

8888
})
8989
}
@@ -154,7 +154,7 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R
154154
block := db.GetBlock(j.DB, j.Block)
155155
if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) {
156156
// If not found, block fetching cycle is run, for this block
157-
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
157+
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status)
158158
}
159159

160160
})

app/data/data.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ import (
1717

1818
// SyncState - Whether `ette` is synced with blockchain or not
1919
type SyncState struct {
20-
Done uint64
21-
StartedAt time.Time
22-
BlockCountAtStartUp uint64
23-
NewBlocksInserted uint64
24-
LatestBlockNumber uint64
20+
Done uint64
21+
StartedAt time.Time
22+
BlockCountAtStartUp uint64
23+
MaxBlockNumberAtStartUp uint64
24+
NewBlocksInserted uint64
25+
LatestBlockNumber uint64
2526
}
2627

2728
// BlockCountInDB - Blocks currently present in database
@@ -36,6 +37,18 @@ type StatusHolder struct {
3637
Mutex *sync.RWMutex
3738
}
3839

40+
// MaxBlockNumberAtStartUp - Attempting to safely read latest block number
41+
// when `ette` was started, will help us in deciding whether a missing
42+
// block related notification needs to be sent on a pubsub channel or not
43+
func (s *StatusHolder) MaxBlockNumberAtStartUp() uint64 {
44+
45+
s.Mutex.RLock()
46+
defer s.Mutex.RUnlock()
47+
48+
return s.State.MaxBlockNumberAtStartUp
49+
50+
}
51+
3952
// SetStartedAt - Sets started at time
4053
func (s *StatusHolder) SetStartedAt() {
4154

app/db/model.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (u *Users) ToJSON() []byte {
134134
// This is to be used for controlling client application's access
135135
// to resources they're requesting
136136
type DeliveryHistory struct {
137-
ID uint64 `gorm:"column:id;type:bigserial;primaryKey"`
137+
ID string `gorm:"column:id;type:uuid;default:gen_random_uuid();primaryKey"`
138138
Client string `gorm:"column:client;type:char(42);not null;index"`
139139
TimeStamp time.Time `gorm:"column:ts;type:timestamp;not null;index:,sort:asc"`
140140
EndPoint string `gorm:"column:endpoint;type:varchar(100);not null"`

app/pubsub/block.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ func (b *BlockConsumer) SendData(data interface{}) bool {
185185
return false
186186
}
187187

188-
log.Printf("[+] Delivered `block` data to client\n")
189188
return true
190189

191190
}

app/pubsub/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ func (e *EventConsumer) SendData(data interface{}) bool {
221221
return false
222222
}
223223

224-
log.Printf("[+] Delivered `event` data to client\n")
225224
return true
225+
226226
}
227227

228228
// Unsubscribe - Unsubscribe from event data publishing topic, to be called

app/pubsub/transaction.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ func (t *TransactionConsumer) SendData(data interface{}) bool {
237237
return false
238238
}
239239

240-
log.Printf("[+] Delivered `transaction` data to client\n")
241240
return true
241+
242242
}
243243

244244
// Unsubscribe - Unsubscribe from transactions pubsub topic, which client has subscribed to

db/schema.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ create table users (
6767
create index on users(address);
6868

6969
create table delivery_history (
70-
id bigserial primary key,
70+
id uuid default gen_random_uuid() primary key,
7171
client char(42) not null,
7272
ts timestamp not null,
7373
endpoint varchar(100) not null,

sc/pubsub-ette.png

791 KB
Loading

0 commit comments

Comments
 (0)