Skip to content

Commit 886de83

Browse files
committed
refactor: reward charts data access
See: BEDS-875
1 parent 49bd75d commit 886de83

File tree

3 files changed

+136
-169
lines changed

3 files changed

+136
-169
lines changed

backend/pkg/api/data_access/vdb_rewards.go

Lines changed: 130 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -683,263 +683,230 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex
683683
return ret, nil
684684
}
685685

686-
func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, groupIds []int64, protocolModes t.VDBProtocolModes, aggregation enums.ChartAggregation, afterTs uint64, beforeTs uint64) (*t.ChartData[int, decimal.Decimal], error) {
687-
// @DATA-ACCESS incorporate protocolModes
688-
// bar chart for the CL and EL rewards for each group for each epoch.
689-
// NO series for all groups combined except if AggregateGroups is true.
690-
// series id is group id, series property is 'cl' or 'el'
691-
ret := &t.ChartData[int, decimal.Decimal]{}
692-
693-
if len(groupIds) == 0 {
694-
return ret, nil
695-
}
696-
697-
var err error
698-
686+
func buildRewardChartClDs(dashboardId t.VDBId, groupIds []int64, afterTs uint64, beforeTs uint64, aggregation enums.ChartAggregation, isAllGroupsRequested bool) *goqu.SelectDataset {
699687
var dataTable exp.LiteralExpression
700688
timeColumn := goqu.C("t")
689+
epochStartCol, epochEndCol := "epoch_start", "epoch_end"
701690
switch aggregation {
702691
case enums.IntervalEpoch:
703692
dataTable = goqu.L("validator_dashboard_data_epoch AS e")
704693
timeColumn = goqu.C("epoch_timestamp")
694+
epochStartCol, epochEndCol = "e.epoch", "e.epoch"
705695
case enums.IntervalHourly:
706-
dataTable = goqu.L("validator_dashboard_data_hourly AS e FINAL")
696+
dataTable = goqu.L("validator_dashboard_data_hourly AS e")
707697
case enums.IntervalDaily:
708-
dataTable = goqu.L("validator_dashboard_data_daily AS e FINAL")
698+
dataTable = goqu.L("validator_dashboard_data_daily AS e")
709699
case enums.IntervalWeekly:
710-
dataTable = goqu.L("validator_dashboard_data_weekly AS e FINAL")
711-
default:
712-
return nil, fmt.Errorf("unexpected aggregation type: %v", aggregation)
700+
dataTable = goqu.L("validator_dashboard_data_weekly AS e")
713701
}
714702

715-
requestedAllGroups := dashboardId.AggregateGroups
716-
for _, groupId := range groupIds {
717-
if groupId == t.AllGroups {
718-
// note: requesting all groups is only convenience on api level, this will NOT result in a "total" series as it wouldn't make sense for this endpoint
719-
requestedAllGroups = true
720-
break
721-
}
722-
}
723-
724-
// ------------------------------------------------------------------------------------------------------------------
725-
// Build the query that serves as base for both the main and EL rewards queries
726-
// CL
727-
rewardsDs := goqu.Dialect("postgres").
703+
clDs := goqu.Dialect("postgres").
728704
Select(
729705
goqu.L(`SUM(e.attestations_reward + e.blocks_cl_reward + e.sync_reward) AS cl_rewards`),
730706
timeColumn.As("timestamp"),
707+
goqu.MIN(epochStartCol).As("epoch_start"),
708+
goqu.MAX(epochEndCol).As("epoch_end"),
731709
).
732710
From(dataTable).
733-
With("validators", goqu.Dialect("postgres").
734-
From(goqu.T("users_val_dashboards_validators")).
735-
Select(
736-
goqu.I("validator_index"),
737-
goqu.I("group_id"),
738-
).
739-
Where(
740-
goqu.I("dashboard_id").Eq(dashboardId.Id),
741-
goqu.Or(
742-
goqu.I("group_id").In(groupIds),
743-
goqu.V(requestedAllGroups),
744-
),
745-
),
746-
).
747711
Where(
748712
timeColumn.Between(goqu.Range(
749713
goqu.L("fromUnixTimestamp(?)", afterTs),
750714
goqu.L("fromUnixTimestamp(?)", beforeTs))),
751-
)
715+
).
716+
GroupBy(timeColumn).
717+
Order(timeColumn.Asc())
752718

753-
if aggregation == enums.IntervalEpoch {
754-
rewardsDs = rewardsDs.
755-
SelectAppend(goqu.L("min(e.epoch)").As("epoch_start")).
756-
SelectAppend(goqu.L("max(e.epoch)").As("epoch_end"))
719+
if dashboardId.Validators == nil {
720+
clDs = clDs.
721+
With("validators", goqu.Dialect("postgres").
722+
From(goqu.T("users_val_dashboards_validators")).
723+
Select(
724+
goqu.I("validator_index"),
725+
goqu.I("group_id"),
726+
).
727+
Where(
728+
goqu.I("dashboard_id").Eq(dashboardId.Id),
729+
goqu.Or(
730+
goqu.I("group_id").In(groupIds),
731+
goqu.V(isAllGroupsRequested),
732+
),
733+
),
734+
).
735+
InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("e.validator_index = v.validator_index"))).
736+
Where(goqu.L("e.validator_index IN (SELECT validator_index FROM validators)"))
737+
738+
if dashboardId.AggregateGroups {
739+
clDs = clDs.
740+
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId))
741+
} else {
742+
clDs = clDs.
743+
SelectAppend(goqu.L("v.group_id AS result_group_id")).
744+
GroupByAppend(goqu.L("result_group_id")).
745+
OrderAppend(goqu.L("result_group_id").Asc())
746+
}
757747
} else {
758-
rewardsDs = rewardsDs.
759-
SelectAppend(goqu.L("min(epoch_start)").As("epoch_start")).
760-
SelectAppend(goqu.L("max(epoch_end)").As("epoch_end"))
748+
// In case a list of validators is provided set the group to the default id
749+
clDs = clDs.
750+
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
751+
Where(goqu.L("e.validator_index IN ?", dashboardId.Validators))
761752
}
753+
return clDs
754+
}
762755

