Skip to content

Commit f0edc6c

Browse files
peterbitflyguybrush
authored andcommitted
feat(exporter): add functionality for a no downtime migration of legacy balances from clickhouse to bigtable
misc(exporter): please linter, fix typo misc(exporter): add clickhouse table migration file, switch from epoch to timestamp to enable future garbage collection misc(exporter): add retries when migrating legacy balances
1 parent fa01987 commit f0edc6c

File tree

6 files changed

+183
-2
lines changed

6 files changed

+183
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea/
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package commands
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"time"
7+
8+
"github.com/gobitfly/beaconchain/cmd/misc/misctypes"
9+
"github.com/gobitfly/beaconchain/pkg/commons/db"
10+
"github.com/gobitfly/beaconchain/pkg/commons/log"
11+
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
12+
"github.com/gobitfly/beaconchain/pkg/commons/types"
13+
)
14+
15+
type MigrateLegacyBalancesCommand struct {
16+
FlagSet *flag.FlagSet
17+
Config migrateLegacyBalancesCommandConfig
18+
}
19+
20+
type migrateLegacyBalancesCommandConfig struct {
21+
StartEpoch uint64
22+
EndEpoch uint64
23+
}
24+
25+
func (s *MigrateLegacyBalancesCommand) ParseCommandOptions() {
26+
s.FlagSet.Uint64Var(&s.Config.StartEpoch, "legacy-balances-start-epoch", 0, "Start epoch for the balance migration")
27+
s.FlagSet.Uint64Var(&s.Config.EndEpoch, "legacy-balances-end-epoch", 0, "End epoch for the balance migration")
28+
}
29+
30+
func (s *MigrateLegacyBalancesCommand) Requires() misctypes.Requires {
31+
return misctypes.Requires{
32+
ClickhouseDBs: true,
33+
ClNode: true,
34+
}
35+
}
36+
37+
func (s *MigrateLegacyBalancesCommand) Run(clClient *rpc.LighthouseClient) error {
38+
if s.Config.EndEpoch == 0 {
39+
s.showHelp()
40+
return fmt.Errorf("end-epoch is required")
41+
}
42+
43+
log.Infof("command: migrate-legacy-balances start-epoch=%d end-epoch=%d", s.Config.StartEpoch, s.Config.EndEpoch)
44+
45+
for epoch := s.Config.StartEpoch; epoch <= s.Config.EndEpoch; epoch++ {
46+
for attempt := 1; attempt <= 5; attempt++ {
47+
state, err := clClient.GetValidatorState(epoch)
48+
if err != nil {
49+
log.Error(err, fmt.Sprintf("error getting balances for epoch %d (attempt %d/5): %v", epoch, attempt, err), 0)
50+
if attempt == 5 {
51+
return fmt.Errorf("failed to get balances for epoch %d after %d attempts: %w", epoch, attempt, err)
52+
}
53+
time.Sleep(5 * time.Second)
54+
continue
55+
}
56+
57+
validators := make([]*types.Validator, 0, len(state.Data))
58+
for _, validator := range state.Data {
59+
validators = append(validators, &types.Validator{
60+
Index: validator.Index,
61+
Balance: validator.Balance,
62+
EffectiveBalance: validator.Validator.EffectiveBalance,
63+
})
64+
}
65+
66+
log.Infof("writing epoch %d to clickhouse with %d validators", epoch, len(state.Data))
67+
err = db.SaveLegacyValidatorBalancesToClickhouse(epoch, validators)
68+
if err != nil {
69+
log.Error(err, fmt.Sprintf("error saving legacy balances for epoch %d (attempt %d/5): %v", epoch, attempt, err), 0)
70+
if attempt == 5 {
71+
return fmt.Errorf("error saving legacy balances for epoch %d after %d attempts: %w", epoch, attempt, err)
72+
}
73+
time.Sleep(5 * time.Second)
74+
continue
75+
}
76+
77+
// success for this epoch
78+
break
79+
}
80+
}
81+
return nil
82+
}
83+
84+
func (s *MigrateLegacyBalancesCommand) showHelp() {
85+
log.Infof("Usage: migrate_legacy_balances [options]")
86+
log.Infof("Options:")
87+
log.Infof(" --legacy-balances-start-epoch int\tStart epoch for the balance migration")
88+
log.Infof(" --legacy-balances-end-epoch int\tEnd epoch for the balance migration")
89+
}

