Skip to content

Redefine view #4668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions blockindex/contractstaking/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type (
mutex sync.RWMutex // a RW mutex for the cache to protect concurrent access
config Config
}

wrappedCache struct {
cache contractStakingCache
}
)

var (
Expand Down
4 changes: 0 additions & 4 deletions blockindex/contractstaking/delta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ func (s *contractStakingDelta) AddedBucketTypeCnt() uint64 {
return cnt
}

func (s *contractStakingDelta) isBucketDeleted(id uint64) bool {
return s.bucketInfoDeltaState[id] == deltaStateRemoved
}

func (s *contractStakingDelta) addBucketInfo(id uint64, bi *bucketInfo) error {
var err error
s.bucketInfoDeltaState[id], err = s.bucketInfoDeltaState[id].Transfer(deltaActionAdd)
Expand Down
226 changes: 201 additions & 25 deletions systemcontractindex/stakingindex/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,46 @@ import (
"github.com/iotexproject/iotex-core/v2/pkg/util/byteutil"
)

// cache is the in-memory cache for staking index
// it is not thread-safe and should be protected by the caller
type cache struct {
buckets map[uint64]*Bucket
bucketsByCandidate map[string]map[uint64]struct{}
totalBucketCount uint64
ns, bucketNS string
}
type (
indexerCache interface {
PutBucket(id uint64, bkt *Bucket)
DeleteBucket(id uint64)
BucketIdxs() []uint64
Bucket(id uint64) *Bucket
Buckets(indices []uint64) []*Bucket
BucketIdsByCandidate(candidate address.Address) []uint64
TotalBucketCount() uint64
Base() indexerCache
Commit() error
IsDirty() bool
}
// base is the in-memory base for staking index
// it is not thread-safe and should be protected by the caller
base struct {
buckets map[uint64]*Bucket
bucketsByCandidate map[string]map[uint64]struct{}
totalBucketCount uint64
}

wrappedCache struct {
cache indexerCache
bucketsByCandidate map[string]map[uint64]bool // buckets by candidate in current block
updatedBuckets map[uint64]*Bucket // updated buckets in current block
deletedBucketIds map[uint64]struct{} // deleted buckets in current block
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove field deletedBucketIds, and replacing with updatedBuckets[idx] == nil , may make things simpler

}
)

func newCache(ns, bucketNS string) *cache {
return &cache{
func newCache() *base {
return &base{
buckets: make(map[uint64]*Bucket),
bucketsByCandidate: make(map[string]map[uint64]struct{}),
ns: ns,
bucketNS: bucketNS,
}
}

func (s *cache) Load(kvstore db.KVStore) error {
func (s *base) Load(kvstore db.KVStore, ns, bucketNS string) error {
// load total bucket count
var totalBucketCount uint64
tbc, err := kvstore.Get(s.ns, stakingTotalBucketCountKey)
tbc, err := kvstore.Get(ns, stakingTotalBucketCountKey)
if err != nil {
if !errors.Is(err, db.ErrNotExist) {
return err
Expand All @@ -42,7 +60,7 @@ func (s *cache) Load(kvstore db.KVStore) error {
s.totalBucketCount = totalBucketCount

// load buckets
ks, vs, err := kvstore.Filter(s.bucketNS, func(k, v []byte) bool { return true }, nil, nil)
ks, vs, err := kvstore.Filter(bucketNS, func(k, v []byte) bool { return true }, nil, nil)
if err != nil && !errors.Is(err, db.ErrBucketNotExist) {
return err
}
Expand All @@ -56,8 +74,8 @@ func (s *cache) Load(kvstore db.KVStore) error {
return nil
}

func (s *cache) Copy() *cache {
c := newCache(s.ns, s.bucketNS)
func (s *base) DeepClone() indexerCache {
c := newCache()
for k, v := range s.buckets {
c.buckets[k] = v.Clone()
}
Expand All @@ -71,7 +89,7 @@ func (s *cache) Copy() *cache {
return c
}

func (s *cache) PutBucket(id uint64, bkt *Bucket) {
func (s *base) PutBucket(id uint64, bkt *Bucket) {
cand := bkt.Candidate.String()
if s.buckets[id] != nil {
prevCand := s.buckets[id].Candidate.String()
Expand All @@ -87,10 +105,9 @@ func (s *cache) PutBucket(id uint64, bkt *Bucket) {
s.bucketsByCandidate[cand] = make(map[uint64]struct{})
}
s.bucketsByCandidate[cand][id] = struct{}{}
return
}

func (s *cache) DeleteBucket(id uint64) {
func (s *base) DeleteBucket(id uint64) {
bkt, ok := s.buckets[id]
if !ok {
return
Expand All @@ -103,22 +120,22 @@ func (s *cache) DeleteBucket(id uint64) {
delete(s.buckets, id)
}

func (s *cache) BucketIdxs() []uint64 {
func (s *base) BucketIdxs() []uint64 {
idxs := make([]uint64, 0, len(s.buckets))
for id := range s.buckets {
idxs = append(idxs, id)
}
return idxs
}

func (s *cache) Bucket(id uint64) *Bucket {
func (s *base) Bucket(id uint64) *Bucket {
if bkt, ok := s.buckets[id]; ok {
return bkt.Clone()
}
return nil
}

func (s *cache) Buckets(indices []uint64) []*Bucket {
func (s *base) Buckets(indices []uint64) []*Bucket {
buckets := make([]*Bucket, 0, len(indices))
for _, idx := range indices {
if bkt, ok := s.buckets[idx]; ok {
Expand All @@ -128,7 +145,7 @@ func (s *cache) Buckets(indices []uint64) []*Bucket {
return buckets
}

func (s *cache) BucketIdsByCandidate(candidate address.Address) []uint64 {
func (s *base) BucketIdsByCandidate(candidate address.Address) []uint64 {
cand := candidate.String()
buckets := make([]uint64, 0, len(s.bucketsByCandidate[cand]))
for idx := range s.bucketsByCandidate[cand] {
Expand All @@ -137,6 +154,165 @@ func (s *cache) BucketIdsByCandidate(candidate address.Address) []uint64 {
return buckets
}

func (s *cache) TotalBucketCount() uint64 {
func (s *base) TotalBucketCount() uint64 {
return s.totalBucketCount
}

func (s *base) Base() indexerCache {
return s
}

func (s *base) IsDirty() bool {
return false
}

func (s *base) Commit() error {
return nil
}

func newWrappedCache(cache indexerCache) *wrappedCache {
return &wrappedCache{
cache: cache,
bucketsByCandidate: make(map[string]map[uint64]bool),
updatedBuckets: make(map[uint64]*Bucket),
deletedBucketIds: make(map[uint64]struct{}),
}
}

func (w *wrappedCache) PutBucket(id uint64, bkt *Bucket) {
oldBucket, ok := w.updatedBuckets[id]
if !ok {
oldBucket = w.cache.Bucket(id)
}
if oldBucket != nil {
oldCand := oldBucket.Candidate.String()
if w.bucketsByCandidate[oldCand] == nil {
w.bucketsByCandidate[oldCand] = make(map[uint64]bool)
}
w.bucketsByCandidate[oldCand][id] = false
}
w.updatedBuckets[id] = bkt
delete(w.deletedBucketIds, id)
cand := bkt.Candidate.String()
if w.bucketsByCandidate[cand] == nil {
w.bucketsByCandidate[cand] = make(map[uint64]bool)
}
w.bucketsByCandidate[cand][id] = true
}

func (w *wrappedCache) DeleteBucket(id uint64) {
w.deletedBucketIds[id] = struct{}{}
delete(w.updatedBuckets, id)
for cand := range w.bucketsByCandidate {
delete(w.bucketsByCandidate[cand], id)
if len(w.bucketsByCandidate[cand]) == 0 {
delete(w.bucketsByCandidate, cand)
}
}
}

func (w *wrappedCache) BucketIdxs() []uint64 {
idxMap := make(map[uint64]struct{})
// Load from underlying cache
for _, id := range w.cache.BucketIdxs() {
if _, deleted := w.deletedBucketIds[id]; !deleted {
idxMap[id] = struct{}{}
}
}
// Add updatedBuckets
for id := range w.updatedBuckets {
if _, deleted := w.deletedBucketIds[id]; !deleted {
idxMap[id] = struct{}{}
}
}
idxs := make([]uint64, 0, len(idxMap))
for id := range idxMap {
idxs = append(idxs, id)
}
return idxs
}

func (w *wrappedCache) Bucket(id uint64) *Bucket {
if _, deleted := w.deletedBucketIds[id]; deleted {
return nil
}
if bkt, ok := w.updatedBuckets[id]; ok {
return bkt.Clone()
}
return w.cache.Bucket(id)
}

func (w *wrappedCache) Buckets(indices []uint64) []*Bucket {
buckets := make([]*Bucket, 0, len(indices))
for _, idx := range indices {
if _, deleted := w.deletedBucketIds[idx]; deleted {
continue
}
if bkt, ok := w.updatedBuckets[idx]; ok {
buckets = append(buckets, bkt.Clone())
} else if bkt := w.cache.Bucket(idx); bkt != nil {
buckets = append(buckets, bkt.Clone())
}
}
return buckets
}

func (w *wrappedCache) BucketIdsByCandidate(candidate address.Address) []uint64 {
cand := candidate.String()
ids := make(map[uint64]struct{})
// Read ids from cache first
for _, id := range w.cache.BucketIdsByCandidate(candidate) {
ids[id] = struct{}{}
}
// Update ids according to current block changes
if vals, ok := w.bucketsByCandidate[cand]; ok {
for id, keep := range vals {
if keep {
ids[id] = struct{}{}
} else {
delete(ids, id)
}
}
}
// Remove deleted ids
for id := range w.deletedBucketIds {
delete(ids, id)
}
result := make([]uint64, 0, len(ids))
for id := range ids {
result = append(result, id)
}
return result
}

func (w *wrappedCache) Base() indexerCache {
return w.cache.Base()
}

func (w *wrappedCache) TotalBucketCount() uint64 {
// TODO: update total bucket count based on current block changes
return w.cache.TotalBucketCount()
}

func (w *wrappedCache) Commit() error {
if w.isDirty() {
for id, bkt := range w.updatedBuckets {
w.cache.PutBucket(id, bkt)
}
for id := range w.deletedBucketIds {
w.cache.DeleteBucket(id)
}
w.updatedBuckets = make(map[uint64]*Bucket)
w.deletedBucketIds = make(map[uint64]struct{})
w.bucketsByCandidate = make(map[string]map[uint64]bool)
}
return w.cache.Commit()
}

func (w *wrappedCache) isDirty() bool {
return len(w.updatedBuckets) > 0 || len(w.deletedBucketIds) > 0 || len(w.bucketsByCandidate) > 0
}

func (w *wrappedCache) IsDirty() bool {
return w.cache.IsDirty() || w.isDirty()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about whether to call cache.IsDirty()

}
6 changes: 3 additions & 3 deletions systemcontractindex/stakingindex/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (

type eventHandler struct {
stakingBucketNS string
dirty *cache // dirty cache, a view for current block
dirty indexerCache // dirty cache, a view for current block
delta batch.KVStoreBatch // delta for db to store buckets of current block
tokenOwner map[uint64]address.Address
// context for event handler
Expand All @@ -49,7 +49,7 @@ func init() {
}
}

func newEventHandler(bucketNS string, dirty *cache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler {
func newEventHandler(bucketNS string, dirty indexerCache, blkCtx protocol.BlockCtx, timestamped, muted bool) *eventHandler {
return &eventHandler{
stakingBucketNS: bucketNS,
dirty: dirty,
Expand Down Expand Up @@ -284,7 +284,7 @@ func (eh *eventHandler) HandleDonatedEvent(event *abiutil.EventParam) error {
return nil
}

func (eh *eventHandler) Finalize() (batch.KVStoreBatch, *cache) {
func (eh *eventHandler) Finalize() (batch.KVStoreBatch, indexerCache) {
delta, dirty := eh.delta, eh.dirty
eh.delta, eh.dirty = nil, nil
return delta, dirty
Expand Down
Loading