diff --git a/action/protocol/staking/bucket_pool_test.go b/action/protocol/staking/bucket_pool_test.go index 2e6e330347..744a590052 100644 --- a/action/protocol/staking/bucket_pool_test.go +++ b/action/protocol/staking/bucket_pool_test.go @@ -79,6 +79,7 @@ func TestBucketPool(t *testing.T) { } view, _, err := CreateBaseView(sm, false) + view.contractsStake = &contractStakeView{} r.NoError(err) r.NoError(sm.WriteView(_protocolID, view)) pool = view.bucketPool diff --git a/action/protocol/staking/candidate_statereader.go b/action/protocol/staking/candidate_statereader.go index d41e7d6613..728dee254c 100644 --- a/action/protocol/staking/candidate_statereader.go +++ b/action/protocol/staking/candidate_statereader.go @@ -176,8 +176,9 @@ func CreateBaseView(sr protocol.StateReader, enableSMStorage bool) (*ViewData, u } return &ViewData{ - candCenter: center, - bucketPool: pool, + candCenter: center, + bucketPool: pool, + contractsStake: &contractStakeView{}, }, height, nil } diff --git a/action/protocol/staking/viewdata.go b/action/protocol/staking/viewdata.go index 2f0fc9fba2..e1ddee51b1 100644 --- a/action/protocol/staking/viewdata.go +++ b/action/protocol/staking/viewdata.go @@ -10,9 +10,10 @@ import ( "math/big" "github.com/iotexproject/iotex-address/address" + "github.com/pkg/errors" + "github.com/iotexproject/iotex-core/v2/action" "github.com/iotexproject/iotex-core/v2/action/protocol" - "github.com/pkg/errors" ) type ( @@ -21,6 +22,7 @@ type ( Clone() ContractStakeView CreatePreStates(ctx context.Context) error Handle(ctx context.Context, receipt *action.Receipt) error + Commit() BucketsByCandidate(ownerAddr address.Address) ([]*VoteBucket, error) } // ViewData is the data that need to be stored in protocol's view @@ -79,6 +81,7 @@ func (v *ViewData) Commit(ctx context.Context, sr protocol.StateReader) error { if err := v.bucketPool.Commit(sr); err != nil { return err } + v.contractsStake.Commit() v.snapshots = []Snapshot{} return nil @@ -172,3 +175,15 @@ func (csv *contractStakeView) Handle(ctx context.Context, receipt *action.Receip } return nil } + +func (csv *contractStakeView) Commit() { + if csv.v1 != nil { + csv.v1.Commit() + } + if csv.v2 != nil { + csv.v2.Commit() + } + if csv.v3 != nil { + csv.v3.Commit() + } +} diff --git a/action/protocol/staking/viewdata_test.go b/action/protocol/staking/viewdata_test.go index 17517d319a..716e290888 100644 --- a/action/protocol/staking/viewdata_test.go +++ b/action/protocol/staking/viewdata_test.go @@ -6,9 +6,10 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/iotexproject/iotex-core/v2/test/identityset" "github.com/iotexproject/iotex-core/v2/test/mock/mock_chainmanager" - "github.com/stretchr/testify/require" ) func TestViewData_Clone(t *testing.T) { @@ -61,9 +62,10 @@ func prepareViewData(t *testing.T) (*ViewData, int) { }, } viewData := &ViewData{ - candCenter: candCenter, - bucketPool: bucketPool, - snapshots: []Snapshot{}, + candCenter: candCenter, + bucketPool: bucketPool, + snapshots: []Snapshot{}, + contractsStake: &contractStakeView{}, } return viewData, viewData.Snapshot() } diff --git a/blockindex/contractstaking/indexer.go b/blockindex/contractstaking/indexer.go index 26600dc08f..d012d87602 100644 --- a/blockindex/contractstaking/indexer.go +++ b/blockindex/contractstaking/indexer.go @@ -87,7 +87,7 @@ func (s *Indexer) StartView(ctx context.Context) (staking.ContractStakeView, err } return &stakeView{ helper: s, - cache: s.cache.Clone(), + clean: s.cache.Clone(), height: s.cache.Height(), }, nil } diff --git a/blockindex/contractstaking/stakeview.go b/blockindex/contractstaking/stakeview.go index de3130613b..6ba7e87431 100644 --- a/blockindex/contractstaking/stakeview.go +++ b/blockindex/contractstaking/stakeview.go @@ -2,59 +2,96 @@ package contractstaking import ( "context" + "sync" "github.com/iotexproject/iotex-address/address" + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "github.com/iotexproject/iotex-core/v2/action" "github.com/iotexproject/iotex-core/v2/action/protocol" "github.com/iotexproject/iotex-core/v2/action/protocol/staking" - "github.com/iotexproject/iotex-proto/golang/iotextypes" ) type stakeView struct { helper *Indexer - cache *contractStakingCache + clean *contractStakingCache + dirty *contractStakingCache height uint64 + mu sync.RWMutex } func (s *stakeView) Clone() staking.ContractStakeView { - return &stakeView{ + s.mu.Lock() + defer s.mu.Unlock() + clone := &stakeView{ helper: s.helper, - cache: s.cache.Clone(), + clean: s.clean, + dirty: nil, height: s.height, } + if s.dirty != nil { + clone.clean = s.dirty.Clone() + } + return clone } func (s *stakeView) BucketsByCandidate(candidate address.Address) ([]*Bucket, error) { - return s.cache.bucketsByCandidate(candidate, s.height) + s.mu.RLock() + defer s.mu.RUnlock() + if s.dirty != nil { + return s.dirty.bucketsByCandidate(candidate, s.height) + } + return s.clean.bucketsByCandidate(candidate, s.height) } func (s *stakeView) CreatePreStates(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() blkCtx := protocol.MustGetBlockCtx(ctx) s.height = blkCtx.BlockHeight return nil } func (s *stakeView) Handle(ctx context.Context, receipt *action.Receipt) error { - blkCtx := protocol.MustGetBlockCtx(ctx) - // new event handler for this receipt - handler := newContractStakingEventHandler(s.cache) - - // handle events of receipt if receipt.Status != uint64(iotextypes.ReceiptStatus_Success) { return nil } + var ( + blkCtx = protocol.MustGetBlockCtx(ctx) + handler *contractStakingEventHandler + ) for _, log := range receipt.Logs() { if log.Address != s.helper.config.ContractAddress { continue } + if handler == nil { + s.mu.Lock() + // new event handler for this receipt + if s.dirty == nil { + s.dirty = s.clean.Clone() + } + handler = newContractStakingEventHandler(s.dirty) + s.mu.Unlock() + } if err := handler.HandleEvent(ctx, blkCtx.BlockHeight, log); err != nil { return err } } + if handler == nil { + return nil + } _, delta := handler.Result() // update cache - if err := s.cache.Merge(delta, blkCtx.BlockHeight); err != nil { - return err + s.mu.Lock() + defer s.mu.Unlock() + return s.dirty.Merge(delta, blkCtx.BlockHeight) +} + +func (s *stakeView) Commit() { + s.mu.Lock() + defer s.mu.Unlock() + if s.dirty != nil { + s.clean = s.dirty + s.dirty = nil } - return nil } diff --git a/systemcontractindex/stakingindex/cache.go b/systemcontractindex/stakingindex/cache.go index 8587864e8c..5925806943 100644 --- a/systemcontractindex/stakingindex/cache.go +++ b/systemcontractindex/stakingindex/cache.go @@ -56,7 +56,7 @@ func (s *cache) Load(kvstore db.KVStore) error { return nil } -func (s *cache) Copy() *cache { +func (s *cache) Copy() bucketCache { c := newCache(s.ns, s.bucketNS) for k, v := range s.buckets { c.buckets[k] = v.Clone() diff --git a/systemcontractindex/stakingindex/cowcache.go b/systemcontractindex/stakingindex/cowcache.go new file mode 100644 index 0000000000..6ad1915cad --- /dev/null +++ b/systemcontractindex/stakingindex/cowcache.go @@ -0,0 +1,104 @@ +package stakingindex + +import ( + "errors" + "sync" + + "github.com/iotexproject/iotex-address/address" + + "github.com/iotexproject/iotex-core/v2/db" +) + +type ( + bucketCache interface { + Load(kvstore db.KVStore) error + Copy() bucketCache + PutBucket(id uint64, bkt *Bucket) + DeleteBucket(id uint64) + BucketIdxs() []uint64 + Bucket(id uint64) *Bucket + Buckets(indices []uint64) []*Bucket + BucketIdsByCandidate(candidate address.Address) []uint64 + TotalBucketCount() uint64 + } + + cowCache struct { + cache bucketCache + dirty bool + mu sync.Mutex + } +) + +func newCowCache(cache bucketCache) *cowCache { + return &cowCache{ + cache: cache, + dirty: false, + } +} + +func (cow *cowCache) Copy() bucketCache { + cow.mu.Lock() + defer cow.mu.Unlock() + if cow.dirty { + cow.dirty = false + } + return &cowCache{ + cache: cow.cache, + dirty: false, + } +} + +func (cow *cowCache) Load(kvstore db.KVStore) error { + return errors.New("not supported in cowCache") +} + +func (cow *cowCache) BucketIdsByCandidate(candidate address.Address) []uint64 { + cow.mu.Lock() + defer cow.mu.Unlock() + return cow.cache.BucketIdsByCandidate(candidate) +} + +func (cow *cowCache) PutBucket(id uint64, bkt *Bucket) { + cow.mu.Lock() + defer cow.mu.Unlock() + cow.ensureCopied() + cow.cache.PutBucket(id, bkt) +} + +func (cow *cowCache) DeleteBucket(id uint64) { + cow.mu.Lock() + defer cow.mu.Unlock() + cow.ensureCopied() + cow.cache.DeleteBucket(id) +} + +func (cow *cowCache) BucketIdxs() []uint64 { + cow.mu.Lock() + defer cow.mu.Unlock() + return cow.cache.BucketIdxs() +} + +func (cow *cowCache) Bucket(id uint64) *Bucket { + cow.mu.Lock() + defer cow.mu.Unlock() + return cow.cache.Bucket(id) +} + +func (cow *cowCache) Buckets(indices []uint64) []*Bucket { + cow.mu.Lock() + defer cow.mu.Unlock() + return cow.cache.Buckets(indices) +} + +func (cow *cowCache) TotalBucketCount() uint64 { + cow.mu.Lock() + defer cow.mu.Unlock() + return cow.cache.TotalBucketCount() +} + +func (cow *cowCache) ensureCopied() { + if !cow.dirty { + cow.cache = cow.cache.Copy() + cow.dirty = true + } +} diff --git a/systemcontractindex/stakingindex/cowcache_test.go b/systemcontractindex/stakingindex/cowcache_test.go new file mode 100644 index 0000000000..9a56d03dc9 --- /dev/null +++ b/systemcontractindex/stakingindex/cowcache_test.go @@ -0,0 +1,48 @@ +package stakingindex + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/iotexproject/iotex-core/v2/test/identityset" +) + +func TestCowCache(t *testing.T) { + r := require.New(t) + buckets := []*Bucket{ + {Candidate: identityset.Address(1), Owner: identityset.Address(2), StakedAmount: big.NewInt(1000), Timestamped: true, StakedDuration: 3600, CreatedAt: 1622548800, UnlockedAt: 1622552400, UnstakedAt: 1622556000, Muted: false}, + {Candidate: identityset.Address(3), Owner: identityset.Address(4), StakedAmount: big.NewInt(2000), Timestamped: false, StakedDuration: 7200, CreatedAt: 1622548801, UnlockedAt: 1622552401, UnstakedAt: 1622556001, Muted: true}, + {Candidate: identityset.Address(5), Owner: identityset.Address(6), StakedAmount: big.NewInt(3000), Timestamped: true, StakedDuration: 10800, CreatedAt: 1622548802, UnlockedAt: 1622552402, UnstakedAt: 1622556002, Muted: false}, + {Candidate: identityset.Address(7), Owner: identityset.Address(8), StakedAmount: big.NewInt(4000), Timestamped: true, StakedDuration: 10800, CreatedAt: 1622548802, UnlockedAt: 1622552402, UnstakedAt: 1622556002, Muted: false}, + } + original := newCache("testNS", "testBucketNS") + original.PutBucket(0, buckets[0]) + // case 1: read cowCache without modification + cow := newCowCache(original) + r.Equal(buckets[0], cow.Bucket(0)) + + // case 2: modify cowCache but not affect original cache + cow.PutBucket(1, buckets[1]) + r.Equal(buckets[1], cow.Bucket(1)) + r.Nil(original.Bucket(1)) + cow.DeleteBucket(0) + r.Nil(cow.Bucket(0)) + r.Equal(buckets[0], original.Bucket(0)) + + // case 3: not real copy before modification + copi := cow.Copy() + r.Equal(buckets[1], copi.Bucket(1)) + r.Equal(cow.cache, copi.(*cowCache).cache) + + // case 4: copied not affected by original modification + cow.PutBucket(2, buckets[2]) + r.Equal(buckets[2], cow.Bucket(2)) + r.Nil(copi.Bucket(2)) + + // case 5: original not affected by copied modification + copi.PutBucket(3, buckets[3]) + r.Equal(buckets[3], copi.Bucket(3)) + r.Nil(cow.Bucket(3)) +} diff --git a/systemcontractindex/stakingindex/event_handler.go b/systemcontractindex/stakingindex/event_handler.go index e9dd990b1f..797d07dc19 100644 --- a/systemcontractindex/stakingindex/event_handler.go +++ b/systemcontractindex/stakingindex/event_handler.go @@ -32,7 +32,7 @@ var ( type eventHandler struct { stakingBucketNS string - dirty *cache // dirty cache, a view for current block + dirty bucketCache // dirty cache, a view for current block delta batch.KVStoreBatch // delta for db to store buckets of current block tokenOwner map[uint64]address.Address // context for event handler @@ -49,7 +49,7 @@ func init() { } } -func newEventHandler(bucketNS string, dirty *cache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler { +func newEventHandler(bucketNS string, dirty bucketCache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler { return &eventHandler{ stakingBucketNS: bucketNS, dirty: dirty, @@ -284,7 +284,7 @@ func (eh *eventHandler) HandleDonatedEvent(event *abiutil.EventParam) error { return nil } -func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) { +func (eh *eventHandler) Finalize() (batch.KVStoreBatch, bucketCache) { delta, dirty := eh.delta, eh.dirty eh.delta, eh.dirty = nil, nil return delta, dirty diff --git a/systemcontractindex/stakingindex/index.go b/systemcontractindex/stakingindex/index.go index 30f651c737..4ff289061f 100644 --- a/systemcontractindex/stakingindex/index.go +++ b/systemcontractindex/stakingindex/index.go @@ -59,12 +59,12 @@ type ( HandleMergedEvent(event *abiutil.EventParam) error HandleBucketExpandedEvent(event *abiutil.EventParam) error HandleDonatedEvent(event *abiutil.EventParam) error - Finalize() (batch.KVStoreBatch, *cache) + Finalize() (batch.KVStoreBatch, bucketCache) } // Indexer is the staking indexer Indexer struct { common *systemcontractindex.IndexerCommon - cache *cache // in-memory cache, used to query index data + cache bucketCache // in-memory cache, used to query index data mutex sync.RWMutex blocksToDuration blocksDurationAtFn // function to calculate duration from block range bucketNS string @@ -130,7 +130,7 @@ func (s *Indexer) StartView(ctx context.Context) (staking.ContractStakeView, err } return &stakeView{ helper: s, - cache: s.cache.Copy(), + cache: newCowCache(s.cache.Copy()), height: s.common.Height(), }, nil } diff --git a/systemcontractindex/stakingindex/stakeview.go b/systemcontractindex/stakingindex/stakeview.go index 45f434f3e6..dcdb51eeea 100644 --- a/systemcontractindex/stakingindex/stakeview.go +++ b/systemcontractindex/stakingindex/stakeview.go @@ -2,6 +2,7 @@ package stakingindex import ( "context" + "sync" "github.com/iotexproject/iotex-address/address" @@ -12,11 +13,14 @@ import ( type stakeView struct { helper *Indexer - cache *cache + cache bucketCache height uint64 + mu sync.RWMutex } func (s *stakeView) Clone() staking.ContractStakeView { + s.mu.Lock() + defer s.mu.Unlock() return &stakeView{ helper: s.helper, cache: s.cache.Copy(), @@ -24,6 +28,8 @@ func (s *stakeView) Clone() staking.ContractStakeView { } } func (s *stakeView) BucketsByCandidate(candidate address.Address) ([]*VoteBucket, error) { + s.mu.RLock() + defer s.mu.RUnlock() idxs := s.cache.BucketIdsByCandidate(candidate) bkts := s.cache.Buckets(idxs) // filter out muted buckets @@ -40,14 +46,20 @@ func (s *stakeView) BucketsByCandidate(candidate address.Address) ([]*VoteBucket } func (s *stakeView) CreatePreStates(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() blkCtx := protocol.MustGetBlockCtx(ctx) s.height = blkCtx.BlockHeight return nil } func (s *stakeView) Handle(ctx context.Context, receipt *action.Receipt) error { + s.mu.Lock() + defer s.mu.Unlock() blkCtx := protocol.MustGetBlockCtx(ctx) muted := s.helper.muteHeight > 0 && blkCtx.BlockHeight >= s.helper.muteHeight handler := newEventHandler(s.helper.bucketNS, s.cache, blkCtx, s.helper.timestamped, muted) return s.helper.handleReceipt(ctx, handler, receipt) } + +func (s *stakeView) Commit() {}