Skip to content

[staking] build staking view from blockdao if indexer height behind #4658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions action/protocol/staking/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,25 @@ type (
ReceiptStatus() uint64
}

ContractStakeViewBuilder interface {
Build(ctx context.Context, target uint64) (ContractStakeView, error)
}

// Protocol defines the protocol of handling staking
Protocol struct {
addr address.Address
config Configuration
candBucketsIndexer *CandidatesBucketsIndexer
contractStakingIndexer ContractStakingIndexerWithBucketType
contractStakingIndexerV2 ContractStakingIndexer
contractStakingIndexerV3 ContractStakingIndexer
voteReviser *VoteReviser
patch *PatchStore
helperCtx HelperCtx
addr address.Address
config Configuration
candBucketsIndexer *CandidatesBucketsIndexer
contractStakingIndexer ContractStakingIndexerWithBucketType
contractStakingIndexerV2 ContractStakingIndexer
contractStakingIndexerV3 ContractStakingIndexer
voteReviser *VoteReviser
patch *PatchStore
helperCtx HelperCtx
contractStakingViewBuilder ContractStakeViewBuilder
contractStakingViewV2Builder ContractStakeViewBuilder
contractStakingViewV3Builder ContractStakeViewBuilder
blockStore BlockStore
}

// Configuration is the staking protocol configuration.
Expand Down Expand Up @@ -118,6 +126,12 @@ func WithContractStakingIndexerV3(indexer ContractStakingIndexer) Option {
}
}

func WithBlockStore(bs BlockStore) Option {
return func(p *Protocol) {
p.blockStore = bs
}
}

