diff --git a/cmd/delegators.go b/cmd/delegators.go index 54c1a5c..6b47f81 100644 --- a/cmd/delegators.go +++ b/cmd/delegators.go @@ -4,10 +4,14 @@ import ( "github.com/axone-protocol/cosmos-extractor/pkg/delegators" "github.com/spf13/cobra" "github.com/teambenny/goetl" + + "cosmossdk.io/math" ) const ( - flagHrp = "hrp" + flagHrp = "hrp" + flagMinShares = "min-shares" + flagMaxShares = "max-shares" ) var extractDelegatorsCmd = &cobra.Command{ @@ -15,15 +19,12 @@ var extractDelegatorsCmd = &cobra.Command{ Short: "Extract all delegators", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - chainName, _ := cmd.Flags().GetString(flagChainName) - src := args[0] - - processors := []goetl.Processor{} - - read, err := delegators.NewDelegatorsReader(chainName, src, logger) + read, err := newDelegatorsReader(cmd, args) if err != nil { return err } + + processors := []goetl.Processor{} processors = append(processors, read) hrps, err := cmd.Flags().GetStringSlice(flagHrp) @@ -52,6 +53,46 @@ var extractDelegatorsCmd = &cobra.Command{ }, } +func newDelegatorsReader(cmd *cobra.Command, args []string) (goetl.Processor, error) { + chainName, _ := cmd.Flags().GetString(flagChainName) + src := args[0] + + delegatorsReaderOpts := []delegators.ReaderOption{ + delegators.WithChainName(chainName), + delegators.WithLogger(logger), + } + + v, err := getShares(cmd, flagMinShares) + if err != nil { + return nil, err + } + if !v.IsNil() { + delegatorsReaderOpts = append(delegatorsReaderOpts, delegators.WithMinSharesFilter(v)) + } + + v, err = getShares(cmd, flagMaxShares) + if err != nil { + return nil, err + } + + if !v.IsNil() { + delegatorsReaderOpts = append(delegatorsReaderOpts, delegators.WithMaxSharesFilter(v)) + } + + return delegators.NewDelegatorsReader(src, delegatorsReaderOpts...) +} + +func getShares(cmd *cobra.Command, flag string) (math.LegacyDec, error) { + shares, err := cmd.Flags().GetString(flag) + if err != nil { + return math.LegacyDec{}, err + } + if shares == "" { + return math.LegacyDec{}, nil + } + return math.LegacyNewDecFromStr(shares) +} + func init() { extractCmd.AddCommand(extractDelegatorsCmd) @@ -59,6 +100,9 @@ func init() { flagHrp, "p", []string{}, - "One or more Human-Readable Parts (HRPs) to append delegator addresses in the given Bech32 formats (e.g., cosmos, osmo). "+ + "one or more Human-Readable Parts (HRPs) to append delegator addresses in the given Bech32 formats (e.g., cosmos, osmo). "+ "Can be used multiple times for different HRPs.") + + extractDelegatorsCmd.Flags().String(flagMinShares, "", "filter delegators with minimum shares") + extractDelegatorsCmd.Flags().String(flagMaxShares, "", "filter delegators with maximum shares") } diff --git a/cmd/extract.go b/cmd/extract.go index bd1e33c..34d248d 100644 --- a/cmd/extract.go +++ b/cmd/extract.go @@ -21,8 +21,8 @@ var extractCmd = &cobra.Command{ func init() { rootCmd.AddCommand(extractCmd) - extractCmd.PersistentFlags().StringP(flagChainName, "n", "cosmos", "Name of the chain") - extractCmd.PersistentFlags().StringP(flagOutput, "o", "", "Output file (defaults to stdout)") + extractCmd.PersistentFlags().StringP(flagChainName, "n", "cosmos", "name of the chain") + extractCmd.PersistentFlags().StringP(flagOutput, "o", "", "output file (defaults to stdout)") } func newCSVWriter(cmd *cobra.Command, _ []string) (goetl.Processor, error) { diff --git a/cmd/root.go b/cmd/root.go index 4f2ecd9..8caadf6 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -42,5 +42,5 @@ func Execute() { } func init() { - rootCmd.PersistentFlags().String(flagLogLevel, "info", "The logging level (trace|debug|info|warn|error|fatal|panic)") + rootCmd.PersistentFlags().String(flagLogLevel, "warn", "logging level (trace|debug|info|warn|error|fatal|panic)") } diff --git a/cmd/version.go b/cmd/version.go index 94d0f4a..1455438 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -52,6 +52,6 @@ var versionCmd = &cobra.Command{ func init() { rootCmd.AddCommand(versionCmd) - versionCmd.Flags().Bool(flagLong, false, "Print long version information") - versionCmd.Flags().StringP(flagFormat, "f", "text", "Output format (text|json)") + versionCmd.Flags().Bool(flagLong, false, "print long version information") + versionCmd.Flags().StringP(flagFormat, "f", "text", "output format (text|json)") } diff --git a/go.mod b/go.mod index 3f028ac..03e7b24 100644 --- a/go.mod +++ b/go.mod @@ -146,6 +146,7 @@ require ( github.com/rs/zerolog v1.33.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/samber/lo v1.47.0 // indirect github.com/sasha-s/go-deadlock v0.3.5 // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/smarty/assertions v1.15.0 // indirect diff --git a/go.sum b/go.sum index 1391731..2841592 100644 --- a/go.sum +++ b/go.sum @@ -798,6 +798,8 @@ github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6ke github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= diff --git a/pkg/delegators/enhancer.go b/pkg/delegators/enhancer.go index 30b84be..f5209d8 100644 --- a/pkg/delegators/enhancer.go +++ b/pkg/delegators/enhancer.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/samber/lo" "github.com/teambenny/goetl" "github.com/teambenny/goetl/etldata" @@ -21,16 +22,13 @@ type addressEnhancer struct { // NewAddressEnhancer returns a new processor that enrich the data with addresses with the given prefixes. func NewAddressEnhancer(prefixes []string, logger log.Logger) (goetl.Processor, error) { - keys := make([]string, len(prefixes)) - for i, prefix := range prefixes { - keys[i] = fmt.Sprintf("delegator-%s-address", prefix) - } - return &addressEnhancer{ prefixes: prefixes, logger: logger, - keys: keys, - name: fmt.Sprintf("AddressEnhancer(%s)", strings.Join(prefixes, ",")), + keys: lo.Map(prefixes, func(prefix string, _ int) string { + return fmt.Sprintf("delegator_%s_address", prefix) + }), + name: fmt.Sprintf("AddressEnhancer(%s)", strings.Join(prefixes, ",")), }, nil } diff --git a/pkg/delegators/reader.go b/pkg/delegators/reader.go index bd6db67..8871942 100644 --- a/pkg/delegators/reader.go +++ b/pkg/delegators/reader.go @@ -9,9 +9,9 @@ import ( "github.com/axone-protocol/cosmos-extractor/pkg/keeper" cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" + "github.com/samber/lo" "github.com/teambenny/goetl" "github.com/teambenny/goetl/etldata" - "github.com/teambenny/goetl/etlutil" "cosmossdk.io/collections" "cosmossdk.io/log" @@ -19,23 +19,64 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper" + stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper" stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" ) +type ReaderOption func(*delegatorsReader) error + +func WithChainName(chainName string) ReaderOption { + return func(r *delegatorsReader) error { + r.chainName = chainName + return nil + } +} + +func WithLogger(logger log.Logger) ReaderOption { + return func(r *delegatorsReader) error { + r.logger = logger + return nil + } +} + +func WithMinSharesFilter(minShares math.LegacyDec) ReaderOption { + return func(r *delegatorsReader) error { + r.minSharesFilter = minShares + return nil + } +} + +func WithMaxSharesFilter(maxShares math.LegacyDec) ReaderOption { + return func(r *delegatorsReader) error { + r.maxSharesFilter = maxShares + return nil + } +} + type delegatorsReader struct { - chainName string - src string - logger log.Logger - closer io.Closer + chainName string + src string + logger log.Logger + closer io.Closer + minSharesFilter math.LegacyDec + maxSharesFilter math.LegacyDec } // NewDelegatorsReader returns a new Reader that reads delegators from a blockchain data stores. -func NewDelegatorsReader(chainName, src string, logger log.Logger) (goetl.Processor, error) { - return &delegatorsReader{ - chainName: chainName, +func NewDelegatorsReader(src string, options ...ReaderOption) (goetl.Processor, error) { + r := &delegatorsReader{ + chainName: "mystery", src: src, - logger: logger, - }, nil + logger: log.NewNopLogger(), + } + + for _, option := range options { + if err := option(r); err != nil { + return nil, err + } + } + + return r, nil } func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldata.Payload, killChan chan error) { @@ -65,22 +106,17 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat configureSdk(prefix) - err = IterateAllAddresses(ctx, keepers.Bank, func(addr sdk.AccAddress) (stop bool) { - for _, val := range validators { - valAddr, err := sdk.ValAddressFromBech32(val.OperatorAddress) - etlutil.KillPipelineIfErr(err, killChan) - - delegation, err := keepers.Staking.GetDelegation(ctx, addr, valAddr) - if err != nil { - if errors.Is(err, stakingtypes.ErrNoDelegation) { - continue - } + err = iterateAllAddresses(ctx, keepers.Bank, func(addr sdk.AccAddress) (stop bool) { + delegations := lo.RejectMap(validators, + extractDelegations(ctx, addr, r.logger, keepers.Staking, killChan)) + shares := lo.Reduce(delegations, computeShares(), math.LegacyZeroDec()) - r.logger.Error(err.Error()) - killChan <- err - return true - } + if (!r.maxSharesFilter.IsNil() && shares.GT(r.maxSharesFilter)) || + (!r.minSharesFilter.IsNil() && shares.LT(r.minSharesFilter)) { + return false + } + for _, delegation := range delegations { payload := Delegation{ ChainName: r.chainName, DelegatorNativeAddr: delegation.DelegatorAddress, @@ -122,7 +158,7 @@ func (r *delegatorsReader) String() string { // IterateAllAddresses iterates over all the accounts that are provided to a callback. // If true is returned from the callback, iteration is halted. -func IterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper, cb func(sdk.AccAddress) bool) error { +func iterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper, cb func(sdk.AccAddress) bool) error { lastSeenAddr := "" err := bankKeeper.Balances.Walk(ctx, nil, func(key collections.Pair[sdk.AccAddress, string], _ math.Int) (stop bool, err error) { addr := key.K1() @@ -137,6 +173,37 @@ func IterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper, return err } +func extractDelegations( + ctx context.Context, address sdk.AccAddress, logger log.Logger, stakingKeeper *stakingkeeper.Keeper, killChan chan error, +) func(item stakingtypes.Validator, index int) (stakingtypes.Delegation, bool) { + return func(item stakingtypes.Validator, _ int) (stakingtypes.Delegation, bool) { + valAddr, err := sdk.ValAddressFromBech32(item.OperatorAddress) + if err != nil { + logger.Error(err.Error()) + killChan <- err + return stakingtypes.Delegation{}, true + } + + delegation, err := stakingKeeper.GetDelegation(ctx, address, valAddr) + if err != nil { + if errors.Is(err, stakingtypes.ErrNoDelegation) { + return stakingtypes.Delegation{}, true + } + + logger.Error(err.Error()) + killChan <- err + return stakingtypes.Delegation{}, true + } + return delegation, false + } +} + +func computeShares() func(acc math.LegacyDec, delegation stakingtypes.Delegation, _ int) math.LegacyDec { + return func(acc math.LegacyDec, delegation stakingtypes.Delegation, _ int) math.LegacyDec { + return acc.Add(delegation.Shares) + } +} + func guessPrefixFromValoper(valoper string) (string, error) { if idx := strings.Index(valoper, "valoper"); idx != -1 { return valoper[:idx], nil