Skip to content

Refactor/split info delegators #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ Initially developed for Axone’s internal needs, it provides a versatile founda

## Features

- Export delegators and their delegations to a CSV file.
- Export chain store information.
- Export delegators and their delegations.

## Usage example

Expand Down
8 changes: 1 addition & 7 deletions cmd/delegators.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@ import (
"github.com/spf13/cobra"
)

const (
flagChainName = "chain-name"
)

var extractDelegatorsCmd = &cobra.Command{
Use: "delegators [source] [dest]",
Short: "Extract all delegators into CSV files",
Short: "Extract all delegators into a CSV file",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
chainName, _ := cmd.Flags().GetString(flagChainName)
Expand All @@ -28,6 +24,4 @@ var extractDelegatorsCmd = &cobra.Command{

func init() {
extractCmd.AddCommand(extractDelegatorsCmd)

extractDelegatorsCmd.Flags().StringP(flagChainName, "n", "cosmos", "Name of the chain")
}
6 changes: 6 additions & 0 deletions cmd/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"github.com/spf13/cobra"
)

const (
flagChainName = "chain-name"
)

var extractCmd = &cobra.Command{
Use: "extract",
Short: "Extract data from a chain (snapshot)",
}

func init() {
rootCmd.AddCommand(extractCmd)

extractCmd.PersistentFlags().StringP(flagChainName, "n", "cosmos", "Name of the chain")
}
27 changes: 27 additions & 0 deletions cmd/infos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cmd

import (
"github.com/axone-protocol/cosmos-extractor/pkg/infos"
"github.com/spf13/cobra"
)

var extractInfosCmd = &cobra.Command{
Use: "infos [source] [dest]",
Short: "Extract chain informations into a CSV file",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
chainName, _ := cmd.Flags().GetString(flagChainName)
pipeline, err := infos.Pipeline(chainName, args[0], args[1], logger)
if err != nil {
return err
}

err = <-pipeline.Run()

return err
},
}

func init() {
extractCmd.AddCommand(extractInfosCmd)
}
6 changes: 0 additions & 6 deletions pkg/delegators/payload.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package delegators

type Chain struct {
Name string `json:"name"`
StoreVersion string `json:"store_version"`
StoreHash string `json:"store_hash"`
}

type Delegation struct {
ChainName string `json:"chain_name"`
DelegatorNativeAddr string `json:"delegator_native_addr"`
Expand Down
54 changes: 6 additions & 48 deletions pkg/delegators/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,27 @@ package delegators
import (
"path"

"github.com/axone-protocol/cosmos-extractor/pkg/processors"
"github.com/teambenny/goetl"

"cosmossdk.io/log"
)

const (
DelegatorsCSVFilename = "delegators.csv"
ChainsCSVFilename = "chains.csv"
)

type pipelines struct {
pipelines []goetl.PipelineIface
}

func Pipeline(chainName, src, dst string, logger log.Logger) (goetl.PipelineIface, error) {
readDelegators, err := NewDelegatorsReader(chainName, src, logger)
if err != nil {
return nil, err
}
writeDelegators, err := NewCSVWriter(path.Join(dst, DelegatorsCSVFilename))
if err != nil {
return nil, err
}

readChain, err := NewChainReader(chainName, src, logger)
read, err := NewDelegatorsReader(chainName, src, logger)
if err != nil {
return nil, err
}
writeChain, err := NewCSVWriter(path.Join(dst, ChainsCSVFilename))
write, err := processors.NewCSVWriter(path.Join(dst, DelegatorsCSVFilename))
if err != nil {
return nil, err
}

return &pipelines{
pipelines: []goetl.PipelineIface{
func() goetl.PipelineIface {
pipeline := goetl.NewPipeline(readChain, writeChain)
pipeline.Name = "Chain"
return pipeline
}(),
func() goetl.PipelineIface {
pipeline := goetl.NewPipeline(readDelegators, writeDelegators)
pipeline.Name = "Delegators"
return pipeline
}(),
},
}, nil
}

func (p *pipelines) Run() chan error {
errChan := make(chan error)

go func() {
defer close(errChan)

for _, pipeline := range p.pipelines {
c := pipeline.Run()
err := <-c
if err != nil {
errChan <- err
}
}
}()

return errChan
pipeline := goetl.NewPipeline(read, write)
pipeline.Name = "Delegators"
return pipeline, nil
}
55 changes: 0 additions & 55 deletions pkg/delegators/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,61 +120,6 @@ func (r *delegatorsReader) String() string {
return "DelegatorsReader"
}

type chainReader struct {
chainName string
src string
logger log.Logger
closer io.Closer
}

// NewChainReader returns a new Reader that reads metadata information about a blockchain data store.
func NewChainReader(chainName, src string, logger log.Logger) (goetl.Processor, error) {
return &chainReader{
chainName: chainName,
src: src,
logger: logger,
}, nil
}

func (r *chainReader) ProcessData(_ etldata.Payload, outputChan chan etldata.Payload, killChan chan error) {
keepers, err := keeper.OpenStore(r.src, r.logger)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}
r.closer = keepers

payload := Chain{
Name: r.chainName,
StoreVersion: fmt.Sprintf("%d", keepers.Store.LastCommitID().Version),
StoreHash: fmt.Sprintf("%X", keepers.Store.LastCommitID().Hash),
}

json, err := etldata.NewJSON(payload)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}

