Skip to content

perf(delegators): significantly improve extraction performance #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 22 additions & 84 deletions pkg/delegators/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@ package delegators

import (
"context"
"errors"
"fmt"
"io"
"strings"

"github.com/axone-protocol/cosmos-extractor/pkg/keeper"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/samber/lo"
"github.com/teambenny/goetl"
"github.com/teambenny/goetl/etldata"

"cosmossdk.io/collections"
"cosmossdk.io/log"
"cosmossdk.io/math"

sdk "github.com/cosmos/cosmos-sdk/types"
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper"
stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
)
Expand Down Expand Up @@ -90,51 +85,27 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat

ctx := sdk.NewContext(keepers.Store, cmtproto.Header{}, false, keepers.Logger)

validators, err := keepers.Staking.GetAllValidators(ctx)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}

prefix, err := guessPrefixFromValoper(validators[0].OperatorAddress)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}

configureSdk(prefix)

err = iterateAllAddresses(ctx, keepers.Bank, func(addr sdk.AccAddress) (stop bool) {
delegations := lo.RejectMap(validators,
extractDelegations(ctx, addr, r.logger, keepers.Staking, killChan))
err = keepers.Account.Accounts.Walk(ctx, nil, func(addr sdk.AccAddress, _ sdk.AccountI) (stop bool, err error) {
delegations := lo.FlatMap([]sdk.AccAddress{addr},
extractDelegations(ctx, r.logger, keepers.Staking, killChan),
)
shares := lo.Reduce(delegations, computeShares(), math.LegacyZeroDec())

if (!r.maxSharesFilter.IsNil() && shares.GT(r.maxSharesFilter)) ||
(!r.minSharesFilter.IsNil() && shares.LT(r.minSharesFilter)) {
return false
return false, nil
}

for _, delegation := range delegations {
payload := Delegation{
ChainName: r.chainName,
DelegatorNativeAddr: delegation.DelegatorAddress,
ValidatorAddr: delegation.ValidatorAddress,
Shares: delegation.Shares.String(),
}

json, err := etldata.NewJSON(payload)
json, err := delegatorToPayload(r, delegation)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return true
return true, err
}

outputChan <- json
}

return false
return false, nil
})
if err != nil {
r.logger.Error(err.Error())
Expand All @@ -156,45 +127,17 @@ func (r *delegatorsReader) String() string {
return "DelegatorsReader"
}

// IterateAllAddresses iterates over all the accounts that are provided to a callback.
// If true is returned from the callback, iteration is halted.
func iterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper, cb func(sdk.AccAddress) bool) error {
lastSeenAddr := ""
err := bankKeeper.Balances.Walk(ctx, nil, func(key collections.Pair[sdk.AccAddress, string], _ math.Int) (stop bool, err error) {
addr := key.K1()
if addr.String() == lastSeenAddr {
return false, nil
}
lastSeenAddr = addr.String()

return cb(addr), nil
})

return err
}

func extractDelegations(
ctx context.Context, address sdk.AccAddress, logger log.Logger, stakingKeeper *stakingkeeper.Keeper, killChan chan error,
) func(item stakingtypes.Validator, index int) (stakingtypes.Delegation, bool) {
return func(item stakingtypes.Validator, _ int) (stakingtypes.Delegation, bool) {
valAddr, err := sdk.ValAddressFromBech32(item.OperatorAddress)
ctx context.Context, logger log.Logger, stakingKeeper *stakingkeeper.Keeper, killChan chan error,
) func(delegator sdk.AccAddress, _ int) []stakingtypes.Delegation {
return func(delegator sdk.AccAddress, _ int) []stakingtypes.Delegation {
delegation, err := stakingKeeper.GetAllDelegatorDelegations(ctx, delegator)
if err != nil {
logger.Error(err.Error())
killChan <- err
return stakingtypes.Delegation{}, true
return nil
}

delegation, err := stakingKeeper.GetDelegation(ctx, address, valAddr)
if err != nil {
if errors.Is(err, stakingtypes.ErrNoDelegation) {
return stakingtypes.Delegation{}, true
}

logger.Error(err.Error())
killChan <- err
return stakingtypes.Delegation{}, true
}
return delegation, false
return delegation
}
}

Expand All @@ -204,19 +147,14 @@ func computeShares() func(acc math.LegacyDec, delegation stakingtypes.Delegation
}
}

func guessPrefixFromValoper(valoper string) (string, error) {
if idx := strings.Index(valoper, "valoper"); idx != -1 {
return valoper[:idx], nil
func delegatorToPayload(r *delegatorsReader, delegation stakingtypes.Delegation) (etldata.JSON, error) {
payload := Delegation{
ChainName: r.chainName,
DelegatorNativeAddr: delegation.DelegatorAddress,
ValidatorAddr: delegation.ValidatorAddress,
Shares: delegation.Shares.String(),
}
return "", fmt.Errorf("valoper not found in operator address: %s", valoper)
}

func configureSdk(prefix string) {
config := sdk.GetConfig()
if config.GetBech32AccountAddrPrefix() != prefix {
config.SetBech32PrefixForValidator(
fmt.Sprintf("%svaloper", prefix),
fmt.Sprintf("%svaloperpub", prefix),
)
}
json, err := etldata.NewJSON(payload)
return json, err
}
4 changes: 3 additions & 1 deletion pkg/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
authcodec "github.com/cosmos/cosmos-sdk/x/auth/codec"
authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
vestingtypes "github.com/cosmos/cosmos-sdk/x/auth/vesting/types"
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
govtypes "github.com/cosmos/cosmos-sdk/x/gov/types"
Expand Down Expand Up @@ -112,7 +113,8 @@ func newCodec() (*codec.ProtoCodec, error) {
return nil, err
}
std.RegisterInterfaces(interfaceRegistry)
interfaceRegistry.RegisterInterface("/cosmos.auth.v1beta1.BaseAccount", (*sdk.AccountI)(nil))
authtypes.RegisterInterfaces(interfaceRegistry)
vestingtypes.RegisterInterfaces(interfaceRegistry)

return codec.NewProtoCodec(interfaceRegistry), nil
}
Expand Down