Skip to content

Commit 5851889

Browse files
authored
perf(core): Cache uids array in ristretto (#9430)
1 parent e9a921d commit 5851889

File tree

10 files changed

+132
-43
lines changed

10 files changed

+132
-43
lines changed

dgraph/cmd/debug/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ func printKeys(db *badger.DB) {
603603
}
604604

605605
var sz, deltaCount int64
606-
pl, err := posting.GetNew(key, db, opt.readTs)
606+
pl, err := posting.GetNew(key, db, opt.readTs, false)
607607
if err == nil {
608608
pl.RLock()
609609
c := pl.GetLength(math.MaxUint64)

dgraph/cmd/live/batch.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,11 +450,14 @@ loop:
450450
}
451451

452452
drain()
453-
time.Sleep(100 * time.Millisecond)
454453
}
454+
455+
for len(buffer) != 0 {
456+
drain()
457+
}
458+
455459
fmt.Printf("Looped %d times over buffered requests.\n", loops)
456460

457-
drain()
458461
}
459462

460463
func (l *loader) printCounters() {

posting/index_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestTokensTable(t *testing.T) {
172172

173173
attr := x.AttrInRootNamespace("name")
174174
key := x.DataKey(attr, 1)
175-
l, err := getNew(key, ps, math.MaxUint64)
175+
l, err := getNew(key, ps, math.MaxUint64, false)
176176
require.NoError(t, err)
177177

178178
edge := &pb.DirectedEdge{

posting/list.go

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type MutableLayer struct {
100100

101101
// We also cache some things required for us to update currentEntries faster
102102
currentUids map[uint64]int // Stores the uid to index mapping in the currentEntries posting list
103+
104+
// Cache for calculated UIDS
105+
isUidsCalculated bool
106+
calculatedUids []uint64
103107
}
104108

105109
func newMutableLayer() *MutableLayer {
@@ -110,6 +114,8 @@ func newMutableLayer() *MutableLayer {
110114
length: math.MaxInt,
111115
committedUids: make(map[uint64]*pb.Posting),
112116
committedUidsTime: math.MaxUint64,
117+
isUidsCalculated: false,
118+
calculatedUids: []uint64{},
113119
}
114120
}
115121

@@ -136,6 +142,8 @@ func (mm *MutableLayer) clone() *MutableLayer {
136142
length: mm.length,
137143
lastEntry: mm.lastEntry,
138144
committedUidsTime: mm.committedUidsTime,
145+
isUidsCalculated: mm.isUidsCalculated,
146+
calculatedUids: mm.calculatedUids,
139147
}
140148
}
141149

@@ -153,6 +161,8 @@ func (mm *MutableLayer) setCurrentEntries(ts uint64, pl *pb.PostingList) {
153161
mm.readTs = ts
154162
mm.currentEntries = pl
155163
clear(mm.currentUids)
164+
mm.isUidsCalculated = false
165+
mm.calculatedUids = nil
156166
mm.deleteAllMarker = math.MaxUint64
157167
mm.populateUidMap(pl)
158168
}
@@ -770,6 +780,9 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate, hasCountI
770780
l.mutationMap.currentEntries = &pb.PostingList{}
771781
}
772782

783+
l.mutationMap.isUidsCalculated = false
784+
l.mutationMap.calculatedUids = nil
785+
773786
if singleUidUpdate {
774787
// This handles the special case when adding a value to predicates of type uid.
775788
// The current value should be deleted in favor of this value. This needs to
@@ -1010,6 +1023,8 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
10101023
l.mutationMap.currentEntries = nil
10111024
l.mutationMap.readTs = 0
10121025
l.mutationMap.currentUids = nil
1026+
l.mutationMap.isUidsCalculated = false
1027+
l.mutationMap.calculatedUids = nil
10131028

10141029
if pl.CommitTs != 0 {
10151030
l.maxTs = x.Max(l.maxTs, pl.CommitTs)
@@ -1691,6 +1706,43 @@ func (l *List) ApproxLen() int {
16911706
return l.mutationMap.len() + codec.ApproxLen(l.plist.Pack)
16921707
}
16931708

1709+
func (l *List) calculateUids() error {
1710+
l.RLock()
1711+
if l.mutationMap == nil || l.mutationMap.isUidsCalculated {
1712+
l.RUnlock()
1713+
return nil
1714+
}
1715+
res := make([]uint64, 0, l.ApproxLen())
1716+
1717+
err := l.iterate(l.mutationMap.committedUidsTime, 0, func(p *pb.Posting) error {
1718+
if p.PostingType == pb.Posting_REF {
1719+
res = append(res, p.Uid)
1720+
}
1721+
return nil
1722+
})
1723+
1724+
l.RUnlock()
1725+
1726+
if err != nil {
1727+
return err
1728+
}
1729+
1730+
l.Lock()
1731+
defer l.Unlock()
1732+
1733+
l.mutationMap.calculatedUids = res
1734+
l.mutationMap.isUidsCalculated = true
1735+
1736+
return nil
1737+
}
1738+
1739+
func (l *List) canUseCalculatedUids() bool {
1740+
if l.mutationMap == nil {
1741+
return false
1742+
}
1743+
return l.mutationMap.isUidsCalculated && l.mutationMap.currentEntries == nil
1744+
}
1745+
16941746
// Uids returns the UIDs given some query params.
16951747
// We have to apply the filtering before applying (offset, count).
16961748
// WARNING: Calling this function just to get UIDs is expensive
@@ -1700,6 +1752,30 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
17001752
}
17011753

17021754
getUidList := func() (*pb.List, error, bool) {
1755+
if l.canUseCalculatedUids() {
1756+
l.RLock()
1757+
1758+
afterIdx := 0
1759+
1760+
if opt.AfterUid != 0 {
1761+
after := sort.Search(len(l.mutationMap.calculatedUids), func(i int) bool {
1762+
return l.mutationMap.calculatedUids[i] > opt.AfterUid
1763+
})
1764+
if after >= len(l.mutationMap.calculatedUids) {
1765+
l.RUnlock()
1766+
return &pb.List{}, nil, false
1767+
}
1768+
1769+
afterIdx = after
1770+
}
1771+
1772+
copyArr := make([]uint64, len(l.mutationMap.calculatedUids)-afterIdx)
1773+
copy(copyArr, l.mutationMap.calculatedUids[afterIdx:])
1774+
out := &pb.List{Uids: copyArr}
1775+
l.RUnlock()
1776+
1777+
return out, nil, opt.Intersect != nil
1778+
}
17031779
// Pre-assign length to make it faster.
17041780
l.RLock()
17051781
defer l.RUnlock()
@@ -1752,13 +1828,7 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
17521828
}
17531829
res = append(res, p.Uid)
17541830

1755-
if opt.First < 0 {
1756-
// We need the last N.
1757-
// TODO: This could be optimized by only considering some of the last UidBlocks.
1758-
if len(res) > -opt.First {
1759-
res = res[1:]
1760-
}
1761-
} else if len(res) > opt.First {
1831+
if opt.First != 0 && len(res) > opt.First {
17621832
return ErrStopIteration
17631833
}
17641834
}
@@ -1774,19 +1844,22 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
17741844

17751845
// Do The intersection here as it's optimized.
17761846
out, err, applyIntersectWith := getUidList()
1777-
if err != nil || !applyIntersectWith {
1847+
if err != nil || !applyIntersectWith || opt.First == 0 {
17781848
return out, err
17791849
}
17801850

1781-
lenBefore := len(out.Uids)
1782-
if opt.Intersect != nil {
1851+
if opt.Intersect != nil && applyIntersectWith {
17831852
algo.IntersectWith(out, opt.Intersect, out)
17841853
}
1785-
lenAfter := len(out.Uids)
1786-
if lenBefore-lenAfter > 0 {
1787-
// If we see this log, that means that iterate is going over too many elements that it doesn't need to
1788-
glog.V(3).Infof("Retrieved a list. length before intersection: %d, length after: %d, extra"+
1789-
" elements: %d", lenBefore, lenAfter, lenBefore-lenAfter)
1854+
1855+
if opt.First != 0 {
1856+
if opt.First < 0 {
1857+
if len(out.Uids) > -opt.First {
1858+
out.Uids = out.Uids[(len(out.Uids) + opt.First):]
1859+
}
1860+
} else if len(out.Uids) > opt.First {
1861+
out.Uids = out.Uids[:opt.First]
1862+
}
17901863
}
17911864
return out, nil
17921865
}

posting/list_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ func TestReadSingleValue(t *testing.T) {
500500
// and GetSingeValueForKey works without an issue.
501501

502502
key := x.DataKey(x.AttrInRootNamespace("value"), 1240)
503-
ol, err := getNew(key, ps, math.MaxUint64)
503+
ol, err := getNew(key, ps, math.MaxUint64, false)
504504
require.NoError(t, err)
505505
N := uint64(10000)
506506
for i := uint64(2); i <= N; i += 2 {
@@ -525,7 +525,7 @@ func TestReadSingleValue(t *testing.T) {
525525
require.NoError(t, writePostingListToDisk(kvs))
526526
// Delete item from global cache before reading, as we are not updating the cache in the test
527527
memoryLayer.del(key)
528-
ol, err = getNew(key, ps, math.MaxUint64)
528+
ol, err = getNew(key, ps, math.MaxUint64, false)
529529
require.NoError(t, err)
530530
}
531531

posting/lists.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func Cleanup() {
5757
// GetNoStore returns the list stored in the key or creates a new one if it doesn't exist.
5858
// It does not store the list in any cache.
5959
func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) {
60-
return getNew(key, pstore, readTs)
60+
return getNew(key, pstore, readTs, false)
6161
}
6262

6363
// LocalCache stores a cache of posting lists and deltas.
@@ -258,13 +258,13 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List {
258258
return updated
259259
}
260260

261-
func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) {
261+
func (lc *LocalCache) getInternal(key []byte, readFromDisk, readUids bool) (*List, error) {
262262
skey := string(key)
263263
getNewPlistNil := func() (*List, error) {
264264
lc.RLock()
265265
defer lc.RUnlock()
266266
if lc.plists == nil {
267-
return getNew(key, pstore, lc.startTs)
267+
return getNew(key, pstore, lc.startTs, readUids)
268268
}
269269
if l, ok := lc.plists[skey]; ok {
270270
return l, nil
@@ -279,7 +279,7 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
279279
var pl *List
280280
if readFromDisk {
281281
var err error
282-
pl, err = getNew(key, pstore, lc.startTs)
282+
pl, err = getNew(key, pstore, lc.startTs, readUids)
283283
if err != nil {
284284
return nil, err
285285
}
@@ -390,14 +390,18 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
390390

391391
// Get retrieves the cached version of the list associated with the given key.
392392
func (lc *LocalCache) Get(key []byte) (*List, error) {
393-
return lc.getInternal(key, true)
393+
return lc.getInternal(key, true, false)
394+
}
395+
396+
func (lc *LocalCache) GetUids(key []byte) (*List, error) {
397+
return lc.getInternal(key, true, true)
394398
}
395399

396400
// GetFromDelta gets the cached version of the list without reading from disk
397401
// and only applies the existing deltas. This is used in situations where the
398402
// posting list will only be modified and not read (e.g adding index mutations).
399403
func (lc *LocalCache) GetFromDelta(key []byte) (*List, error) {
400-
return lc.getInternal(key, false)
404+
return lc.getInternal(key, false, false)
401405
}
402406

403407
// UpdateDeltasAndDiscardLists updates the delta cache before removing the stored posting lists.

posting/lmap_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func BenchmarkGet(b *testing.B) {
1818
for pb.Next() {
1919
// i := uint64(rand.Int63())
2020
_ = uint64(rand.Int63())
21-
_, _ = getNew(key, nil, math.MaxUint64)
21+
_, _ = getNew(key, nil, math.MaxUint64, false)
2222
// lmap.Get(i)
2323
}
2424
})
@@ -30,7 +30,7 @@ func BenchmarkGetLinear(b *testing.B) {
3030
for i := 0; i < b.N; i++ {
3131
k := uint64(i)
3232
if _, ok := m[k]; !ok {
33-
l, err := getNew(key, nil, math.MaxUint64)
33+
l, err := getNew(key, nil, math.MaxUint64, false)
3434
if err != nil {
3535
b.Error(err)
3636
}

posting/mvcc.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
636636
}
637637
it.Next()
638638
}
639+
639640
return l, nil
640641
}
641642

@@ -671,7 +672,7 @@ func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List {
671672
return nil
672673
}
673674

674-
func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
675+
func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error) {
675676
txn := pstore.NewTransactionAt(readTs, false)
676677
defer txn.Discard()
677678

@@ -687,6 +688,14 @@ func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64
687688
if err != nil {
688689
return l, err
689690
}
691+
if readUids {
692+
if l.mutationMap == nil {
693+
l.mutationMap = newMutableLayer()
694+
}
695+
if err := l.calculateUids(); err != nil {
696+
return nil, err
697+
}
698+
}
690699
return l, nil
691700
}
692701

@@ -700,7 +709,7 @@ func (ml *MemoryLayer) saveInCache(key []byte, l *List) {
700709
ml.cache.set(key, cacheItem)
701710
}
702711

703-
func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
712+
func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error) {
704713
// We first try to read the data from cache, if it is present. If it's not present, then we would read the
705714
// latest data from the disk. This would get stored in the cache. If this read has a minTs > readTs then
706715
// we would have to read the correct timestamp from the disk.
@@ -709,7 +718,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
709718
l.mutationMap.setTs(readTs)
710719
return l, nil
711720
}
712-
l, err := ml.readFromDisk(key, pstore, math.MaxUint64)
721+
l, err := ml.readFromDisk(key, pstore, math.MaxUint64, readUids)
713722
if err != nil {
714723
return nil, err
715724
}
@@ -719,7 +728,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
719728
return l, nil
720729
}
721730

722-
l, err = ml.readFromDisk(key, pstore, readTs)
731+
l, err = ml.readFromDisk(key, pstore, readTs, readUids)
723732
if err != nil {
724733
return nil, err
725734
}
@@ -728,16 +737,16 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
728737
return l, nil
729738
}
730739

731-
func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
732-
return getNew(key, pstore, readTs)
740+
func GetNew(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error) {
741+
return getNew(key, pstore, readTs, readUids)
733742
}
734743

735-
func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
744+
func getNew(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List, error) {
736745
if pstore.IsClosed() {
737746
return nil, badger.ErrDBClosed
738747
}
739748

740-
l, err := memoryLayer.ReadData(key, pstore, readTs)
749+
l, err := memoryLayer.ReadData(key, pstore, readTs, readUids)
741750
if err != nil {
742751
return l, err
743752
}

0 commit comments

Comments
 (0)