outputChan <- json
}

func (r *chainReader) Finish(_ chan etldata.Payload, killChan chan error) {
if r.closer != nil {
err := r.closer.Close()
if err != nil {
r.logger.Error(err.Error())
killChan <- err
}
}
}

func (r *chainReader) String() string {
return "ChainReader"
}

func convertAndEncodeMust(hrp string, bech string) string {
_, bytes, err := bech32.DecodeAndConvert(bech)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/infos/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package infos

type Info struct {
Name string `json:"name"`
StoreVersion string `json:"store_version"`
StoreHash string `json:"store_hash"`
}
29 changes: 29 additions & 0 deletions pkg/infos/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package infos

import (
"path"

"github.com/axone-protocol/cosmos-extractor/pkg/processors"
"github.com/teambenny/goetl"

"cosmossdk.io/log"
)

const (
InfosCSVFilename = "infos.csv"
)

func Pipeline(chainName, src, dst string, logger log.Logger) (goetl.PipelineIface, error) {
read, err := NewInfoReader(chainName, src, logger)
if err != nil {
return nil, err
}
write, err := processors.NewCSVWriter(path.Join(dst, InfosCSVFilename))
if err != nil {
return nil, err
}

pipeline := goetl.NewPipeline(read, write)
pipeline.Name = "Chain"
return pipeline, nil
}
67 changes: 67 additions & 0 deletions pkg/infos/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package infos

import (
"fmt"
"io"

"github.com/axone-protocol/cosmos-extractor/pkg/keeper"
"github.com/teambenny/goetl"
"github.com/teambenny/goetl/etldata"

"cosmossdk.io/log"
)

type infoReader struct {
chainName string
src string
logger log.Logger
closer io.Closer
}

// NewInfoReader returns a new Reader that reads metadata information about a blockchain data store.
func NewInfoReader(chainName, src string, logger log.Logger) (goetl.Processor, error) {
return &infoReader{
chainName: chainName,
src: src,
logger: logger,
}, nil
}

func (r *infoReader) ProcessData(_ etldata.Payload, outputChan chan etldata.Payload, killChan chan error) {
keepers, err := keeper.OpenStore(r.src, r.logger)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}
r.closer = keepers

payload := Info{
Name: r.chainName,
StoreVersion: fmt.Sprintf("%d", keepers.Store.LastCommitID().Version),
StoreHash: fmt.Sprintf("%X", keepers.Store.LastCommitID().Hash),
}

json, err := etldata.NewJSON(payload)
if err != nil {
r.logger.Error(err.Error())
killChan <- err
return
}

outputChan <- json
}

func (r *infoReader) Finish(_ chan etldata.Payload, killChan chan error) {
if r.closer != nil {
err := r.closer.Close()
if err != nil {
r.logger.Error(err.Error())
killChan <- err
}
}
}

func (r *infoReader) String() string {
return "InfoReader"
}
2 changes: 1 addition & 1 deletion pkg/delegators/writer.go → pkg/processors/csv_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package delegators
package processors

import (
"bufio"
Expand Down