// FindProtocol return a registered protocol from registry
func FindProtocol(registry *protocol.Registry) *Protocol {
if registry == nil {
Expand Down Expand Up @@ -196,6 +210,15 @@ func NewProtocol(
for _, opt := range opts {
opt(p)
}
if p.contractStakingIndexer != nil {
p.contractStakingViewBuilder = NewContractStakeViewBuilder(p.contractStakingIndexer, p.blockStore)
}
if p.contractStakingIndexerV2 != nil {
p.contractStakingViewV2Builder = NewContractStakeViewBuilder(p.contractStakingIndexerV2, p.blockStore)
}
if p.contractStakingIndexerV3 != nil {
p.contractStakingViewV3Builder = NewContractStakeViewBuilder(p.contractStakingIndexerV3, p.blockStore)
}
return p, nil
}

Expand Down Expand Up @@ -232,22 +255,22 @@ func (p *Protocol) Start(ctx context.Context, sr protocol.StateReader) (protocol
}

c.contractsStake = &contractStakeView{}
if p.contractStakingIndexer != nil {
view, err := p.contractStakingIndexer.StartView(ctx)
if p.contractStakingViewBuilder != nil {
view, err := p.contractStakingViewBuilder.Build(ctx, height)
if err != nil {
return nil, errors.Wrap(err, "failed to start contract staking indexer")
}
c.contractsStake.v1 = view
}
if p.contractStakingIndexerV2 != nil {
view, err := p.contractStakingIndexerV2.StartView(ctx)
if p.contractStakingViewV2Builder != nil {
view, err := p.contractStakingViewV2Builder.Build(ctx, height)
if err != nil {
return nil, errors.Wrap(err, "failed to start contract staking indexer v2")
}
c.contractsStake.v2 = view
}
if p.contractStakingIndexerV3 != nil {
view, err := p.contractStakingIndexerV3.StartView(ctx)
if p.contractStakingViewV3Builder != nil {
view, err := p.contractStakingViewV3Builder.Build(ctx, height)
if err != nil {
return nil, errors.Wrap(err, "failed to start contract staking indexer v3")
}
Expand Down
1 change: 1 addition & 0 deletions action/protocol/staking/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func TestProtocol_ActiveCandidates(t *testing.T) {
return blkHeight, nil
}).AnyTimes()
csIndexer.EXPECT().StartView(gomock.Any()).Return(nil, nil)
csIndexer.EXPECT().Height().Return(uint64(blkHeight), nil).AnyTimes()

v, err := p.Start(ctx, sm)
require.NoError(err)
Expand Down
68 changes: 68 additions & 0 deletions action/protocol/staking/stakeview_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package staking

import (
"context"

"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
)

type (
BlockStore interface {
GetBlockByHeight(uint64) (*block.Block, error)
GetReceipts(uint64) ([]*action.Receipt, error)
}

contractStakeViewBuilder struct {
indexer ContractStakingIndexer
blockdao BlockStore
}
)

func NewContractStakeViewBuilder(
indexer ContractStakingIndexer,
blockdao BlockStore,
) *contractStakeViewBuilder {
return &contractStakeViewBuilder{
indexer: indexer,
blockdao: blockdao,
}
}

func (b *contractStakeViewBuilder) Build(ctx context.Context, height uint64) (ContractStakeView, error) {
view, err := b.indexer.StartView(ctx)
if err != nil {
return nil, err
}
indexerHeight, err := b.indexer.Height()
if err != nil {
return nil, err
}
if indexerHeight == height {
return view, nil
} else if indexerHeight > height {
return nil, errors.Errorf("indexer height %d is greater than requested height %d", indexerHeight, height)
}
if b.blockdao == nil {
return nil, errors.Errorf("blockdao is nil, cannot build view for height %d", height)
}
for h := indexerHeight + 1; h <= height; h++ {
blk, err := b.blockdao.GetBlockByHeight(h)
if err != nil {
return nil, errors.Wrapf(err, "failed to get block at height %d", h)
}
if blk.Receipts == nil {
receipts, err := b.blockdao.GetReceipts(h)
if err != nil {
return nil, errors.Wrapf(err, "failed to get receipts at height %d", h)
}
blk.Receipts = receipts
}
if err = view.BuildWithBlock(ctx, blk); err != nil {
return nil, errors.Wrapf(err, "failed to build view with block at height %d", h)
}
}
return view, nil
}
2 changes: 2 additions & 0 deletions action/protocol/staking/viewdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
)

type (
Expand All @@ -24,6 +25,7 @@ type (
Handle(ctx context.Context, receipt *action.Receipt) error
Commit()
BucketsByCandidate(ownerAddr address.Address) ([]*VoteBucket, error)
BuildWithBlock(ctx context.Context, blk *block.Block) error
}
// ViewData is the data that need to be stored in protocol's view
ViewData struct {
Expand Down
2 changes: 1 addition & 1 deletion blockchain/blockdao/blockdao.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewBlockDAOWithIndexersAndCache(blkStore BlockStore, indexers []BlockIndexe

// Start starts block DAO and initiates the top height if it doesn't exist
func (dao *blockDAO) Start(ctx context.Context) error {
err := dao.lifecycle.OnStart(ctx)
err := dao.lifecycle.OnStartSequentially(ctx)
if err != nil {
return errors.Wrap(err, "failed to start child services")
}
Expand Down
6 changes: 3 additions & 3 deletions blockchain/blockdao/blockdao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Test_blockDAO_Start(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStart", errors.New(t.Name()))
p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStartSequentially", errors.New(t.Name()))

err := blockdao.Start(context.Background())
r.ErrorContains(err, t.Name())
Expand All @@ -93,7 +93,7 @@ func Test_blockDAO_Start(t *testing.T) {
p := gomonkey.NewPatches()
defer p.Reset()

p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStart", nil)
p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStartSequentially", nil)
mockblockdao.EXPECT().Height().Return(uint64(0), errors.New(t.Name())).Times(1)

err := blockdao.Start(context.Background())
Expand All @@ -106,7 +106,7 @@ func Test_blockDAO_Start(t *testing.T) {

expectedHeight := uint64(1)

p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStart", nil)
p.ApplyMethodReturn(&lifecycle.Lifecycle{}, "OnStartSequentially", nil)
mockblockdao.EXPECT().Height().Return(expectedHeight, nil).Times(1)
p.ApplyPrivateMethod(&blockDAO{}, "checkIndexers", func(*blockDAO, context.Context) error { return nil })

Expand Down
18 changes: 12 additions & 6 deletions blockindex/contractstaking/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,26 +204,32 @@ func (s *Indexer) PutBlock(ctx context.Context, blk *block.Block) error {
if blk.Height() > expectHeight {
return errors.Errorf("invalid block height %d, expect %d", blk.Height(), expectHeight)
}
handler, err := handleBlock(ctx, blk, &s.config, s.cache)
if err != nil {
return errors.Wrapf(err, "failed to put block %d", blk.Height())
}
return s.commit(handler, blk.Height())
}

func handleBlock(ctx context.Context, blk *block.Block, cfg *Config, cache *contractStakingCache) (*contractStakingEventHandler, error) {
// new event handler for this block
handler := newContractStakingEventHandler(s.cache)
handler := newContractStakingEventHandler(cache)

// handle events of block
for _, receipt := range blk.Receipts {
if receipt.Status != uint64(iotextypes.ReceiptStatus_Success) {
continue
}
for _, log := range receipt.Logs() {
if log.Address != s.config.ContractAddress {
if log.Address != cfg.ContractAddress {
continue
}
if err := handler.HandleEvent(ctx, blk.Height(), log); err != nil {
return err
return handler, err
}
}
}

// commit the result
return s.commit(handler, blk.Height())
return handler, nil
}

func (s *Indexer) commit(handler *contractStakingEventHandler, height uint64) error {
Expand Down
24 changes: 24 additions & 0 deletions blockindex/contractstaking/stakeview.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (

"github.com/iotexproject/iotex-address/address"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
"github.com/pkg/errors"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/action/protocol"
"github.com/iotexproject/iotex-core/v2/action/protocol/staking"
"github.com/iotexproject/iotex-core/v2/blockchain/block"
)

type stakeView struct {
Expand Down Expand Up @@ -95,3 +97,25 @@ func (s *stakeView) Commit() {
s.dirty = nil
}
}

func (s *stakeView) BuildWithBlock(ctx context.Context, blk *block.Block) error {
s.mu.Lock()
defer s.mu.Unlock()
expectHeight := s.clean.Height() + 1
if expectHeight < s.helper.config.ContractDeployHeight {
expectHeight = s.helper.config.ContractDeployHeight
}
if blk.Height() < expectHeight {
return nil
}
if blk.Height() > expectHeight {
return errors.Errorf("invalid block height %d, expect %d", blk.Height(), expectHeight)
}

handler, err := handleBlock(ctx, blk, &s.helper.config, s.clean)
if err != nil {
return err
}
_, delta := handler.Result()
return s.clean.Merge(delta, blk.Height())
}
1 change: 1 addition & 0 deletions chainservice/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ func (builder *Builder) registerStakingProtocol() error {
if builder.cs.contractStakingIndexerV3 != nil {
opts = append(opts, staking.WithContractStakingIndexerV3(builder.cs.contractStakingIndexerV3))
}
opts = append(opts, staking.WithBlockStore(builder.cs.blockdao))
stakingProtocol, err := staking.NewProtocol(
staking.HelperCtx{
DepositGas: rewarding.DepositGas,
Expand Down
Loading