Skip to content

Commit 49cff0a

Browse files
committed
wip
1 parent 5b76127 commit 49cff0a

File tree

3 files changed

+315
-4
lines changed

3 files changed

+315
-4
lines changed

backend/pkg/exporter/db/db.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
ch "github.com/ClickHouse/clickhouse-go/v2"
17+
"github.com/ClickHouse/clickhouse-go/v2/ext"
1718
"github.com/attestantio/go-eth2-client/spec/phase0"
1819
"github.com/doug-martin/goqu/v9"
1920
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
@@ -2427,3 +2428,93 @@ const (
24272428
AggregateWeekly AggregateType = "_final_validator_dashboard_data_weekly"
24282429
AggregateMonthly AggregateType = "_final_validator_dashboard_data_monthly"
24292430
)
2431+
2432+
type SelectorType string
2433+
2434+
const (
2435+
WithdrawalAddressSelector SelectorType = "withdrawal_address"
2436+
WithdrawalCredentialsSelector SelectorType = "withdrawal_credentials"
2437+
DepositAddressSelector SelectorType = "deposit_address"
2438+
PublicKeySelector SelectorType = "public_key"
2439+
)
2440+
2441+
const LookupTableName = "_lookup_validator"
2442+
2443+
// NewLookupExternalTable creates a ClickHouse external table with the standard
2444+
// schema expected by UpdateLookupTable: (selector_type, selector, validator_index).
2445+
func NewLookupExternalTable() (*ext.Table, error) {
2446+
return ext.NewTable("external_data",
2447+
ext.Column("selector_type", "String"),
2448+
ext.Column("selector", "String"),
2449+
ext.Column("validator_index", "UInt64"),
2450+
)
2451+
}
2452+
2453+
// UpdateLookupTable replaces all entries of the given selector type in the lookup table
2454+
// with rows from the provided external ClickHouse table. Any existing rows of that
2455+
// selector type not present in the external table are marked as deleted.
2456+
//
2457+
// The external table must have exactly these columns:
2458+
// - selector_type
2459+
// - selector
2460+
// - validator_index
2461+
func UpdateLookupTable(parentCtx context.Context, selector SelectorType, dat *ext.Table) error {
2462+
if dat == nil {
2463+
return fmt.Errorf("external table is nil")
2464+
}
2465+
cols := dat.Block().Columns
2466+
// columns must be selector_type, selector, validator_index
2467+
if len(cols) != 3 {
2468+
return fmt.Errorf("invalid number of columns in external data table: expected 3, got %d", len(cols))
2469+
}
2470+
for i, col := range []string{"selector_type", "selector", "validator_index"} {
2471+
if cols[i].Name() != col {
2472+
return fmt.Errorf("invalid column %s at position %d, expected %s", cols[i].Name(), i, col)
2473+
}
2474+
}
2475+
2476+
externalTableName := dat.Name()
2477+
now := time.Now().UTC()
2478+
ds := goqu.
2479+
From(goqu.L(LookupTableName+" FINAL")).
2480+
Select(
2481+
goqu.C("selector_type"),
2482+
goqu.C("selector"),
2483+
goqu.C("validator_index"),
2484+
goqu.V(true).As("is_deleted"),
2485+
goqu.V(now.Add(-1*time.Second)).As("updated_at"), // ensures its older than the new entries
2486+
).
2487+
Where(
2488+
goqu.C("selector_type").Eq(string(selector)),
2489+
goqu.L("(selector_type, selector, validator_index) not in ?", goqu.From(goqu.T(externalTableName)).Select(
2490+
goqu.C("selector_type"),
2491+
goqu.C("selector"),
2492+
goqu.C("validator_index"),
2493+
)),
2494+
goqu.C("is_deleted").Eq(false),
2495+
).
2496+
UnionAll(
2497+
goqu.From(goqu.T(externalTableName)).
2498+
Select(
2499+
goqu.C("selector_type"),
2500+
goqu.C("selector"),
2501+
goqu.C("validator_index"),
2502+
goqu.V(false).As("is_deleted"),
2503+
goqu.V(now).As("updated_at"),
2504+
).
2505+
Where(
2506+
goqu.C("selector_type").Eq(string(selector)),
2507+
),
2508+
)
2509+
insert := goqu.Insert(LookupTableName).FromQuery(ds)
2510+
query, _, err := insert.Prepared(true).ToSQL()
2511+
if err != nil {
2512+
return fmt.Errorf("error building lookup table update query: %w", err)
2513+
}
2514+
ctx := ch.Context(parentCtx, ch.WithExternalTable(dat))
2515+
_, err = db.ClickHouseWriter.ExecContext(ctx, query)
2516+
if err != nil {
2517+
return fmt.Errorf("error updating lookup table: %w", err)
2518+
}
2519+
return nil
2520+
}

