Skip to content

Commit 815e364

Browse files
committed
refactor(notifications): concurrent beacon api requests
1 parent ef00fc1 commit 815e364

File tree

1 file changed

+174
-114
lines changed

1 file changed

+174
-114
lines changed

backend/pkg/notification/collection.go

Lines changed: 174 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/lib/pq"
2929
"github.com/rocket-pool/rocketpool-go/utils/eth"
3030
"github.com/shopspring/decimal"
31+
"golang.org/x/sync/errgroup"
3132
)
3233

3334
func InitNotificationCollector(pubkeyCachePath string) {
@@ -577,7 +578,7 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
577578
ProposalDividend decimal.Decimal `db:"blocks_dividend"`
578579
ProposalDivisor decimal.Decimal `db:"blocks_divisor"`
579580
SyncDividend decimal.Decimal `db:"sync_dividend"`
580-
SyncDivisor decimal.Decimal `db:"sync_dividend"`
581+
SyncDivisor decimal.Decimal `db:"sync_divisor"`
581582
}
582583

583584
// retrieve rewards for the epoch
@@ -594,62 +595,137 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
594595
}
595596

596597
startSlot := epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
598+
efficiencyMap := make(map[types.ValidatorIndex]*dbResult, utils.Config.Chain.ClConfig.SlotsPerEpoch)
599+
efficiencyWg := errgroup.Group{}
600+
efficiencyLock := sync.Mutex{}
601+
startTs := time.Now()
597602
// 1. attestations
598603
log.Info("retrieving attestation reward data")
599-
attestationRewards, err := mc.CL.GetAttestationRewards(epoch)
600-
if err != nil {
601-
return fmt.Errorf("error getting attestation rewards: %w", err)
602-
}
603-
efficiencyMap := make(map[types.ValidatorIndex]*dbResult, len(attestationRewards.Data.TotalRewards))
604-
idealAttestationRewardsMap := make(map[uint64]decimal.Decimal)
605-
for _, reward := range attestationRewards.Data.IdealRewards {
606-
idealAttestationRewardsMap[uint64(reward.EffectiveBalance)] = decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity))
607-
}
608-
for _, reward := range attestationRewards.Data.TotalRewards {
609-
efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{
610-
ValidatorIndex: reward.ValidatorIndex,
611-
AttestationDividend: decimal.NewFromInt(
604+
efficiencyWg.Go(func() error {
605+
attestationRewards, err := mc.CL.GetAttestationRewards(epoch)
606+
if err != nil {
607+
return fmt.Errorf("error getting attestation rewards: %w", err)
608+
}
609+
idealAttestationRewardsMap := make(map[uint64]decimal.Decimal)
610+
for _, reward := range attestationRewards.Data.IdealRewards {
611+
idealAttestationRewardsMap[uint64(reward.EffectiveBalance)] = decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity))
612+
}
613+
efficiencyLock.Lock()
614+
defer efficiencyLock.Unlock()
615+
for _, reward := range attestationRewards.Data.TotalRewards {
616+
if _, ok := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]; !ok {
617+
efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{
618+
ValidatorIndex: reward.ValidatorIndex,
619+
}
620+
}
621+
efficiency := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]
622+
efficiency.AttestationDividend = decimal.NewFromInt(
612623
max(int64(reward.Head), 0) +
613624
max(int64(reward.Target), 0) +
614625
max(int64(reward.Source), 0) +
615626
max(int64(reward.InclusionDelay), 0) +
616-
max(int64(reward.Inactivity), 0)),
617-
AttestationDivisor: idealAttestationRewardsMap[effectiveBalanceMap[reward.ValidatorIndex]],
627+
max(int64(reward.Inactivity), 0))
628+
efficiency.AttestationDivisor = idealAttestationRewardsMap[effectiveBalanceMap[reward.ValidatorIndex]]
618629
}
630+
return nil
631+
})
632+
633+
// 2. sync rewards
634+
log.Info("retrieving sync reward data")
635+
var maxSyncReward int64
636+
for i := range utils.Config.Chain.ClConfig.SlotsPerEpoch {
637+
slot := startSlot + i
638+
efficiencyWg.Go(func() error {
639+
syncRewards, err := mc.CL.GetSyncRewards(slot)
640+
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
641+
return nil
642+
} else if err != nil {
643+
return fmt.Errorf("error getting sync rewards for slot %v: %w", slot, err)
644+
}
645+
646+
efficiencyLock.Lock()
647+
defer efficiencyLock.Unlock()
648+
for _, reward := range syncRewards.Data {
649+
if _, ok := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]; !ok {
650+
efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{
651+
ValidatorIndex: reward.ValidatorIndex,
652+
}
653+
}
654+
efficiency := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]
655+
efficiency.SyncDividend = efficiency.SyncDividend.Add(decimal.NewFromInt(max(reward.Reward, 0)))
656+
efficiency.SyncDivisor = efficiency.SyncDivisor.Add(decimal.NewFromInt(1)) // don't know max yet, multiply later
657+
if reward.Reward > maxSyncReward {
658+
maxSyncReward = reward.Reward
659+
}
660+
}
661+
return nil
662+
})
619663
}
620664