763-
// EL
756+
func buildRewardChartElDs(dashboardId t.VDBId, epochStarts, epochEnds []uint64) *goqu.SelectDataset {
764757
elDs := goqu.Dialect("postgres").
765758
Select(
766759
goqu.L("epoch_start"),
767760
// goqu.L("epoch_end"), not needed
768761
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("el_rewards")).
769762
From(goqu.L("users_val_dashboards_validators v")).
770-
LeftJoin(goqu.I("execution_rewards_finalized").As("b"), goqu.On(goqu.L("v.validator_index = b.proposer")))
771-
772-
// grouping, ordering
773-
rewardsDs = rewardsDs.
774-
GroupBy(timeColumn).
775-
Order(timeColumn.Asc())
776-
777-
elDs = elDs.
763+
LeftJoin(goqu.I("execution_rewards_finalized").As("b"), goqu.On(goqu.L("v.validator_index = b.proposer"))).
778764
GroupBy(goqu.L("epoch_start, epoch_end")).
779765
Order(goqu.L("epoch_start").Asc())
780766

781767
if dashboardId.Validators == nil {
782-
rewardsDs = rewardsDs.
783-
InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("e.validator_index = v.validator_index"))).
784-
Where(goqu.L("e.validator_index IN (SELECT validator_index FROM validators)"))
785768
elDs = elDs.
786769
Where(goqu.L("v.dashboard_id = ?", dashboardId.Id))
787770