backend/pkg/exporter/modules/execution_deposits_exporter.go

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"time"
1313

1414
"github.com/attestantio/go-eth2-client/spec/phase0"
15+
"github.com/doug-martin/goqu/v9"
16+
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
1517
"github.com/ethereum/go-ethereum/accounts/abi"
1618
"github.com/ethereum/go-ethereum/accounts/abi/bind"
1719
"github.com/ethereum/go-ethereum/common"
@@ -26,11 +28,13 @@ import (
2628
"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
2729
"github.com/gobitfly/beaconchain/pkg/commons/db"
2830
"github.com/gobitfly/beaconchain/pkg/commons/log"
31+
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
2932
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
3033
"github.com/gobitfly/beaconchain/pkg/commons/services"
3134
"github.com/gobitfly/beaconchain/pkg/commons/types"
3235
"github.com/gobitfly/beaconchain/pkg/commons/utils"
3336
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
37+
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
3438
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
3539
)
3640

@@ -50,6 +54,7 @@ type executionDepositsExporter struct {
5054
LastExportedBlock uint64
5155
LastExportedFinalizedBlock uint64
5256
LastExportedFinalizedBlockRedisKey string
57+
LastLookupTableRefresh time.Time
5358
CurrentHeadBlock atomic.Uint64
5459
Signer gethtypes.Signer
5560
DepositMethod abi.Method
@@ -278,14 +283,32 @@ func (d *executionDepositsExporter) export() (err error) {
278283
}
279284

280285
start := time.Now()
281-
// update cached view
282-
err = d.updateCachedView()
286+
var g2 errgroup.Group
287+
g2.Go(func() error {
288+
// update cached view
289+
err := d.updateCachedView()
290+
if err != nil {
291+
return err
292+
}
293+
log.Debugf("updating cached deposits view took %v", time.Since(start))
294+
return nil
295+
})
296+
g2.Go(func() error {
297+
if d.LastLookupTableRefresh.Before(time.Now().Add(-time.Minute * 5)) {
298+
err := d.upkeepLookupTable()
299+
if err != nil {
300+
return err
301+
}
302+
d.LastLookupTableRefresh = time.Now()
303+
log.Debugf("updating lookup table took %v", time.Since(start))
304+
}
305+
return nil
306+
})
307+
err = g2.Wait()
283308
if err != nil {
284309
return err
285310
}
286311

287-
log.Debugf("updating cached deposits view took %v", time.Since(start))
288-
289312
return nil
290313
}
291314