backend/cmd/misc/main.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ var REQUIRES_LIST = map[string]misctypes.Requires{
8787
"clear-raw-bigtable": {
8888
RawBigtable: true,
8989
},
90-
"app-bundle": (&commands.AppBundleCommand{}).Requires(),
90+
"app-bundle": (&commands.AppBundleCommand{}).Requires(),
91+
"migrate-legacy-balances": (&commands.MigrateLegacyBalancesCommand{}).Requires(),
9192
"update-highest-active-validatorindex": {
9293
Bigtable: true,
9394
ClNode: true,
@@ -105,8 +106,12 @@ func Run() {
105106
FlagSet: fs,
106107
}
107108

109+
migrateLegacyBalancesCommand := commands.MigrateLegacyBalancesCommand{
110+
FlagSet: fs,
111+
}
112+
108113
configPath := fs.String("config", "config/default.config.yml", "Path to the config file")
109-
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, clear-raw-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications, verify-fcm-tokens, app-bundle, update-highest-active-validatorindex")
114+
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, clear-raw-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications, verify-fcm-tokens, app-bundle, update-highest-active-validatorindex, migrate-legacy-balances")
110115
fs.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
111116
fs.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
112117
fs.Uint64Var(&opts.User, "user", 0, "user id")
@@ -133,6 +138,8 @@ func Run() {
133138

134139
statsPartitionCommand.ParseCommandOptions()
135140
appBundleCommand.ParseCommandOptions()
141+
migrateLegacyBalancesCommand.ParseCommandOptions()
142+
136143
_ = fs.Parse(os.Args[2:])
137144

138145
if *versionFlag {
@@ -221,6 +228,9 @@ func Run() {
221228
db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&cfg.ClickHouse.WriterDatabase, &cfg.ClickHouse.ReaderDatabase, "clickhouse", "clickhouse")
222229
defer db.ClickHouseReader.Close()
223230
defer db.ClickHouseWriter.Close()
231+
232+
db.ClickHouseNativeWriter = db.MustInitClickhouseNative(&cfg.ClickHouse.WriterDatabase)
233+
defer db.ClickHouseNativeWriter.Close()
224234
}
225235

226236
// Initialize the persistent redis client
@@ -487,6 +497,8 @@ func Run() {
487497
case "app-bundle":
488498
appBundleCommand.Config.DryRun = opts.DryRun
489499
err = appBundleCommand.Run()
500+
case "migrate-legacy-balances":
501+
err = migrateLegacyBalancesCommand.Run(rpcClient)
490502
case "fix-ens":
491503
err = fixEns(erigonClient)
492504
case "fix-ens-addresses":
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package db
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/gobitfly/beaconchain/pkg/commons/types"
8+
"github.com/gobitfly/beaconchain/pkg/commons/utils"
9+
"github.com/google/uuid"
10+
)
11+
12+
type LegacyValidatorBalanceTable struct {
13+
ValidatorIndex []uint32 `db:"validator_index"`
14+
EpochTimestamp []*time.Time `db:"epoch_timestamp"`
15+
Balance []uint64 `db:"balance"`
16+
EffectiveBalance []uint64 `db:"effective_balance"`
17+
}
18+
19+
func (c LegacyValidatorBalanceTable) Get(str string) any {
20+
switch str {
21+
case "validator_index":
22+
return c.ValidatorIndex
23+
case "epoch_timestamp":
24+
return c.EpochTimestamp
25+
case "balance":
26+
return c.Balance
27+
case "effective_balance":
28+
return c.EffectiveBalance
29+
default:
30+
return nil
31+
}
32+
}
33+
func (c LegacyValidatorBalanceTable) Extend(cOther UltraFastClickhouseStruct) error {
34+
return nil
35+
}
36+
37+
func SaveLegacyValidatorBalancesToClickhouse(epoch uint64, state []*types.Validator) error {
38+
epochTs := utils.EpochToTime(epoch)
39+
40+
epochData := LegacyValidatorBalanceTable{}
41+
for _, validator := range state {
42+
epochData.ValidatorIndex = append(epochData.ValidatorIndex, uint32(validator.Index))
43+
epochData.EpochTimestamp = append(epochData.EpochTimestamp, &epochTs)
44+
epochData.Balance = append(epochData.Balance, validator.Balance)
45+
epochData.EffectiveBalance = append(epochData.EffectiveBalance, validator.EffectiveBalance)
46+
}
47+
48+
err := UltraFastDumpToClickhouse(epochData, "legacy_validator_epoch_balances", uuid.New().String())
49+
if err != nil {
50+
return fmt.Errorf("error writing legacy validator epoch balances to clickhouse: %w", err)
51+
}
52+
53+
return nil
54+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
CREATE TABLE IF NOT EXISTS legacy_validator_epoch_balances (
4+
`validator_index` UInt32,
5+
`epoch_timestamp` DateTime,
6+
`balance` UInt64,
7+
`effective_balance` UInt64
8+
)
9+
ENGINE = ReplacingMergeTree
10+
ORDER BY (validator_index, epoch_timestamp);
11+
-- +goose StatementEnd
12+
13+
-- +goose Down
14+
-- +goose StatementBegin
15+
DROP TABLE IF EXISTS legacy_validator_epoch_balances;
16+
-- +goose StatementEnd

backend/pkg/exporter/modules/slot_exporter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,15 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e
502502
return nil
503503
})
504504

505+
// save the validator balances to clickhouse
506+
g.Go(func() error {
507+
err := db.SaveLegacyValidatorBalancesToClickhouse(epoch, block.Validators)
508+
if err != nil {
509+
return fmt.Errorf("error exporting validator balances to clickhouse for slot %v: %w", block.Slot, err)
510+
}
511+
return nil
512+
})
513+
505514
// if we are exporting the head epoch, update the validator db table
506515
if isHeadEpoch {
507516
// this function sets exports the validator status into the db

0 commit comments

Comments
 (0)