Skip to content

Commit f54705e

Browse files
authored
chain validation and fix command (#198)
### TL;DR Added new `validate` and `validateAndFix` commands to the indexer to verify and fix data integrity issues in the blockchain data. ### What changed? - Added a new `validate` command to the CLI that performs data validation - Added a new `validateAndFix` command to the CLI that performs data validation and repair - Implemented three validation functions: - `FindAndRemoveDuplicates`: Identifies and removes duplicate blocks, transactions, and logs - `FindAndFixGaps`: Detects and fills in missing blocks in a specified range - `ValidateAndFixBlocks`: Verifies block integrity by checking transaction counts, logs bloom filters, and transaction roots - Updated trace-related data structures to use proper types (int64 for trace addresses, uint64 for gas values) - Added support for filtering by block numbers when querying aggregates ### How to test? Run the `validateAndFix` command with optional batch size and fix batch size parameters: ```bash # Validate with default batch size (1000) ./indexer validateAndFix # Validate with custom batch size ./indexer validateAndFix 500 # Validate with custom batch size and fix batch size ./indexer validateAndFix 500 100 ``` ### Why make this change? This validation tool helps maintain data integrity in the blockchain indexer by: 1. Detecting and removing duplicate entries that could cause inconsistencies 2. Identifying and filling gaps in block sequences 3. Verifying the correctness of stored block data by recalculating and comparing cryptographic roots 4. Providing a mechanism to automatically fix corrupted or incorrect data These validation capabilities are essential for ensuring the reliability and accuracy of the indexed blockchain data. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Introduced CLI commands to validate and repair blockchain data with batch processing. * Added detection and removal of duplicate blocks, transactions, and logs in block ranges. 
* Implemented validation tools to identify missing blocks, fix gaps, and correct invalid entries. * Added blockchain data root and bloom calculation utilities. * Introduced progress tracking cursor for validation workflows. * Added blockchain data retrieval methods optimized for validation. * Enhanced logging for contract ABI parsing errors. * **Improvements** * Updated dependencies for enhanced compatibility and performance. * Standardized numeric types for blockchain trace and gas fields. * Improved data serialization consistency for trace addresses and gas values. * Enhanced database queries for consistent reads and filtering by block numbers. * **Bug Fixes** * Improved RPC client initialization by handling chain ID setup errors properly. * **Tests** * Added mock methods to support testing of validation and data retrieval features. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 6e554a8 + 7f6a1dc commit f54705e

File tree

16 files changed

+1409
-121
lines changed

16 files changed

+1409
-121
lines changed

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ func init() {
220220
viper.BindPFlag("workMode.liveModeThreshold", rootCmd.PersistentFlags().Lookup("workMode-liveModeThreshold"))
221221
rootCmd.AddCommand(orchestratorCmd)
222222
rootCmd.AddCommand(apiCmd)
223+
rootCmd.AddCommand(validateAndFixCmd)
224+
rootCmd.AddCommand(validateCmd)
223225
}
224226

225227
func initConfig() {

cmd/validate.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package cmd
2+
3+
import (
4+
"math/big"
5+
6+
"github.com/rs/zerolog/log"
7+
"github.com/spf13/cobra"
8+
config "github.com/thirdweb-dev/indexer/configs"
9+
"github.com/thirdweb-dev/indexer/internal/orchestrator"
10+
"github.com/thirdweb-dev/indexer/internal/rpc"
11+
"github.com/thirdweb-dev/indexer/internal/storage"
12+
)
13+
14+
var (
15+
validateCmd = &cobra.Command{
16+
Use: "validate",
17+
Short: "Validate blockchain data integrity",
18+
Long: "Validate a range of blocks for data integrity issues including transaction roots and logs bloom verification",
19+
Run: func(cmd *cobra.Command, args []string) {
20+
RunValidate(cmd, args)
21+
},
22+
}
23+
)
24+
25+
/**
26+
* Validates a range of blocks (end and start are inclusive) for a given chain
27+
* First argument is the start block number
28+
* Second argument (optional) is the end block number
29+
*/
30+
func RunValidate(cmd *cobra.Command, args []string) {
31+
if len(args) < 1 {
32+
log.Fatal().Msg("Start block number is required")
33+
}
34+
startBlock, success := new(big.Int).SetString(args[0], 10)
35+
if !success {
36+
log.Fatal().Msg("Failed to parse start block number")
37+
}
38+
39+
var endBlock *big.Int
40+
if len(args) > 1 {
41+
endBlock, success = new(big.Int).SetString(args[1], 10)
42+
if !success {
43+
log.Fatal().Msg("Failed to parse end block number")
44+
}
45+
}
46+
if endBlock == nil {
47+
endBlock = startBlock
48+
}
49+
50+
rpcClient, err := rpc.Initialize()
51+
if err != nil {
52+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
53+
}
54+
log.Info().Msgf("Running validation for chain %d", rpcClient.GetChainID())
55+
56+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
57+
if err != nil {
58+
log.Fatal().Err(err).Msg("Failed to initialize storage")
59+
}
60+
61+
validator := orchestrator.NewValidator(rpcClient, s)
62+
63+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
64+
if err != nil {
65+
log.Fatal().Err(err).Msg("Failed to validate blocks")
66+
}
67+
68+
if len(invalidBlocks) > 0 {
69+
log.Info().Msgf("Found %d invalid blocks", len(invalidBlocks))
70+
for _, block := range invalidBlocks {
71+
log.Info().Msgf("Invalid block: %s", block.Block.Number)
72+
}
73+
} else {
74+
log.Info().Msg("No invalid blocks found")
75+
}
76+
}

cmd/validate_and_fix.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package cmd
2+
3+
import (
4+
"crypto/tls"
5+
"fmt"
6+
"math/big"
7+
"strconv"
8+
9+
"github.com/ClickHouse/clickhouse-go/v2"
10+
"github.com/rs/zerolog/log"
11+
"github.com/spf13/cobra"
12+
config "github.com/thirdweb-dev/indexer/configs"
13+
"github.com/thirdweb-dev/indexer/internal/orchestrator"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
15+
"github.com/thirdweb-dev/indexer/internal/storage"
16+
"github.com/thirdweb-dev/indexer/internal/validation"
17+
)
18+
19+
var (
20+
validateAndFixCmd = &cobra.Command{
21+
Use: "validateAndFix",
22+
Short: "Validate and fix blockchain data",
23+
Long: "Validate blockchain data in batches and automatically fix any issues found including duplicates, gaps, and invalid blocks",
24+
Run: func(cmd *cobra.Command, args []string) {
25+
RunValidateAndFix(cmd, args)
26+
},
27+
}
28+
)
29+
30+
func RunValidateAndFix(cmd *cobra.Command, args []string) {
31+
batchSize := big.NewInt(1000)
32+
fixBatchSize := 0 // default is no batch size
33+
if len(args) > 0 {
34+
batchSizeFromArgs, err := strconv.Atoi(args[0])
35+
if err != nil {
36+
log.Fatal().Err(err).Msg("Failed to parse batch size")
37+
}
38+
if batchSizeFromArgs < 1 {
39+
batchSizeFromArgs = 1
40+
}
41+
batchSize = big.NewInt(int64(batchSizeFromArgs))
42+
log.Info().Msgf("Using batch size %d from args", batchSize)
43+
}
44+
if len(args) > 1 {
45+
fixBatchSizeFromArgs, err := strconv.Atoi(args[1])
46+
if err != nil {
47+
log.Fatal().Err(err).Msg("Failed to parse fix batch size")
48+
}
49+
fixBatchSize = fixBatchSizeFromArgs
50+
}
51+
log.Debug().Msgf("Batch size: %d, fix batch size: %d", batchSize, fixBatchSize)
52+
batchSize = new(big.Int).Sub(batchSize, big.NewInt(1)) // -1 because range ends are inclusive
53+
54+
rpcClient, err := rpc.Initialize()
55+
if err != nil {
56+
log.Fatal().Err(err).Msg("Failed to initialize RPC")
57+
}
58+
log.Info().Msgf("Running validationAndFix for chain %d", rpcClient.GetChainID())
59+
60+
s, err := storage.NewStorageConnector(&config.Cfg.Storage)
61+
if err != nil {
62+
log.Fatal().Err(err).Msg("Failed to initialize storage")
63+
}
64+
cursor, err := validation.InitCursor(rpcClient.GetChainID(), s)
65+
if err != nil {
66+
log.Fatal().Err(err).Msg("Failed to initialize cursor")
67+
}
68+
log.Debug().Msgf("Cursor initialized for chain %d, starting from block %d", rpcClient.GetChainID(), cursor.LastScannedBlockNumber)
69+
70+
conn, err := clickhouse.Open(&clickhouse.Options{
71+
Addr: []string{fmt.Sprintf("%s:%d", config.Cfg.Storage.Main.Clickhouse.Host, config.Cfg.Storage.Main.Clickhouse.Port)},
72+
Protocol: clickhouse.Native,
73+
TLS: &tls.Config{
74+
MinVersion: tls.VersionTLS12,
75+
},
76+
Auth: clickhouse.Auth{
77+
Username: config.Cfg.Storage.Main.Clickhouse.Username,
78+
Password: config.Cfg.Storage.Main.Clickhouse.Password,
79+
},
80+
Settings: func() clickhouse.Settings {
81+
settings := clickhouse.Settings{
82+
"do_not_merge_across_partitions_select_final": "1",
83+
"use_skip_indexes_if_final": "1",
84+
"optimize_move_to_prewhere_if_final": "1",
85+
"async_insert": "1",
86+
"wait_for_async_insert": "1",
87+
}
88+
return settings
89+
}(),
90+
})
91+
if err != nil {
92+
log.Fatal().Err(err).Msg("Failed to connect to ClickHouse")
93+
}
94+
defer conn.Close()
95+
96+
startBlock := new(big.Int).Add(cursor.LastScannedBlockNumber, big.NewInt(1))
97+
98+
for startBlock.Cmp(cursor.MaxBlockNumber) <= 0 {
99+
batchEndBlock := new(big.Int).Add(startBlock, batchSize)
100+
if batchEndBlock.Cmp(cursor.MaxBlockNumber) > 0 {
101+
batchEndBlock = new(big.Int).Set(cursor.MaxBlockNumber)
102+
}
103+
104+
log.Info().Msgf("Validating batch of blocks from %s to %s", startBlock.String(), batchEndBlock.String())
105+
err := validateAndFixRange(rpcClient, s, conn, startBlock, batchEndBlock, fixBatchSize)
106+
if err != nil {
107+
log.Fatal().Err(err).Msgf("failed to validate and fix range %v-%v", startBlock, batchEndBlock)
108+
}
109+
110+
startBlock = new(big.Int).Add(batchEndBlock, big.NewInt(1))
111+
cursor.Update(batchEndBlock)
112+
}
113+
}
114+
115+
/**
116+
* Validates a range of blocks (end and start are inclusive) for a given chain and fixes any problems it finds
117+
*/
118+
func validateAndFixRange(rpcClient rpc.IRPCClient, s storage.IStorage, conn clickhouse.Conn, startBlock *big.Int, endBlock *big.Int, fixBatchSize int) error {
119+
validator := orchestrator.NewValidator(rpcClient, s)
120+
121+
chainId := rpcClient.GetChainID()
122+
err := validation.FindAndRemoveDuplicates(conn, chainId, startBlock, endBlock)
123+
if err != nil {
124+
return fmt.Errorf("failed to find and fix duplicates: %w", err)
125+
}
126+
127+
err = validator.FindAndFixGaps(startBlock, endBlock)
128+
if err != nil {
129+
return fmt.Errorf("failed to find and fix gaps: %w", err)
130+
}
131+
132+
_, invalidBlocks, err := validator.ValidateBlockRange(startBlock, endBlock)
133+
if err != nil {
134+
return fmt.Errorf("failed to validate and fix blocks: %w", err)
135+
}
136+
137+
invalidBlockNumbers := make([]*big.Int, 0)
138+
for _, blockData := range invalidBlocks {
139+
invalidBlockNumbers = append(invalidBlockNumbers, blockData.Block.Number)
140+
}
141+
142+
if len(invalidBlocks) > 0 {
143+
err = validator.FixBlocks(invalidBlockNumbers, fixBatchSize)
144+
if err != nil {
145+
return fmt.Errorf("failed to fix blocks: %w", err)
146+
}
147+
}
148+
149+
log.Debug().Msgf("ValidationAndFix complete for range %v-%v", startBlock, endBlock)
150+
return nil
151+
}

go.mod

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ go 1.23.0
44

55
require (
66
github.com/ClickHouse/clickhouse-go/v2 v2.36.0
7-
github.com/ethereum/go-ethereum v1.14.8
7+
github.com/ethereum/go-ethereum v1.15.11
88
github.com/gin-gonic/gin v1.10.0
99
github.com/gorilla/schema v1.4.1
10+
github.com/holiman/uint256 v1.3.2
1011
github.com/prometheus/client_golang v1.20.4
1112
github.com/rs/zerolog v1.33.0
1213
github.com/spf13/cobra v1.8.1
@@ -24,21 +25,21 @@ require (
2425
github.com/Microsoft/go-winio v0.6.2 // indirect
2526
github.com/andybalholm/brotli v1.1.1 // indirect
2627
github.com/beorn7/perks v1.0.1 // indirect
27-
github.com/bits-and-blooms/bitset v1.10.0 // indirect
28-
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
28+
github.com/bits-and-blooms/bitset v1.20.0 // indirect
2929
github.com/bytedance/sonic v1.12.6 // indirect
3030
github.com/bytedance/sonic/loader v0.2.1 // indirect
3131
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3232
github.com/cloudwego/base64x v0.1.4 // indirect
3333
github.com/cloudwego/iasm v0.2.0 // indirect
34-
github.com/consensys/bavard v0.1.13 // indirect
35-
github.com/consensys/gnark-crypto v0.12.1 // indirect
36-
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
37-
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
34+
github.com/consensys/bavard v0.1.27 // indirect
35+
github.com/consensys/gnark-crypto v0.16.0 // indirect
36+
github.com/crate-crypto/go-eth-kzg v1.3.0 // indirect
37+
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
3838
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3939
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
4040
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
41-
github.com/ethereum/c-kzg-4844 v1.0.0 // indirect
41+
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
42+
github.com/ethereum/go-verkle v0.2.2 // indirect
4243
github.com/fsnotify/fsnotify v1.7.0 // indirect
4344
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
4445
github.com/gin-contrib/sse v0.1.0 // indirect
@@ -53,11 +54,11 @@ require (
5354
github.com/go-playground/universal-translator v0.18.1 // indirect
5455
github.com/go-playground/validator/v10 v10.23.0 // indirect
5556
github.com/goccy/go-json v0.10.4 // indirect
56-
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
57+
github.com/gofrs/flock v0.8.1 // indirect
58+
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
5759
github.com/google/uuid v1.6.0 // indirect
5860
github.com/gorilla/websocket v1.4.2 // indirect
5961
github.com/hashicorp/hcl v1.0.0 // indirect
60-
github.com/holiman/uint256 v1.3.1 // indirect
6162
github.com/inconshreveable/mousetrap v1.1.0 // indirect
6263
github.com/josharian/intern v1.0.0 // indirect
6364
github.com/json-iterator/go v1.1.12 // indirect
@@ -68,11 +69,13 @@ require (
6869
github.com/mailru/easyjson v0.7.7 // indirect
6970
github.com/mattn/go-colorable v0.1.13 // indirect
7071
github.com/mattn/go-isatty v0.0.20 // indirect
72+
github.com/mattn/go-runewidth v0.0.13 // indirect
7173
github.com/mitchellh/mapstructure v1.5.0 // indirect
7274
github.com/mmcloughlin/addchain v0.4.0 // indirect
7375
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7476
github.com/modern-go/reflect2 v1.0.2 // indirect
7577
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
78+
github.com/olekukonko/tablewriter v0.0.5 // indirect
7679
github.com/paulmach/orb v0.11.1 // indirect
7780
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
7881
github.com/pierrec/lz4/v4 v4.1.22 // indirect
@@ -81,6 +84,7 @@ require (
8184
github.com/prometheus/client_model v0.6.1 // indirect
8285
github.com/prometheus/common v0.55.0 // indirect
8386
github.com/prometheus/procfs v0.15.1 // indirect
87+
github.com/rivo/uniseg v0.2.0 // indirect
8488
github.com/sagikazarmark/locafero v0.4.0 // indirect
8589
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
8690
github.com/segmentio/asm v1.2.0 // indirect
@@ -92,13 +96,12 @@ require (
9296
github.com/spf13/pflag v1.0.5 // indirect
9397
github.com/stretchr/objx v0.5.2 // indirect
9498
github.com/subosito/gotenv v1.6.0 // indirect
95-
github.com/supranational/blst v0.3.11 // indirect
99+
github.com/supranational/blst v0.3.14 // indirect
96100
github.com/tklauser/go-sysconf v0.3.12 // indirect
97101
github.com/tklauser/numcpus v0.6.1 // indirect
98102
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
99103
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
100104
github.com/ugorji/go/codec v1.2.12 // indirect
101-
github.com/urfave/cli/v2 v2.27.4 // indirect
102105
github.com/yusufpapurcu/wmi v1.2.4 // indirect
103106
go.opentelemetry.io/otel v1.36.0 // indirect
104107
go.opentelemetry.io/otel/trace v1.36.0 // indirect

0 commit comments

Comments
 (0)