788771
if dashboardId.AggregateGroups {
789-
rewardsDs = rewardsDs.
790-
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId))
791772
elDs = elDs.
792773
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId))
793774
} else {
794-
rewardsDs = rewardsDs.
795-
SelectAppend(goqu.L("v.group_id AS result_group_id")).
796-
GroupByAppend(goqu.L("result_group_id")).
797-
OrderAppend(goqu.L("result_group_id").Asc())
798775
elDs = elDs.
799776
SelectAppend(goqu.L("v.group_id AS result_group_id")).
800777
GroupByAppend(goqu.L("result_group_id")).
801778
OrderAppend(goqu.L("result_group_id").Asc())
802779
}
803780
} else {
804781
// In case a list of validators is provided set the group to the default id
805-
rewardsDs = rewardsDs.
806-
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
807-
Where(goqu.L("e.validator_index IN ?", dashboardId.Validators))
808782
elDs = elDs.
809783
SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)).
810784
Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators)))
811785
}
786+
elDs = elDs.
787+
With("epoch_ranges(epoch_start, epoch_end)", goqu.L("(SELECT * FROM unnest(?::int[], ?::int[]))", pq.Array(epochStarts), pq.Array(epochEnds))).
788+
InnerJoin(goqu.L("epoch_ranges"), goqu.On(goqu.L("b.epoch BETWEEN epoch_ranges.epoch_start AND epoch_ranges.epoch_end"))).
789+
Where(goqu.L("b.epoch BETWEEN ? AND ?", epochStarts[0], epochEnds[len(epochEnds)-1]))
790+
return elDs
791+
}
792+
793+
func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, groupIds []int64, protocolModes t.VDBProtocolModes, aggregation enums.ChartAggregation, afterTs uint64, beforeTs uint64) (*t.ChartData[int, decimal.Decimal], error) {
794+
// @DATA-ACCESS incorporate protocolModes
795+
// bar chart for the CL and EL rewards for each group for each epoch.
796+
// NO series for all groups combined except if AggregateGroups is true.
797+
// series id is group id, series property is 'cl' or 'el'
798+
ret := &t.ChartData[int, decimal.Decimal]{}
812799

800+
if len(groupIds) == 0 {
801+
return ret, nil
802+
}
803+
804+
requestedAllGroups := dashboardId.AggregateGroups || slices.Contains(groupIds, t.AllGroups)
813805
// ------------------------------------------------------------------------------------------------------------------
814-
// Build the main query and get the data
815-
queryResult := []struct {
806+
// CL
807+
808+
clDs := buildRewardChartClDs(dashboardId, groupIds, afterTs, beforeTs, aggregation, requestedAllGroups)
809+
type clResult struct {
816810
Timestamp time.Time `db:"timestamp"`
817811
EpochStart uint64 `db:"epoch_start"`
818812
EpochEnd uint64 `db:"epoch_end"`
819813
GroupId uint64 `db:"result_group_id"`
820814
ClRewards int64 `db:"cl_rewards"`
821-
}{}
822-
823-
query, args, err := rewardsDs.Prepared(true).ToSQL()
824-
if err != nil {
825-
return nil, fmt.Errorf("error preparing query: %w", err)
826815
}
827816

828-
err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...)
817+
clQueryResult, err := runQueryRows[[]clResult](ctx, d.clickhouseReader, clDs)
829818
if err != nil {
830-
return nil, fmt.Errorf("error retrieving rewards chart data: %w", err)
819+
return nil, fmt.Errorf("error retrieving rewards chart cl data: %w", err)
831820
}
832821

833-
if len(queryResult) == 0 {
822+
if len(clQueryResult) == 0 {
834823
return ret, nil
835824
}
836825

837-
// deduplicate epoch boundaries & make sure they are correct even with newly activated validators
826+
// deduplicate epoch boundaries & make sure they are correct even with newly activated / exited validators
827+
// as these might have different epoch start and end times
838828
type epochBoundaries struct {
839829
Start uint64
840830
End uint64
841831
}
842-
epochBoundariesMap := make(map[time.Time]epochBoundaries)
843-
for _, res := range queryResult {
844-
curBoundary := epochBoundariesMap[res.Timestamp]
845-
if epochBoundariesMap[res.Timestamp].Start == 0 || epochBoundariesMap[res.Timestamp].Start > res.EpochStart {
846-
curBoundary.Start = res.EpochStart
847-
}
848-
if epochBoundariesMap[res.Timestamp].End < res.EpochEnd {
849-
curBoundary.End = res.EpochEnd
832+
epochBoundariesMap := make(map[int64]epochBoundaries)
833+
for _, res := range clQueryResult {
834+
curBoundary, ok := epochBoundariesMap[res.Timestamp.Unix()]
835+
if !ok {
836+
curBoundary = epochBoundaries{
837+
Start: res.EpochStart,
838+
End: res.EpochEnd,
839+
}
850840
}
851-
epochBoundariesMap[res.Timestamp] = curBoundary
841+
curBoundary.Start = min(curBoundary.Start, res.EpochStart)
842+
curBoundary.End = max(curBoundary.End, res.EpochEnd)
843+
epochBoundariesMap[res.Timestamp.Unix()] = curBoundary
852844
}
853-
var epochStarts, epochEnds []uint64
854-
for _, v := range epochBoundariesMap {
855-
epochStarts = append(epochStarts, v.Start)
856-
epochEnds = append(epochEnds, v.End)
845+
epochStarts, epochEnds := make([]uint64, 0, len(epochBoundariesMap)), make([]uint64, 0, len(epochBoundariesMap))
846+
for _, v := range slices.Sorted(maps.Keys(epochBoundariesMap)) {
847+
epochStarts = append(epochStarts, epochBoundariesMap[v].Start)
848+
epochEnds = append(epochEnds, epochBoundariesMap[v].End)
857849
}
858-
slices.Sort(epochStarts)
859-
slices.Sort(epochEnds)
860-
elDs = elDs.
861-
With("epoch_ranges(epoch_start, epoch_end)", goqu.L("(SELECT * FROM unnest(?::int[], ?::int[]))", pq.Array(epochStarts), pq.Array(epochEnds))).
862-
InnerJoin(goqu.L("epoch_ranges"), goqu.On(goqu.L("b.epoch BETWEEN epoch_ranges.epoch_start AND epoch_ranges.epoch_end"))).
863-
Where(goqu.L("b.epoch BETWEEN ? AND ?", epochStarts[0], epochEnds[len(epochEnds)-1]))
864850

865851
// ------------------------------------------------------------------------------------------------------------------
866-
// Get the EL rewards
867-
elRewards := make(map[uint64]map[uint64]decimal.Decimal)
852+
// EL
868853

869-
elQueryResult := []struct {
854+
type elResult struct {
870855
EpochStart uint64 `db:"epoch_start"`
871856
GroupId uint64 `db:"result_group_id"`
872857
ElRewards decimal.Decimal `db:"el_rewards"`
873-
}{}
874-
875-
query, args, err = elDs.Prepared(true).ToSQL()
876-
if err != nil {
877-
return nil, fmt.Errorf("error preparing query: %w", err)
878-
}
879-
880-
err = d.readerDb.SelectContext(ctx, &elQueryResult, query, args...)
881-
if err != nil {
882-
return nil, fmt.Errorf("error retrieving el rewards data for rewards chart: %w", err)
883-
}
884-
885-
for _, entry := range elQueryResult {
886-
if _, ok := elRewards[entry.EpochStart]; !ok {
887-
elRewards[entry.EpochStart] = make(map[uint64]decimal.Decimal)
888-
}
889-
elRewards[entry.EpochStart][entry.GroupId] = entry.ElRewards
890858
}
891-
859+
elQueryResult, err := runQueryRows[[]elResult](ctx, d.readerDb, buildRewardChartElDs(dashboardId, epochStarts, epochEnds))
892860
if err != nil {
893-
return nil, fmt.Errorf("error retrieving validator dashboard rewards chart data: %w", err)
861+
return nil, fmt.Errorf("error retrieving rewards chart el data: %w", err)
894862
}
895863

896864
// ------------------------------------------------------------------------------------------------------------------
897865
// Create a map structure to store the data
898-
epochStartData := make(map[uint64]map[int]t.ClElValue[decimal.Decimal])
899-
epochStartList := make([]uint64, 0)
900-
groupMap := make(map[int]bool)
901866

902-
for _, res := range queryResult {
903-
if _, ok := epochStartData[res.EpochStart]; !ok {
904-
epochStartData[res.EpochStart] = make(map[int]t.ClElValue[decimal.Decimal])
905-
epochStartList = append(epochStartList, res.EpochStart)
906-
}
867+
type rewardsKey struct {
868+
epochStart uint64
869+
groupId uint64
870+
}
871+
rewardsMap := make(map[rewardsKey]t.ClElValue[decimal.Decimal])
872+
groupMap := make(map[uint64]struct{})
907873

908-
epochStartData[res.EpochStart][int(res.GroupId)] = t.ClElValue[decimal.Decimal]{
909-
El: elRewards[res.EpochStart][res.GroupId],
874+
for _, res := range clQueryResult {
875+
groupMap[res.GroupId] = struct{}{}
876+
epochStart := epochBoundariesMap[res.Timestamp.Unix()].Start // use epochStart from the boundaries map to maintain consistency with el data
877+
rewardsMap[rewardsKey{epochStart, res.GroupId}] = t.ClElValue[decimal.Decimal]{
910878
Cl: utils.GWeiToWei(big.NewInt(res.ClRewards)),
911879
}
912-
groupMap[int(res.GroupId)] = true
913880
}
914-
915-
// Get the list of groups
916-
groupList := slices.Collect(maps.Keys(groupMap))
917-
slices.Sort(groupList)
918-
919-
// Create the series structure
920-
propertyNames := []string{"el", "cl"}
921-
for _, groupId := range groupList {
922-
for _, propertyName := range propertyNames {
923-
ret.Series = append(ret.Series, t.ChartSeries[int, decimal.Decimal]{
924-
Id: groupId,
925-
Property: propertyName,
926-
})
927-
}
881+
for _, entry := range elQueryResult {
882+
rewards := rewardsMap[rewardsKey{entry.EpochStart, entry.GroupId}]
883+
rewards.El = entry.ElRewards
884+
rewardsMap[rewardsKey{entry.EpochStart, entry.GroupId}] = rewards
928885
}
929886

930-
// Fill the epoch data
931-
for _, epoch := range epochStartList {
887+
// create the chart data
888+
for _, epoch := range epochStarts {
932889
ret.Categories = append(ret.Categories, uint64(utils.EpochToTime(epoch).Unix()))
933-
for idx, series := range ret.Series {
934-
d := epochStartData[epoch][series.Id]
935-
if series.Property == "el" {
936-
ret.Series[idx].Data = append(ret.Series[idx].Data, &d.El)
937-
} else if series.Property == "cl" {
938-
ret.Series[idx].Data = append(ret.Series[idx].Data, &d.Cl)
939-
} else {
940-
return nil, fmt.Errorf("unknown series property: %s", series.Property)
941-
}
942-
}
890+
}
891+
for _, groupId := range slices.Sorted(maps.Keys(groupMap)) {
892+
clData, elData := make([]*decimal.Decimal, 0, len(epochStarts)), make([]*decimal.Decimal, 0, len(epochStarts))
893+
for _, epoch := range epochStarts {
894+
reward := rewardsMap[rewardsKey{epoch, groupId}]
895+
clData = append(clData, &reward.Cl)
896+
elData = append(elData, &reward.El)
897+
}
898+
ret.Series = append(ret.Series,
899+
t.ChartSeries[int, decimal.Decimal]{
900+
Id: int(groupId),
901+
Property: "cl",
902+
Data: clData,
903+
},
904+
t.ChartSeries[int, decimal.Decimal]{
905+
Id: int(groupId),
906+
Property: "el",
907+
Data: elData,
908+
},
909+
)
943910
}
944911

945912
return ret, nil

0 commit comments

Comments
 (0)