diff --git a/README.md b/README.md index 49eed2b..e3ceb41 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,8 @@ Initially developed for Axone’s internal needs, it provides a versatile founda ## Features -- Export delegators and their delegations to a CSV file. +- Export chain store information. +- Export delegators and their delegations. ## Usage example diff --git a/cmd/delegators.go b/cmd/delegators.go index 2f03a9a..82ce071 100644 --- a/cmd/delegators.go +++ b/cmd/delegators.go @@ -5,13 +5,9 @@ import ( "github.com/spf13/cobra" ) -const ( - flagChainName = "chain-name" -) - var extractDelegatorsCmd = &cobra.Command{ Use: "delegators [source] [dest]", - Short: "Extract all delegators into CSV files", + Short: "Extract all delegators into a CSV file", Args: cobra.ExactArgs(2), RunE: func(cmd *cobra.Command, args []string) error { chainName, _ := cmd.Flags().GetString(flagChainName) @@ -28,6 +24,4 @@ var extractDelegatorsCmd = &cobra.Command{ func init() { extractCmd.AddCommand(extractDelegatorsCmd) - - extractDelegatorsCmd.Flags().StringP(flagChainName, "n", "cosmos", "Name of the chain") } diff --git a/cmd/extract.go b/cmd/extract.go index e8f1695..e86629c 100644 --- a/cmd/extract.go +++ b/cmd/extract.go @@ -4,6 +4,10 @@ import ( "github.com/spf13/cobra" ) +const ( + flagChainName = "chain-name" +) + var extractCmd = &cobra.Command{ Use: "extract", Short: "Extract data from a chain (snapshot)", @@ -11,4 +15,6 @@ var extractCmd = &cobra.Command{ func init() { rootCmd.AddCommand(extractCmd) + + extractCmd.PersistentFlags().StringP(flagChainName, "n", "cosmos", "Name of the chain") } diff --git a/cmd/infos.go b/cmd/infos.go new file mode 100644 index 0000000..c4ffc1c --- /dev/null +++ b/cmd/infos.go @@ -0,0 +1,27 @@ +package cmd + +import ( + "github.com/axone-protocol/cosmos-extractor/pkg/infos" + "github.com/spf13/cobra" +) + +var extractInfosCmd = &cobra.Command{ + Use: "infos [source] [dest]", + Short: "Extract chain informations into a CSV file", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + chainName, _ := cmd.Flags().GetString(flagChainName) + pipeline, err := infos.Pipeline(chainName, args[0], args[1], logger) + if err != nil { + return err + } + + err = <-pipeline.Run() + + return err + }, +} + +func init() { + extractCmd.AddCommand(extractInfosCmd) +} diff --git a/pkg/delegators/payload.go b/pkg/delegators/payload.go index be54d47..ae9f3ec 100644 --- a/pkg/delegators/payload.go +++ b/pkg/delegators/payload.go @@ -1,11 +1,5 @@ package delegators -type Chain struct { - Name string `json:"name"` - StoreVersion string `json:"store_version"` - StoreHash string `json:"store_hash"` -} - type Delegation struct { ChainName string `json:"chain_name"` DelegatorNativeAddr string `json:"delegator_native_addr"` diff --git a/pkg/delegators/pipeline.go b/pkg/delegators/pipeline.go index 7471cd3..21bbbd5 100644 --- a/pkg/delegators/pipeline.go +++ b/pkg/delegators/pipeline.go @@ -3,6 +3,7 @@ package delegators import ( "path" + "github.com/axone-protocol/cosmos-extractor/pkg/processors" "github.com/teambenny/goetl" "cosmossdk.io/log" @@ -10,62 +11,19 @@ import ( const ( DelegatorsCSVFilename = "delegators.csv" - ChainsCSVFilename = "chains.csv" ) -type pipelines struct { - pipelines []goetl.PipelineIface -} - func Pipeline(chainName, src, dst string, logger log.Logger) (goetl.PipelineIface, error) { - readDelegators, err := NewDelegatorsReader(chainName, src, logger) - if err != nil { - return nil, err - } - writeDelegators, err := NewCSVWriter(path.Join(dst, DelegatorsCSVFilename)) - if err != nil { - return nil, err - } - - readChain, err := NewChainReader(chainName, src, logger) + read, err := NewDelegatorsReader(chainName, src, logger) if err != nil { return nil, err } - writeChain, err := NewCSVWriter(path.Join(dst, ChainsCSVFilename)) + write, err := processors.NewCSVWriter(path.Join(dst, DelegatorsCSVFilename)) if err != nil { return nil, err } - return &pipelines{ - pipelines: []goetl.PipelineIface{ - func() goetl.PipelineIface { - pipeline := goetl.NewPipeline(readChain, writeChain) - pipeline.Name = "Chain" - return pipeline - }(), - func() goetl.PipelineIface { - pipeline := goetl.NewPipeline(readDelegators, writeDelegators) - pipeline.Name = "Delegators" - return pipeline - }(), - }, - }, nil -} - -func (p *pipelines) Run() chan error { - errChan := make(chan error) - - go func() { - defer close(errChan) - - for _, pipeline := range p.pipelines { - c := pipeline.Run() - err := <-c - if err != nil { - errChan <- err - } - } - }() - - return errChan + pipeline := goetl.NewPipeline(read, write) + pipeline.Name = "Delegators" + return pipeline, nil } diff --git a/pkg/delegators/reader.go b/pkg/delegators/reader.go index c9eaa1c..396b832 100644 --- a/pkg/delegators/reader.go +++ b/pkg/delegators/reader.go @@ -120,61 +120,6 @@ func (r *delegatorsReader) String() string { return "DelegatorsReader" } -type chainReader struct { - chainName string - src string - logger log.Logger - closer io.Closer -} - -// NewChainReader returns a new Reader that reads metadata information about a blockchain data store. -func NewChainReader(chainName, src string, logger log.Logger) (goetl.Processor, error) { - return &chainReader{ - chainName: chainName, - src: src, - logger: logger, - }, nil -} - -func (r *chainReader) ProcessData(_ etldata.Payload, outputChan chan etldata.Payload, killChan chan error) { - keepers, err := keeper.OpenStore(r.src, r.logger) - if err != nil { - r.logger.Error(err.Error()) - killChan <- err - return - } - r.closer = keepers - - payload := Chain{ - Name: r.chainName, - StoreVersion: fmt.Sprintf("%d", keepers.Store.LastCommitID().Version), - StoreHash: fmt.Sprintf("%X", keepers.Store.LastCommitID().Hash), - } - - json, err := etldata.NewJSON(payload) - if err != nil { - r.logger.Error(err.Error()) - killChan <- err - return - } - - outputChan <- json -} - -func (r *chainReader) Finish(_ chan etldata.Payload, killChan chan error) { - if r.closer != nil { - err := r.closer.Close() - if err != nil { - r.logger.Error(err.Error()) - killChan <- err - } - } -} - -func (r *chainReader) String() string { - return "ChainReader" -} - func convertAndEncodeMust(hrp string, bech string) string { _, bytes, err := bech32.DecodeAndConvert(bech) if err != nil { diff --git a/pkg/infos/payload.go b/pkg/infos/payload.go new file mode 100644 index 0000000..6f6568e --- /dev/null +++ b/pkg/infos/payload.go @@ -0,0 +1,7 @@ +package infos + +type Info struct { + Name string `json:"name"` + StoreVersion string `json:"store_version"` + StoreHash string `json:"store_hash"` +} diff --git a/pkg/infos/pipeline.go b/pkg/infos/pipeline.go new file mode 100644 index 0000000..82ffeb9 --- /dev/null +++ b/pkg/infos/pipeline.go @@ -0,0 +1,29 @@ +package infos + +import ( + "path" + + "github.com/axone-protocol/cosmos-extractor/pkg/processors" + "github.com/teambenny/goetl" + + "cosmossdk.io/log" +) + +const ( + InfosCSVFilename = "infos.csv" +) + +func Pipeline(chainName, src, dst string, logger log.Logger) (goetl.PipelineIface, error) { + read, err := NewInfoReader(chainName, src, logger) + if err != nil { + return nil, err + } + write, err := processors.NewCSVWriter(path.Join(dst, InfosCSVFilename)) + if err != nil { + return nil, err + } + + pipeline := goetl.NewPipeline(read, write) + pipeline.Name = "Chain" + return pipeline, nil +} diff --git a/pkg/infos/reader.go b/pkg/infos/reader.go new file mode 100644 index 0000000..c313c2c --- /dev/null +++ b/pkg/infos/reader.go @@ -0,0 +1,67 @@ +package infos + +import ( + "fmt" + "io" + + "github.com/axone-protocol/cosmos-extractor/pkg/keeper" + "github.com/teambenny/goetl" + "github.com/teambenny/goetl/etldata" + + "cosmossdk.io/log" +) + +type infoReader struct { + chainName string + src string + logger log.Logger + closer io.Closer +} + +// NewInfoReader returns a new Reader that reads metadata information about a blockchain data store. +func NewInfoReader(chainName, src string, logger log.Logger) (goetl.Processor, error) { + return &infoReader{ + chainName: chainName, + src: src, + logger: logger, + }, nil +} + +func (r *infoReader) ProcessData(_ etldata.Payload, outputChan chan etldata.Payload, killChan chan error) { + keepers, err := keeper.OpenStore(r.src, r.logger) + if err != nil { + r.logger.Error(err.Error()) + killChan <- err + return + } + r.closer = keepers + + payload := Info{ + Name: r.chainName, + StoreVersion: fmt.Sprintf("%d", keepers.Store.LastCommitID().Version), + StoreHash: fmt.Sprintf("%X", keepers.Store.LastCommitID().Hash), + } + + json, err := etldata.NewJSON(payload) + if err != nil { + r.logger.Error(err.Error()) + killChan <- err + return + } + + outputChan <- json +} + +func (r *infoReader) Finish(_ chan etldata.Payload, killChan chan error) { + if r.closer != nil { + err := r.closer.Close() + if err != nil { + r.logger.Error(err.Error()) + killChan <- err + } + } +} + +func (r *infoReader) String() string { + return "InfoReader" +} diff --git a/pkg/delegators/writer.go b/pkg/processors/csv_writer.go similarity index 98% rename from pkg/delegators/writer.go rename to pkg/processors/csv_writer.go index 60bceae..8b033a5 100644 --- a/pkg/delegators/writer.go +++ b/pkg/processors/csv_writer.go @@ -1,4 +1,4 @@ -package delegators +package processors import ( "bufio"