Skip to content

Commit 7bbd881

Browse files
committed
chain validation and fix command
1 parent 627b36c commit 7bbd881

File tree

17 files changed

+1364
-121
lines changed

17 files changed

+1364
-121
lines changed

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ func init() {
220220
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
221221
rootCmd.AddCommand(orchestratorCmd)
222222
rootCmd.AddCommand(apiCmd)
223+
rootCmd.AddCommand(validateAndFixCmd)
224+
rootCmd.AddCommand(validateCmd)
223225
}
224226

225227
func initConfig() {

cmd/validate.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package cmd
2+
3+
import (
4+
"math/big"
5+
6+
"github.com/rs/zerolog/log"
7+
"github.com/spf13/cobra"
8+
config "github.com/thirdweb-dev/indexer/configs"
9+
"github.com/thirdweb-dev/indexer/internal/rpc"
10+
"github.com/thirdweb-dev/indexer/internal/storage"
11+
"github.com/thirdweb-dev/indexer/internal/validation"
12+
)
13+
14+
var (
15+
validateCmd = &cobra.Command{
16+
Use: "validate",
17+
Short: "Validate blockchain data integrity",
18+
Long: "Validate a range of blocks for data integrity issues including transaction roots and logs bloom verification",
19+
Run: func(cmd *cobra.Command, args []string) {
20+
RunValidate(cmd, args)
21+
},
22+
}
23+
)
24+
25+
/**
26+
* Validates a range of blocks (end and start are inclusive) for a given chain
27+
* First argument is the start block number
28+
* Second argument (optional) is the end block number
29+
*/
30+
func RunValidate(cmd *cobra.Command, args []string) {
31+
if len(args) < 1 {
32+
log.Fatal().Msg("Start block number is required")
33+
}
34+
startBlock, success := new(big.Int).SetString(args[0], 10)
35+
if !success {
36+
log.Fatal().Msg("Failed to parse start block number")
37+
}
38+
39+
var endBlock *big.Int
40+
if len(args) > 1 {
41+
endBlock, success = new(big.Int).SetString(args[1], 10)
42+
if !success {
43+
log.Fatal().Msg("Failed to parse end block number")
44+
}
45+
}
46+
if endBlock == nil {
47+
endBlock = startBlock
48+
}
49+
50+
rpcClient, err := rpc.Initialize()
51+
if err != nil {
52+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
53+
}
54+
log.Info().Msgf("Running validation for chain %d", rpcClient.GetChainID())
55+
56+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
57+
if err != nil {
58+
log.Fatal().Err(err).Msg("Failed to initialize storage")
59+
}
60+
61+
validator := validation.NewValidator(rpcClient, s)
62+
63+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
64+
if err != nil {
65+
log.Fatal().Err(err).Msg("Failed to validate blocks")
66+
}
67+
68+
if len(invalidBlocks) > 0 {
69+
log.Info().Msgf("Found %d invalid blocks", len(invalidBlocks))
70+
for _, block := range invalidBlocks {
71+
log.Info().Msgf("Invalid block: %s", block.Block.Number)
72+
}
73+
} else {
74+
log.Info().Msg("No invalid blocks found")
75+
}
76+
}

cmd/validate_and_fix.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package cmd
2+
3+
import (
4+
"crypto/tls"
5+
"fmt"
6+
"math/big"
7+
"strconv"
8+
9+
"github.com/ClickHouse/clickhouse-go/v2"
10+
"github.com/rs/zerolog/log"
11+
"github.com/spf13/cobra"
12+
config "github.com/thirdweb-dev/indexer/configs"
13+
"github.com/thirdweb-dev/indexer/internal/rpc"
14+
"github.com/thirdweb-dev/indexer/internal/storage"
15+
"github.com/thirdweb-dev/indexer/internal/validation"
16+
)
17+
18+
var (
19+
validateAndFixCmd = &cobra.Command{
20+
Use: "validateAndFix",
21+
Short: "Validate and fix blockchain data",
22+
Long: "Validate blockchain data in batches and automatically fix any issues found including duplicates, gaps, and invalid blocks",
23+
Run: func(cmd *cobra.Command, args []string) {
24+
RunValidateAndFix(cmd, args)
25+
},
26+
}
27+
)
28+
29+
func RunValidateAndFix(cmd *cobra.Command, args []string) {
30+
batchSize := big.NewInt(1000)
31+
fixBatchSize := 0 // default is no batch size
32+
if len(args) > 0 {
33+
batchSizeFromArgs, err := strconv.Atoi(args[0])
34+
if err != nil {
35+
log.Fatal().Err(err).Msg("Failed to parse batch size")
36+
}
37+
if batchSizeFromArgs < 1 {
38+
batchSizeFromArgs = 1
39+
}
40+
batchSize = big.NewInt(int64(batchSizeFromArgs))
41+
log.Info().Msgf("Using batch size %d from args", batchSize)
42+
}
43+
if len(args) > 1 {
44+
fixBatchSizeFromArgs, err := strconv.Atoi(args[1])
45+
if err != nil {
46+
log.Fatal().Err(err).Msg("Failed to parse fix batch size")
47+
}
48+
fixBatchSize = fixBatchSizeFromArgs
49+
}
50+
log.Debug().Msgf("Batch size: %d, fix batch size: %d", batchSize, fixBatchSize)
51+
batchSize = new(big.Int).Sub(batchSize, big.NewInt(1)) // -1 because range ends are inclusive
52+
53+
rpcClient, err := rpc.Initialize()
54+
if err != nil {
55+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
56+
}
57+
log.Info().Msgf("Running validationAndFix for chain %d", rpcClient.GetChainID())
58+
59+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
60+
if err != nil {
61+
log.Fatal().Err(err).Msg("Failed to initialize storage")
62+
}
63+
cursor, err := validation.InitCursor(rpcClient.GetChainID(), s)
64+
if err != nil {
65+
log.Fatal().Err(err).Msg("Failed to initialize cursor")
66+
}
67+
log.Debug().Msgf("Cursor initialized for chain %d, starting from block %d", rpcClient.GetChainID(), cursor.LastScannedBlockNumber)
68+
69+
conn, err := clickhouse.Open(&clickhouse.Options{
70+
Addr: []string{fmt.Sprintf("%s:%d", config.Cfg.Storage.Main.Clickhouse.Host, config.Cfg.Storage.Main.Clickhouse.Port)},
71+
Protocol: clickhouse.Native,
72+
TLS: &tls.Config{
73+
MinVersion: tls.VersionTLS12,
74+
},
75+
Auth: clickhouse.Auth{
76+
Username: config.Cfg.Storage.Main.Clickhouse.Username,
77+
Password: config.Cfg.Storage.Main.Clickhouse.Password,
78+
},
79+
Settings: func() clickhouse.Settings {
80+
settings := clickhouse.Settings{
81+
"do_not_merge_across_partitions_select_final": "1",
82+
"use_skip_indexes_if_final": "1",
83+
"optimize_move_to_prewhere_if_final": "1",
84+
"async_insert": "1",
85+
"wait_for_async_insert": "1",
86+
}
87+
return settings
88+
}(),
89+
})
90+
if err != nil {
91+
log.Fatal().Err(err).Msg("Failed to connect to ClickHouse")
92+
}
93+
defer conn.Close()
94+
95+
startBlock := new(big.Int).Add(cursor.LastScannedBlockNumber, big.NewInt(1))
96+
97+
for startBlock.Cmp(cursor.MaxBlockNumber) <= 0 {
98+
batchEndBlock := new(big.Int).Add(startBlock, batchSize)
99+
if batchEndBlock.Cmp(cursor.MaxBlockNumber) > 0 {
100+
batchEndBlock = new(big.Int).Set(cursor.MaxBlockNumber)
101+
}
102+
103+
log.Info().Msgf("Validating batch of blocks from %s to %s", startBlock.String(), batchEndBlock.String())
104+
err := validateAndFixRange(rpcClient, s, conn, startBlock, batchEndBlock, fixBatchSize)
105+
if err != nil {
106+
log.Fatal().Err(err).Msgf("failed to validate and fix range %v-%v", startBlock, batchEndBlock)
107+
}
108+
109+
startBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1))
110+
cursor.Update(batchEndBlock)
111+
}
112+
}
113+
114+
/**
115+
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds
116+
*/
117+
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
118+
chainId := rpcClient.GetChainID()
119+
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)
120+
if err != nil {
121+
return fmt.Errorf("failed to find and fix duplicates: %w", err)
122+
}
123+
124+
err = validation.FindAndFixGaps(rpcClient, s, conn, chainId, startBlock, endBlock)
125+
if err != nil {
126+
return fmt.Errorf("failed to find and fix gaps: %w", err)
127+
}
128+
129+
validator := validation.NewValidator(rpcClient, s)
130+
131+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
132+
if err != nil {
133+
return fmt.Errorf("failed to validate and fix blocks: %w", err)
134+
}
135+
136+
invalidBlockNumbers := make([]*big.Int, 0)
137+
for _, blockData := range invalidBlocks {
138+
invalidBlockNumbers = append(invalidBlockNumbers, blockData.Block.Number)
139+
}
140+
141+
if len(invalidBlocks) > 0 {
142+
err = validator.FixBlocks(invalidBlockNumbers, fixBatchSize)
143+
if err != nil {
144+
return fmt.Errorf("failed to fix blocks: %w", err)
145+
}
146+
}
147+
148+
log.Debug().Msgf("ValidationAndFix complete for range %v-%v", startBlock, endBlock)
149+
return nil
150+
}