621-
// 2. proposals
665+
// 3. proposals
622666
log.Info("retrieving proposal reward data")
623667
// cache for later
624668
type proposerReward struct {
625669
Proposer uint64
626670
Reward *decimal.Decimal
627671
}
628-
proposalRewards := make(map[uint64]proposerReward)
672+
proposalRewards := make(map[uint64]*proposerReward)
629673
// include (up to) half an epoch lookback/ahead buffer for missed rewards
630674
minMedianStart := uint64(0)
631675
if startSlot > utils.Config.Chain.ClConfig.SlotsPerEpoch/2 {
632676
minMedianStart = startSlot - utils.Config.Chain.ClConfig.SlotsPerEpoch/2
633677
}
634678
maxMedianEnd := startSlot + utils.Config.Chain.ClConfig.SlotsPerEpoch*3/2
635-
for slot := minMedianStart; slot < maxMedianEnd; slot++ {
636-
proposalReward, err := mc.CL.GetProposalRewards(slot)
637-
if err != nil {
638-
return fmt.Errorf("error getting attestation rewards: %w", err)
639-
}
640-
var reward *decimal.Decimal
641-
if proposalReward != nil {
642-
reward = &proposalReward.Data.Total
643-
}
644-
proposalRewards[slot] = proposerReward{proposalReward.Data.ProposerIndex, reward}
679+
proposalAssignmentsWg := errgroup.Group{}
680+
var proposalsLock sync.Mutex = sync.Mutex{}
681+
// 3.1 Proposal assignments
682+
for epoch := minMedianStart / utils.Config.Chain.ClConfig.SlotsPerEpoch; epoch <= maxMedianEnd/utils.Config.Chain.ClConfig.SlotsPerEpoch; epoch++ {
683+
proposalAssignmentsWg.Go(func() error {
684+
proposalAssignments, err := mc.CL.GetProposalAssignments(epoch)
685+
if err != nil {
686+
return fmt.Errorf("error getting proposal assignments for epoch %v: %w", epoch, err)
687+
}
688+
689+
proposalsLock.Lock()
690+
defer proposalsLock.Unlock()
691+
for _, assignment := range proposalAssignments.Data {
692+
if uint64(assignment.Slot) >= minMedianStart && uint64(assignment.Slot) <= maxMedianEnd {
693+
proposalRewards[uint64(assignment.Slot)] = &proposerReward{Proposer: assignment.ValidatorIndex}
694+
}
695+
}
696+
return nil
697+
})
698+
}
699+
if err = proposalAssignmentsWg.Wait(); err != nil {
700+
return err
701+
}
702+
703+
// 3.2 Proposal rewards
704+
proposalRewardsWg := errgroup.Group{}
705+
for slot := minMedianStart; slot <= maxMedianEnd; slot++ {
706+
proposalRewardsWg.Go(func() error {
707+
proposalReward, err := mc.CL.GetProposalRewards(slot)
708+
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
709+
return nil
710+
} else if err != nil {
711+
return fmt.Errorf("error getting proposal rewards for slot %v: %w", slot, err)
712+
}
713+
714+
curReward := proposalRewards[slot]
715+
curReward.Reward = &proposalReward.Data.Total
716+
return nil
717+
})
718+
}
719+
if err = proposalRewardsWg.Wait(); err != nil {
720+
return err
645721
}
646722

647-
var maxReward int64
723+
// 3.3 Proposal medians
724+
efficiencyLock.Lock()
725+
defer efficiencyLock.Unlock()
648726
for i := range utils.Config.Chain.ClConfig.SlotsPerEpoch {
649727
slot := startSlot + i
650-
// proposals
651728
medianArray := make([]uint64, 0, utils.Config.Chain.ClConfig.SlotsPerEpoch)
652-
// calculate median missed rewards using cached proposal rewards
653729
medianStart := uint64(0)
654730
if slot > utils.Config.Chain.ClConfig.SlotsPerEpoch/2 {
655731
medianStart = slot - utils.Config.Chain.ClConfig.SlotsPerEpoch/2
@@ -663,9 +739,13 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
663739
}
664740
medianArray = append(medianArray, (*curReward).BigInt().Uint64())
665741
}
742+
if len(medianArray) == 0 {
743+
// eg gnosis slot 11737794
744+
medianArray = append(medianArray, 0)
745+
}
666746
slices.Sort(medianArray)
667747
var median uint64
668-
if len(medianArray) > 0 && len(medianArray)%2 == 0 {
748+
if len(medianArray)%2 == 0 {
669749
median = (medianArray[len(medianArray)/2-1] + medianArray[len(medianArray)/2]) / 2
670750
} else {
671751
median = medianArray[len(medianArray)/2]
@@ -683,29 +763,12 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
683763
} else {
684764
efficiency.ProposalDivisor = efficiency.ProposalDivisor.Add(decimal.NewFromUint64(median))
685765
}
766+
}
686767