@@ -694,3 +717,80 @@ func (d *executionDepositsExporter) updateCachedView() error {
694717
[]string{"dashboard_id", "amount"})
695718
return err
696719
}
720+
721+
func (d *executionDepositsExporter) upkeepLookupTable() error {
722+
// metrics: overall invocation
723+
metrics.Tasks.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
724+
startTotal := time.Now()
725+
defer metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.total").Observe(time.Since(startTotal).Seconds())
726+
727+
// Build external ClickHouse table with required schema
728+
tbl, err := edb.NewLookupExternalTable()
729+
if err != nil {
730+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
731+
return fmt.Errorf("error creating external table: %w", err)
732+
}
733+
734+
// Fetch distinct deposit address -> validator index mappings from Postgres (built with goqu)
735+
startPg := time.Now()
736+
ds := goqu.Dialect("postgres").
737+
From(goqu.T("eth1_deposits").As("ed")).
738+
Select(
739+
goqu.Func("ENCODE", goqu.T("ed").Col("from_address"), "hex").As("from_hex"),
740+
goqu.T("v").Col("validatorindex").As("validator_index"),
741+
).
742+
Distinct().
743+
InnerJoin(
744+
goqu.T("validators").As("v"),
745+
goqu.On(goqu.T("v").Col("pubkey").Eq(goqu.T("ed").Col("publickey"))),
746+
).
747+
Where(
748+
goqu.T("ed").Col("from_address").IsNotNull(),
749+
)
750+
sqlStr, args, err := ds.Prepared(true).ToSQL()
751+
if err != nil {
752+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
753+
return fmt.Errorf("error building deposit address mappings query: %w", err)
754+
}
755+
rows, err := db.ReaderDb.Query(sqlStr, args...)
756+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.pg_query").Observe(time.Since(startPg).Seconds())
757+
if err != nil {
758+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
759+
return fmt.Errorf("error querying deposit address mappings: %w", err)
760+
}
761+
defer rows.Close()
762+
763+
startAppend := time.Now()
764+
var rowsAppended int64
765+
for rows.Next() {
766+
var fromHex string
767+
var validatorIndex int64
768+
if err := rows.Scan(&fromHex, &validatorIndex); err != nil {
769+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
770+
return fmt.Errorf("error scanning deposit address mapping row: %w", err)
771+
}
772+
// Prefix 0x for selector value consistency
773+
selector := "0x" + fromHex
774+
if err := tbl.Append(string(edb.DepositAddressSelector), selector, uint64(validatorIndex)); err != nil {
775+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
776+
return fmt.Errorf("error appending row to external table: %w", err)
777+
}
778+
rowsAppended++
779+
}
780+
if err := rows.Err(); err != nil {
781+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
782+
return fmt.Errorf("row iteration error: %w", err)
783+
}
784+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.append_rows").Observe(time.Since(startAppend).Seconds())
785+
metrics.Counter.WithLabelValues("execution_deposits_exporter_upkeep_lookup.rows_appended").Add(float64(rowsAppended))
786+
787+
// Update the ClickHouse lookup table
788+
startCH := time.Now()
789+
ctx := context.Background()
790+
if err := edb.UpdateLookupTable(ctx, edb.DepositAddressSelector, tbl); err != nil {
791+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
792+
return fmt.Errorf("error updating deposit address lookup table: %w", err)
793+
}
794+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.ch_update").Observe(time.Since(startCH).Seconds())
795+
return nil
796+
}

backend/pkg/exporter/modules/slot_exporter.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/ethereum/go-ethereum/common/hexutil"
1415
"github.com/gobitfly/beaconchain/pkg/commons/cache"
1516
"github.com/gobitfly/beaconchain/pkg/commons/config"
1617
"github.com/gobitfly/beaconchain/pkg/commons/log"
@@ -29,6 +30,7 @@ import (
2930
"golang.org/x/exp/maps"
3031
"golang.org/x/sync/errgroup"
3132

33+
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
3234
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
3335
)
3436