go.mod

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ go 1.23.0
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
7-
github.com/ethereum/go-ethereum v1.14.8
7+
github.com/ethereum/go-ethereum v1.15.11
88
github.com/gin-gonic/gin v1.10.0
99
github.com/gorilla/schema v1.4.1
10+
github.com/holiman/uint256 v1.3.2
1011
github.com/prometheus/client_golang v1.20.4
1112
github.com/rs/zerolog v1.33.0
1213
github.com/spf13/cobra v1.8.1
@@ -24,21 +25,21 @@ require (
2425
github.com/Microsoft/go-winio v0.6.2 // indirect
2526
github.com/andybalholm/brotli v1.1.1 // indirect
2627
github.com/beorn7/perks v1.0.1 // indirect
27-
github.com/bits-and-blooms/bitset v1.10.0 // indirect
28-
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
28+
github.com/bits-and-blooms/bitset v1.20.0 // indirect
2929
github.com/bytedance/sonic v1.12.6 // indirect
3030
github.com/bytedance/sonic/loader v0.2.1 // indirect
3131
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3232
github.com/cloudwego/base64x v0.1.4 // indirect
3333
github.com/cloudwego/iasm v0.2.0 // indirect
34-
github.com/consensys/bavard v0.1.13 // indirect
35-
github.com/consensys/gnark-crypto v0.12.1 // indirect
36-
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
37-
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
34+
github.com/consensys/bavard v0.1.27 // indirect
35+
github.com/consensys/gnark-crypto v0.16.0 // indirect
36+
github.com/crate-crypto/go-eth-kzg v1.3.0 // indirect
37+
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
3838
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3939
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
4040
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
41-
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
41+
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
42+
github.com/ethereum/go-verkle v0.2.2 // indirect
4243
github.com/fsnotify/fsnotify v1.7.0 // indirect
4344
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4445
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -53,11 +54,11 @@ require (
5354
github.com/go-playground/universal-translator v0.18.1 // indirect
5455
github.com/go-playground/validator/v10 v10.23.0 // indirect
5556
github.com/goccy/go-json v0.10.4 // indirect
56-
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
57+
github.com/gofrs/flock v0.8.1 // indirect
58+
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
5759
github.com/google/uuid v1.6.0 // indirect
5860
github.com/gorilla/websocket v1.4.2 // indirect
5961
github.com/hashicorp/hcl v1.0.0 // indirect
60-
github.com/holiman/uint256 v1.3.1 // indirect
6162
github.com/inconshreveable/mousetrap v1.1.0 // indirect
6263
github.com/josharian/intern v1.0.0 // indirect
6364
github.com/json-iterator/go v1.1.12 // indirect
@@ -68,11 +69,13 @@ require (
6869
github.com/mailru/easyjson v0.7.7 // indirect
6970
github.com/mattn/go-colorable v0.1.13 // indirect
7071
github.com/mattn/go-isatty v0.0.20 // indirect
72+
github.com/mattn/go-runewidth v0.0.13 // indirect
7173
github.com/mitchellh/mapstructure v1.5.0 // indirect
7274
github.com/mmcloughlin/addchain v0.4.0 // indirect
7375
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7476
github.com/modern-go/reflect2 v1.0.2 // indirect
7577
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
78+
github.com/olekukonko/tablewriter v0.0.5 // indirect
7679
github.com/paulmach/orb v0.11.1 // indirect
7780
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
7881
github.com/pierrec/lz4/v4 v4.1.22 // indirect
@@ -81,6 +84,7 @@ require (
8184
github.com/prometheus/client_model v0.6.1 // indirect
8285
github.com/prometheus/common v0.55.0 // indirect
8386
github.com/prometheus/procfs v0.15.1 // indirect
87+
github.com/rivo/uniseg v0.2.0 // indirect
8488
github.com/sagikazarmark/locafero v0.4.0 // indirect
8589
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
8690
github.com/segmentio/asm v1.2.0 // indirect
@@ -92,13 +96,12 @@ require (
9296
github.com/spf13/pflag v1.0.5 // indirect
9397
github.com/stretchr/objx v0.5.2 // indirect
9498
github.com/subosito/gotenv v1.6.0 // indirect
95-
github.com/supranational/blst v0.3.11 // indirect
99+
github.com/supranational/blst v0.3.14 // indirect
96100
github.com/tklauser/go-sysconf v0.3.12 // indirect
97101
github.com/tklauser/numcpus v0.6.1 // indirect
98102
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
99103
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
100104
github.com/ugorji/go/codec v1.2.12 // indirect
101-
github.com/urfave/cli/v2 v2.27.4 // indirect
102105
github.com/yusufpapurcu/wmi v1.2.4 // indirect
103106
go.opentelemetry.io/otel v1.36.0 // indirect
104107
go.opentelemetry.io/otel/trace v1.36.0 // indirect

0 commit comments

Comments
 (0)