687-
// 3. sync
688-
syncRewards, err := mc.CL.GetSyncRewards(slot)
689-
if err != nil {
690-
return fmt.Errorf("error getting attestation rewards: %w", err)
691-
}
692-
if syncRewards == nil {
693-
continue
694-
}
695-
for _, reward := range syncRewards.Data {
696-
if _, ok := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]; !ok {
697-
efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{
698-
ValidatorIndex: reward.ValidatorIndex,
699-
}
700-
}
701-
efficiency := efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)]
702-
efficiency.SyncDividend = efficiency.SyncDividend.Add(decimal.NewFromInt(max(reward.Reward, 0)))
703-
efficiency.SyncDivisor = efficiency.SyncDivisor.Add(decimal.NewFromInt(1)) // used to multiply later
704-
if reward.Reward > maxReward {
705-
maxReward = reward.Reward
706-
}
707-
}
768+
if err = efficiencyWg.Wait(); err != nil {
769+
return err
708770
}
771+
log.Infof("retrieving efficiency data for epoch %v from node took: %v", epoch, time.Since(startTs))
709772

710773
subMap, err := GetSubsForEventFilter(types.ValidatorGroupEfficiencyEventName, "", nil, nil)
711774
if err != nil {
@@ -755,72 +818,69 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
755818

756819
// The commented code below can be used to validate data retrieved from the node against
757820
// data in clickhouse
758-
/*
759-
var queryResult []*dbResult
760-
clickhouseTable := "validator_dashboard_data_epoch"
761-
// retrieve efficiency data for the epoch
762-
log.Infof("retrieving efficiency data for epoch %v", epoch)
763-
ds := goqu.Dialect("postgres").
764-
From(goqu.L(fmt.Sprintf(`%s AS r`, clickhouseTable))).
765-
Select(
766-
goqu.L("validator_index"),
767-
goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
768-
goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
769-
goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
770-
goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
771-
goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
772-
goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
773-
Where(goqu.L("r.epoch_timestamp = ?", utils.EpochToTime(epoch)))
774-
query, args, err := ds.Prepared(true).ToSQL()
775-
if err != nil {
776-
return fmt.Errorf("error preparing query: %v", err)
777-
}
778-
779-
err = db.ClickHouseReader.Select(&queryResult, query, args...)
780-
if err != nil {
781-
return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
782-
}
821+
/*startTs = time.Now()
822+
var queryResult []*dbResult
823+
clickhouseTable := "validator_dashboard_data_epoch"
824+
// retrieve efficiency data for the epoch
825+
log.Infof("retrieving efficiency data for epoch %v", epoch)
826+
ds := goqu.Dialect("postgres").
827+
From(goqu.L(fmt.Sprintf(`%s AS r`, clickhouseTable))).
828+
Select(
829+
goqu.L("validator_index"),
830+
goqu.L("efficiency_attestations_dividend").As("attestations_dividend"),
831+
goqu.L("efficiency_attestations_divisor").As("attestations_divisor"),
832+
goqu.L("efficiency_proposals_dividend").As("blocks_dividend"),
833+
goqu.L("efficiency_proposals_divisor").As("blocks_divisor"),
834+
goqu.L("efficiency_sync_dividend").As("sync_dividend"),
835+
goqu.L("efficiency_sync_divisor").As("sync_divisor"),
836+
).
837+
Where(goqu.L("r.epoch_timestamp = fromUnixTimestamp(?)", utils.EpochToTime(epoch).Unix()))
838+
query, args, err := ds.Prepared(true).ToSQL()
839+
if err != nil {
840+
return fmt.Errorf("error preparing query: %v", err)
841+
}
783842
784-
if len(queryResult) == 0 {
785-
return fmt.Errorf("no efficiency data found for epoch %v", epoch)
786-
}
843+
err = db.ClickHouseReader.Select(&queryResult, query, args...)
844+
if err != nil {
845+
return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
846+
}
787847
788-
log.Infof("retrieved %v efficiency data rows", len(queryResult))
848+
if len(queryResult) == 0 {
849+
return fmt.Errorf("no efficiency data found for epoch %v", epoch)
850+
}
851+
log.Infof("retrieving %v efficiency rows for epoch %v from clickhouse took: %v", len(queryResult), epoch, time.Since(startTs))
789852
790-
for _, row := range queryResult {
791-
if _, ok := activeValidatorsMap[row.ValidatorIndex]; !ok {
792-
continue
793-
}
794-
existing := efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)]
853+
for _, row := range queryResult {
854+
if _, ok := activeValidatorsMap[row.ValidatorIndex]; !ok {
855+
continue
856+
}
857+
existing := efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)]
795858
796-
if existing == nil {
797-
existing = &dbResult{
798-
ValidatorIndex: row.ValidatorIndex,
799-
AttestationReward: decimal.Decimal{},
800-
AttestationIdealReward: decimal.Decimal{},
801-
}
802-
}
803-
if !existing.AttestationIdealReward.Equal(row.AttestationIdealReward) {
804-
log.Fatal(fmt.Errorf("ideal reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationIdealReward, row.AttestationIdealReward), "ideal reward mismatch", 0)
805-
}
806-
if !existing.AttestationReward.Equal(row.AttestationReward) {
807-
log.Fatal(fmt.Errorf("attestation reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationReward, row.AttestationReward), "attestation reward mismatch", 0)
859+
if existing == nil {
860+
existing = &dbResult{
861+
ValidatorIndex: row.ValidatorIndex,
808862
}
809-
if existing.BlocksProposed != row.BlocksProposed {
810-
log.Fatal(fmt.Errorf("blocks proposed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksProposed, row.BlocksProposed), "blocks proposed mismatch", 0)
811-
}
812-
if existing.BlocksScheduled != row.BlocksScheduled {
813-
log.Fatal(fmt.Errorf("blocks scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksScheduled, row.BlocksScheduled), "blocks scheduled mismatch", 0)
814-
}
815-
if existing.SyncExecuted != row.SyncExecuted {
816-
log.Fatal(fmt.Errorf("sync executed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncExecuted, row.SyncExecuted), "sync executed mismatch", 0)
817-
}
818-
if existing.SyncScheduled != row.SyncScheduled {
819-
log.Fatal(fmt.Errorf("sync scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncScheduled, row.SyncScheduled), "sync scheduled mismatch", 0)
820-
}
821-
efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
822863
}
823-
*/
864+
if !existing.AttestationDividend.Equal(row.AttestationDividend) {
865+
log.Fatal(fmt.Errorf("ideal reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationDividend, row.AttestationDividend), "attestation dividend mismatch", 0)
866+
}
867+
if !existing.AttestationDivisor.Equal(row.AttestationDivisor) {
868+
log.Fatal(fmt.Errorf("attestation reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationDivisor, row.AttestationDivisor), "attestation divisor mismatch", 0)
869+
}
870+
if !existing.ProposalDividend.Equal(row.ProposalDividend) {
871+
log.Fatal(fmt.Errorf("blocks proposed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.ProposalDividend, row.ProposalDividend), "blocks dividend mismatch", 0)
872+
}
873+
if !existing.ProposalDivisor.Equal(row.ProposalDivisor) {
874+
log.Fatal(fmt.Errorf("blocks scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.ProposalDivisor, row.ProposalDivisor), "blocks divisor mismatch", 0)
875+
}
876+
if !existing.SyncDividend.Equal(row.SyncDividend) {
877+
log.Fatal(fmt.Errorf("sync executed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncDividend, row.SyncDividend), "sync dividend mismatch", 0)
878+
}
879+
if !existing.SyncDivisor.Mul(decimal.NewFromInt(maxSyncReward)).Equal(row.SyncDivisor) {
880+
log.Fatal(fmt.Errorf("sync scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncDivisor, row.SyncDivisor), "sync divisor mismatch", 0)
881+
}
882+
efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
883+
}*/
824884

825885
for userId, dashboards := range dashboardMap {
826886
for dashboardId, groups := range dashboards {
@@ -844,7 +904,7 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
844904
}
845905

846906
dividend := attestationDividend.Add(proposalDividend).Add(syncDividend)
847-
divisor := attestationDivisor.Add(proposalDivisor).Add(syncDivisor)
907+
divisor := attestationDivisor.Add(proposalDivisor).Add(syncDivisor.Mul(decimal.NewFromInt(maxSyncReward)))
848908
var efficiency float64
849909
if !divisor.IsZero() {
850910
efficiency = dividend.Div(divisor).InexactFloat64()

0 commit comments

Comments
 (0)