@@ -684,6 +686,109 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e
684686
return nil
685687
})
686688
}
689+
690+
// update pubkey => validator lookup index in clickhouse
691+
g.Go(func() error {
692+
metrics.Tasks.WithLabelValues("slot_exporter_upkeep_lookup_public_key").Inc()
693+
startTotal := time.Now()
694+
defer metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_public_key.total").Observe(time.Since(startTotal).Seconds())
695+
tbl, err := edb.NewLookupExternalTable()
696+
if err != nil {
697+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_public_key").Inc()
698+
return fmt.Errorf("error creating external table: %w", err)
699+
}
700+
startAppend := time.Now()
701+
var rowsAppended int64
702+
for _, v := range block.Validators {
703+
if err := tbl.Append(string(edb.PublicKeySelector), hexutil.Encode(v.PublicKey), uint64(v.Index)); err != nil {
704+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_public_key").Inc()
705+
return fmt.Errorf("error appending row to external table: %w", err)
706+
}
707+
rowsAppended++
708+
}
709+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_public_key.append_rows").Observe(time.Since(startAppend).Seconds())
710+
metrics.Counter.WithLabelValues("slot_exporter_upkeep_lookup_public_key.rows_appended").Add(float64(rowsAppended))
711+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
712+
defer cancel()
713+
startCH := time.Now()
714+
if err := edb.UpdateLookupTable(ctx, edb.PublicKeySelector, tbl); err != nil {
715+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_public_key").Inc()
716+
return fmt.Errorf("error updating external table: %w", err)
717+
}
718+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_public_key.ch_update").Observe(time.Since(startCH).Seconds())
719+
return nil
720+
})
721+
722+
// update withdrawal credentials => validator index lookup in clickhouse (in parallel)
723+
g.Go(func() error {
724+
metrics.Tasks.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials").Inc()
725+
startTotal := time.Now()
726+
defer metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials.total").Observe(time.Since(startTotal).Seconds())
727+
tbl, err := edb.NewLookupExternalTable()
728+
if err != nil {
729+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials").Inc()
730+
return fmt.Errorf("error creating external table: %w", err)
731+
}
732+
startAppend := time.Now()
733+
var rowsAppended int64
734+
for _, v := range block.Validators {
735+
if len(v.WithdrawalCredentials) == 0 {
736+
continue
737+
}
738+
selector := hexutil.Encode(v.WithdrawalCredentials)
739+
if err := tbl.Append(string(edb.WithdrawalCredentialsSelector), selector, uint64(v.Index)); err != nil {
740+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials").Inc()
741+
return fmt.Errorf("error appending row to external table: %w", err)
742+
}
743+
rowsAppended++
744+
}
745+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials.append_rows").Observe(time.Since(startAppend).Seconds())
746+
metrics.Counter.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials.rows_appended").Add(float64(rowsAppended))
747+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
748+
defer cancel()
749+
startCH := time.Now()
750+
if err := edb.UpdateLookupTable(ctx, edb.WithdrawalCredentialsSelector, tbl); err != nil {
751+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials").Inc()
752+
return fmt.Errorf("error updating external table: %w", err)
753+
}
754+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_credentials.ch_update").Observe(time.Since(startCH).Seconds())
755+
return nil
756+
})
757+
758+
// update withdrawal address => validator index lookup in clickhouse (in parallel)
759+
g.Go(func() error {
760+
metrics.Tasks.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address").Inc()
761+
startTotal := time.Now()
762+
defer metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address.total").Observe(time.Since(startTotal).Seconds())
763+
tbl, err := edb.NewLookupExternalTable()
764+
if err != nil {
765+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address").Inc()
766+
return fmt.Errorf("error creating external table: %w", err)
767+
}
768+
startAppend := time.Now()
769+
var rowsAppended int64
770+
for _, v := range block.Validators {
771+
if addr, ok := eth1AddrFromWithdrawalCreds(v.WithdrawalCredentials); ok {
772+
selector := hexutil.Encode(addr)
773+
if err := tbl.Append(string(edb.WithdrawalAddressSelector), selector, uint64(v.Index)); err != nil {
774+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address").Inc()
775+
return fmt.Errorf("error appending row to external table: %w", err)
776+
}
777+
rowsAppended++
778+
}
779+
}
780+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address.append_rows").Observe(time.Since(startAppend).Seconds())
781+
metrics.Counter.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address.rows_appended").Add(float64(rowsAppended))
782+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
783+
defer cancel()
784+
startCH := time.Now()
785+
if err := edb.UpdateLookupTable(ctx, edb.WithdrawalAddressSelector, tbl); err != nil {
786+
metrics.Errors.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address").Inc()
787+
return fmt.Errorf("error updating external table: %w", err)
788+
}
789+
metrics.TaskDuration.WithLabelValues("slot_exporter_upkeep_lookup_withdrawal_address.ch_update").Observe(time.Since(startCH).Seconds())
790+
return nil
791+
})
687792
}
688793
var epochParticipationStats *types.ValidatorParticipation
689794
if epoch > 0 {
@@ -758,3 +863,18 @@ func (d *slotExporterData) OnChainReorg(event *constypes.StandardEventChainReorg
758863
func (d *slotExporterData) OnFinalizedCheckpoint(event *constypes.StandardFinalizedCheckpointResponse) (err error) {
759864
return nil // nop
760865
}
866+
867+
// eth1AddrFromWithdrawalCreds extracts the 20-byte ETH1 address from 32-byte
868+
// withdrawal credentials that encode an address. Supports both 0x01 and 0x02
869+
// prefixes (wc[0] == 0x01 or 0x02). Returns (addr, true) on success.
870+
func eth1AddrFromWithdrawalCreds(wc []byte) ([]byte, bool) {
871+
if len(wc) != 32 {
872+
return nil, false
873+
}
874+
if wc[0] == 0x01 || wc[0] == 0x02 {
875+
addr := make([]byte, 20)
876+
copy(addr, wc[12:])
877+
return addr, true
878+
}
879+
return nil, false
880+
}

0 commit comments

Comments
 (0)