Skip to content

Commit 22fb13b

Browse files
committed
wip
1 parent 5b76127 commit 22fb13b

File tree

3 files changed

+320
-4
lines changed

3 files changed

+320
-4
lines changed

backend/pkg/exporter/db/db.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
ch "github.com/ClickHouse/clickhouse-go/v2"
17+
"github.com/ClickHouse/clickhouse-go/v2/ext"
1718
"github.com/attestantio/go-eth2-client/spec/phase0"
1819
"github.com/doug-martin/goqu/v9"
1920
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
@@ -2427,3 +2428,98 @@ const (
24272428
AggregateWeekly AggregateType = "_final_validator_dashboard_data_weekly"
24282429
AggregateMonthly AggregateType = "_final_validator_dashboard_data_monthly"
24292430
)
2431+
2432+
type SelectorType string
2433+
2434+
const (
2435+
WithdrawalAddressSelector SelectorType = "withdrawal_address"
2436+
WithdrawalCredentialsSelector SelectorType = "withdrawal_credentials"
2437+
DepositAddressSelector SelectorType = "deposit_address"
2438+
PublicKeySelector SelectorType = "public_key"
2439+
)
2440+
2441+
const LookupTableName = "_lookup_validator"
2442+
2443+
// NewLookupExternalTable creates a ClickHouse external table with the standard
2444+
// schema expected by UpdateLookupTable: (selector_type, selector, validator_index).
2445+
func NewLookupExternalTable() (*ext.Table, error) {
2446+
return ext.NewTable("external_data",
2447+
ext.Column("selector_type", "String"),
2448+
ext.Column("selector", "String"),
2449+
ext.Column("validator_index", "UInt64"),
2450+
)
2451+
}
2452+
2453+
// UpdateLookupTable replaces all entries of the given selector type in the lookup table
2454+
// with rows from the provided external ClickHouse table. Any existing rows of that
2455+
// selector type not present in the external table are marked as deleted.
2456+
//
2457+
// The external table must have exactly these columns:
2458+
// - selector_type
2459+
// - selector
2460+
// - validator_index
2461+
func UpdateLookupTable(parentCtx context.Context, selector SelectorType, dat *ext.Table) error {
2462+
if dat == nil {
2463+
return fmt.Errorf("external table is nil")
2464+
}
2465+
cols := dat.Block().Columns
2466+
// columns must be selector_type, selector, validator_index
2467+
if len(cols) != 3 {
2468+
return fmt.Errorf("invalid number of columns in external data table: expected 3, got %d", len(cols))
2469+
}
2470+
for i, col := range []string{"selector_type", "selector", "validator_index"} {
2471+
if cols[i].Name() != col {
2472+
return fmt.Errorf("invalid column %s at position %d, expected %s", cols[i].Name(), i, col)
2473+
}
2474+
}
2475+
2476+
externalTableName := dat.Name()
2477+
now := time.Now().UTC()
2478+
2479+
ds := goqu.
2480+
Dialect("postgres").
2481+
From(goqu.L(LookupTableName+" FINAL")).
2482+
Select(
2483+
goqu.C("selector_type"),
2484+
goqu.C("selector"),
2485+
goqu.C("validator_index"),
2486+
goqu.V(1).As("is_deleted"),
2487+
goqu.V(now.Add(-1*time.Second)).As("updated_at"), // ensures its older than the new entries
2488+
).
2489+
Where(
2490+
goqu.C("selector_type").Eq(string(selector)),
2491+
goqu.L("(selector_type, selector, validator_index) NOT IN ?", goqu.Dialect("postgres").
2492+
From(goqu.T(externalTableName)).Select(
2493+
goqu.C("selector_type"),
2494+
goqu.C("selector"),
2495+
goqu.C("validator_index"),
2496+
),
2497+
),
2498+
goqu.C("is_deleted").Eq(0),
2499+
).
2500+
UnionAll(
2501+
goqu.Dialect("postgres").
2502+
From(goqu.T(externalTableName)).
2503+
Select(
2504+
goqu.C("selector_type"),
2505+
goqu.C("selector"),
2506+
goqu.C("validator_index"),
2507+
goqu.V(0).As("is_deleted"),
2508+
goqu.V(now).As("updated_at"),
2509+
).
2510+
Where(
2511+
goqu.C("selector_type").Eq(string(selector)),
2512+
),
2513+
)
2514+
insert := goqu.Dialect("postgres").Insert(LookupTableName).FromQuery(ds)
2515+
query, args, err := insert.Prepared(true).ToSQL()
2516+
if err != nil {
2517+
return fmt.Errorf("error building lookup table update query: %w", err)
2518+
}
2519+
ctx := ch.Context(parentCtx, ch.WithExternalTable(dat))
2520+
_, err = db.ClickHouseWriter.ExecContext(ctx, query, args...)
2521+
if err != nil {
2522+
return fmt.Errorf("error updating lookup table: %w", err)
2523+
}
2524+
return nil
2525+
}

backend/pkg/exporter/modules/execution_deposits_exporter.go

Lines changed: 104 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"time"
1313

1414
"github.com/attestantio/go-eth2-client/spec/phase0"
15+
"github.com/doug-martin/goqu/v9"
16+
_ "github.com/doug-martin/goqu/v9/dialect/postgres"
1517
"github.com/ethereum/go-ethereum/accounts/abi"
1618
"github.com/ethereum/go-ethereum/accounts/abi/bind"
1719
"github.com/ethereum/go-ethereum/common"
@@ -26,11 +28,13 @@ import (
2628
"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
2729
"github.com/gobitfly/beaconchain/pkg/commons/db"
2830
"github.com/gobitfly/beaconchain/pkg/commons/log"
31+
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
2932
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
3033
"github.com/gobitfly/beaconchain/pkg/commons/services"
3134
"github.com/gobitfly/beaconchain/pkg/commons/types"
3235
"github.com/gobitfly/beaconchain/pkg/commons/utils"
3336
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
37+
edb "github.com/gobitfly/beaconchain/pkg/exporter/db"
3438
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
3539
)
3640

@@ -50,6 +54,7 @@ type executionDepositsExporter struct {
5054
LastExportedBlock uint64
5155
LastExportedFinalizedBlock uint64
5256
LastExportedFinalizedBlockRedisKey string
57+
LastLookupTableRefresh time.Time
5358
CurrentHeadBlock atomic.Uint64
5459
Signer gethtypes.Signer
5560
DepositMethod abi.Method
@@ -278,14 +283,32 @@ func (d *executionDepositsExporter) export() (err error) {
278283
}
279284

280285
start := time.Now()
281-
// update cached view
282-
err = d.updateCachedView()
286+
var g2 errgroup.Group
287+
g2.Go(func() error {
288+
// update cached view
289+
err := d.updateCachedView()
290+
if err != nil {
291+
return err
292+
}
293+
log.Debugf("updating cached deposits view took %v", time.Since(start))
294+
return nil
295+
})
296+
g2.Go(func() error {
297+
if d.LastLookupTableRefresh.Before(time.Now().Add(-time.Minute * 5)) {
298+
err := d.upkeepLookupTable()
299+
if err != nil {
300+
return err
301+
}
302+
d.LastLookupTableRefresh = time.Now()
303+
log.Debugf("updating lookup table took %v", time.Since(start))
304+
}
305+
return nil
306+
})
307+
err = g2.Wait()
283308
if err != nil {
284309
return err
285310
}
286311

287-
log.Debugf("updating cached deposits view took %v", time.Since(start))
288-
289312
return nil
290313
}
291314

@@ -694,3 +717,80 @@ func (d *executionDepositsExporter) updateCachedView() error {
694717
[]string{"dashboard_id", "amount"})
695718
return err
696719
}
720+
721+
func (d *executionDepositsExporter) upkeepLookupTable() error {
722+
// metrics: overall invocation
723+
metrics.Tasks.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
724+
startTotal := time.Now()
725+
defer func() { metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.total").Observe(time.Since(startTotal).Seconds()) }()
726+
727+
// Build external ClickHouse table with required schema
728+
tbl, err := edb.NewLookupExternalTable()
729+
if err != nil {
730+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
731+
return fmt.Errorf("error creating external table: %w", err)
732+
}
733+
734+
// Fetch distinct deposit address -> validator index mappings from Postgres (built with goqu)
735+
startPg := time.Now()
736+
ds := goqu.Dialect("postgres").
737+
From(goqu.T("eth1_deposits").As("ed")).
738+
Select(
739+
goqu.Func("ENCODE", goqu.T("ed").Col("from_address"), "hex").As("from_hex"),
740+
goqu.T("v").Col("validatorindex").As("validator_index"),
741+
).
742+
Distinct().
743+
InnerJoin(
744+
goqu.T("validators").As("v"),
745+
goqu.On(goqu.T("v").Col("pubkey").Eq(goqu.T("ed").Col("publickey"))),
746+
).
747+
Where(
748+
goqu.T("ed").Col("from_address").IsNotNull(),
749+
)
750+
sqlStr, args, err := ds.Prepared(true).ToSQL()
751+
if err != nil {
752+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
753+
return fmt.Errorf("error building deposit address mappings query: %w", err)
754+
}
755+
rows, err := db.ReaderDb.Query(sqlStr, args...)
756+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.pg_query").Observe(time.Since(startPg).Seconds())
757+
if err != nil {
758+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
759+
return fmt.Errorf("error querying deposit address mappings: %w", err)
760+
}
761+
defer rows.Close()
762+
763+
startAppend := time.Now()
764+
var rowsAppended int64
765+
for rows.Next() {
766+
var fromHex string
767+
var validatorIndex int64
768+
if err := rows.Scan(&fromHex, &validatorIndex); err != nil {
769+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
770+
return fmt.Errorf("error scanning deposit address mapping row: %w", err)
771+
}
772+
// Prefix 0x for selector value consistency
773+
selector := "0x" + fromHex
774+
if err := tbl.Append(string(edb.DepositAddressSelector), selector, uint64(validatorIndex)); err != nil {
775+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
776+
return fmt.Errorf("error appending row to external table: %w", err)
777+
}
778+
rowsAppended++
779+
}
780+
if err := rows.Err(); err != nil {
781+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
782+
return fmt.Errorf("row iteration error: %w", err)
783+
}
784+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.append_rows").Observe(time.Since(startAppend).Seconds())
785+
metrics.Counter.WithLabelValues("execution_deposits_exporter_upkeep_lookup.rows_appended").Add(float64(rowsAppended))
786+
787+
// Update the ClickHouse lookup table
788+
startCH := time.Now()
789+
ctx := context.Background()
790+
if err := edb.UpdateLookupTable(ctx, edb.DepositAddressSelector, tbl); err != nil {
791+
metrics.Errors.WithLabelValues("execution_deposits_exporter_upkeep_lookup").Inc()
792+
return fmt.Errorf("error updating deposit address lookup table: %w", err)
793+
}
794+
metrics.TaskDuration.WithLabelValues("execution_deposits_exporter_upkeep_lookup.ch_update").Observe(time.Since(startCH).Seconds())
795+
return nil
796+
}

0 commit comments

Comments
 (0)