Skip to content

Commit 2f0df2f

Browse files
authored
Merge pull request #2378 from CortexFoundation/dev
remove filter base row cache, add group read
2 parents 4f6eea4 + 2b52c0f commit 2f0df2f

File tree

3 files changed

+96
-82
lines changed

3 files changed

+96
-82
lines changed

core/filtermaps/filtermaps.go

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ const (
5353
databaseVersion = 2 // reindexed if database version does not match
5454
cachedLastBlocks = 1000 // last block of map pointers
5555
cachedLvPointers = 1000 // first log value pointer of block pointers
56-
cachedBaseRows = 100 // groups of base layer filter row data
5756
cachedFilterMaps = 3 // complete filter maps (cached by map renderer)
5857
cachedRenderSnapshots = 8 // saved map renderer data at block boundaries
5958
)
@@ -101,7 +100,6 @@ type FilterMaps struct {
101100
filterMapCache *lru.Cache[uint32, filterMap]
102101
lastBlockCache *lru.Cache[uint32, lastBlockOfMap]
103102
lvPointerCache *lru.Cache[uint64, uint64]
104-
baseRowsCache *lru.Cache[uint64, [][]uint32]
105103

106104
// the matchers set and the fields of FilterMapsMatcherBackend instances are
107105
// read and written both by exported functions and the indexer.
@@ -264,7 +262,6 @@ func NewFilterMaps(db ctxcdb.KeyValueStore, initView *ChainView, historyCutoff,
264262
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
265263
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
266264
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
267-
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
268265
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
269266
}
270267
f.checkRevertRange() // revert maps that are inconsistent with the current chain view
@@ -348,7 +345,6 @@ func (f *FilterMaps) reset() {
348345
f.renderSnapshots.Purge()
349346
f.lastBlockCache.Purge()
350347
f.lvPointerCache.Purge()
351-
f.baseRowsCache.Purge()
352348
f.indexLock.Unlock()
353349
// deleting the range first ensures that resetDb will be called again at next
354350
// startup and any leftover data will be removed even if it cannot finish now.
@@ -565,47 +561,69 @@ func (f *FilterMaps) getFilterMap(mapIndex uint32) (filterMap, error) {
565561
}
566562
fm := make(filterMap, f.mapHeight)
567563
for rowIndex := range fm {
568-
var err error
569-
fm[rowIndex], err = f.getFilterMapRow(mapIndex, uint32(rowIndex), false)
564+
rows, err := f.getFilterMapRows([]uint32{mapIndex}, uint32(rowIndex), false)
570565
if err != nil {
571566
return nil, fmt.Errorf("failed to load filter map %d from database: %v", mapIndex, err)
572567
}
568+
fm[rowIndex] = rows[0]
573569
}
574570
f.filterMapCache.Add(mapIndex, fm)
575571
return fm, nil
576572
}
577573

578-
// getFilterMapRow fetches the given filter map row. If baseLayerOnly is true
579-
// then only the first baseRowLength entries are returned.
580-
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
581-
baseMapRowIndex := f.mapRowIndex(mapIndex&-f.baseRowGroupLength, rowIndex)
582-
baseRows, ok := f.baseRowsCache.Get(baseMapRowIndex)
583-
if !ok {
584-
var err error
585-
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
586-
if err != nil {
587-
return nil, fmt.Errorf("failed to retrieve filter map %d base rows %d: %v", mapIndex, rowIndex, err)
574+
// getFilterMapRows fetches a set of filter map rows at the corresponding map
575+
// indices and a shared row index. If baseLayerOnly is true then only the first
576+
// baseRowLength entries are returned.
577+
func (f *FilterMaps) getFilterMapRows(mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) {
578+
rows := make([]FilterRow, len(mapIndices))
579+
var ptr int
580+
for len(mapIndices) > ptr {
581+
baseRowGroup := mapIndices[ptr] / f.baseRowGroupLength
582+
groupLength := 1
583+
for ptr+groupLength < len(mapIndices) && mapIndices[ptr+groupLength]/f.baseRowGroupLength == baseRowGroup {
584+
groupLength++
588585
}
589-
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
590-
}
591-
baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)])
592-
if baseLayerOnly {
593-
return baseRow, nil
586+
if err := f.getFilterMapRowsOfGroup(rows[ptr:ptr+groupLength], mapIndices[ptr:ptr+groupLength], rowIndex, baseLayerOnly); err != nil {
587+
return nil, err
588+
}
589+
ptr += groupLength
594590
}
595-
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth)
591+
return rows, nil
592+
}
593+
594+
// getFilterMapRowsOfGroup fetches a set of filter map rows at map indices
595+
// belonging to the same base row group.
596+
func (f *FilterMaps) getFilterMapRowsOfGroup(target []FilterRow, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) error {
597+
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
598+
baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex)
599+
baseRows, err := rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
596600
if err != nil {
597-
return nil, fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err)
601+
return fmt.Errorf("failed to retrieve base row group %d of row %d: %v", baseRowGroup, rowIndex, err)
598602
}
599-
return FilterRow(append(baseRow, extRow...)), nil
603+
for i, mapIndex := range mapIndices {
604+
if mapIndex/f.baseRowGroupLength != baseRowGroup {
605+
panic("mapIndices are not in the same base row group")
606+
}
607+
row := baseRows[mapIndex&(f.baseRowGroupLength-1)]
608+
if !baseLayerOnly {
609+
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth)
610+
if err != nil {
611+
return fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err)
612+
}
613+
row = append(row, extRow...)
614+
}
615+
target[i] = row
616+
}
617+
return nil
600618
}
601619

602620
// storeFilterMapRows stores a set of filter map rows at the corresponding map
603621
// indices and a shared row index.
604622
func (f *FilterMaps) storeFilterMapRows(batch ctxcdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
605623
for len(mapIndices) > 0 {
606-
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
624+
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
607625
groupLength := 1
608-
for groupLength < len(mapIndices) && mapIndices[groupLength]&-f.baseRowGroupLength == baseMapIndex {
626+
for groupLength < len(mapIndices) && mapIndices[groupLength]/f.baseRowGroupLength == baseRowGroup {
609627
groupLength++
610628
}
611629
if err := f.storeFilterMapRowsOfGroup(batch, mapIndices[:groupLength], rowIndex, rows[:groupLength]); err != nil {
@@ -619,26 +637,20 @@ func (f *FilterMaps) storeFilterMapRows(batch ctxcdb.Batch, mapIndices []uint32,
619637
// storeFilterMapRowsOfGroup stores a set of filter map rows at map indices
620638
// belonging to the same base row group.
621639
func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ctxcdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
622-
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
623-
baseMapRowIndex := f.mapRowIndex(baseMapIndex, rowIndex)
640+
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
641+
baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex)
624642
var baseRows [][]uint32
625643
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
626-
var ok bool
627-
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
628-
if ok {
629-
baseRows = slices.Clone(baseRows)
630-
} else {
631-
var err error
632-
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
633-
if err != nil {
634-
return fmt.Errorf("failed to retrieve filter map %d base rows %d for modification: %v", mapIndices[0]&-f.baseRowGroupLength, rowIndex, err)
635-
}
644+
var err error
645+
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
646+
if err != nil {
647+
return fmt.Errorf("failed to retrieve base row group %d of row %d for modification: %v", baseRowGroup, rowIndex, err)
636648
}
637649
} else {
638650
baseRows = make([][]uint32, f.baseRowGroupLength)
639651
}
640652
for i, mapIndex := range mapIndices {
641-
if mapIndex&-f.baseRowGroupLength != baseMapIndex {
653+
if mapIndex/f.baseRowGroupLength != baseRowGroup {
642654
panic("mapIndices are not in the same base row group")
643655
}
644656
baseRow := []uint32(rows[i])
@@ -650,7 +662,6 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ctxcdb.Batch, mapIndices []
650662
baseRows[mapIndex&(f.baseRowGroupLength-1)] = baseRow
651663
rawdb.WriteFilterMapExtRow(batch, f.mapRowIndex(mapIndex, rowIndex), extRow, f.logMapWidth)
652664
}
653-
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
654665
rawdb.WriteFilterMapBaseRows(batch, baseMapRowIndex, baseRows, f.logMapWidth)
655666
return nil
656667
}

core/filtermaps/matcher.go

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"math"
2423
"sync"
2524
"sync/atomic"
2625
"time"
@@ -45,7 +44,7 @@ var ErrMatchAll = errors.New("match all patterns not supported")
4544
type MatcherBackend interface {
4645
GetParams() *Params
4746
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
48-
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error)
47+
GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error)
4948
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
5049
SyncLogIndex(ctx context.Context) (SyncRange, error)
5150
Close()
@@ -365,50 +364,57 @@ func (m *singleMatcherInstance) getMatchesForLayer(ctx context.Context, layerInd
365364
var st int
366365
m.stats.setState(&st, stOther)
367366
params := m.backend.GetParams()
368-
maskedMapIndex, rowIndex := uint32(math.MaxUint32), uint32(0)
369-
for _, mapIndex := range m.mapIndices {
370-
filterRows, ok := m.filterRows[mapIndex]
371-
if !ok {
372-
continue
373-
}
374-
if mm := params.maskedMapIndex(mapIndex, layerIndex); mm != maskedMapIndex {
375-
// only recalculate rowIndex when necessary
376-
maskedMapIndex = mm
377-
rowIndex = params.rowIndex(mapIndex, layerIndex, m.value)
367+
var ptr int
368+
for len(m.mapIndices) > ptr {
369+
// find next group of map indices mapped onto the same row
370+
maskedMapIndex := params.maskedMapIndex(m.mapIndices[ptr], layerIndex)
371+
rowIndex := params.rowIndex(m.mapIndices[ptr], layerIndex, m.value)
372+
groupLength := 1
373+
for ptr+groupLength < len(m.mapIndices) && params.maskedMapIndex(m.mapIndices[ptr+groupLength], layerIndex) == maskedMapIndex {
374+
groupLength++
378375
}
379376
if layerIndex == 0 {
380377
m.stats.setState(&st, stFetchFirst)
381378
} else {
382379
m.stats.setState(&st, stFetchMore)
383380
}
384-
filterRow, err := m.backend.GetFilterMapRow(ctx, mapIndex, rowIndex, layerIndex == 0)
381+
groupRows, err := m.backend.GetFilterMapRows(ctx, m.mapIndices[ptr:ptr+groupLength], rowIndex, layerIndex == 0)
385382
if err != nil {
386383
m.stats.setState(&st, stNone)
387-
return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", mapIndex, rowIndex, err)
388-
}
389-
if layerIndex == 0 {
390-
matchBaseRowAccessMeter.Mark(1)
391-
matchBaseRowSizeMeter.Mark(int64(len(filterRow)))
392-
} else {
393-
matchExtRowAccessMeter.Mark(1)
394-
matchExtRowSizeMeter.Mark(int64(len(filterRow)))
384+
return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", m.mapIndices[ptr], rowIndex, err)
395385
}
396-
m.stats.addAmount(st, int64(len(filterRow)))
397386
m.stats.setState(&st, stOther)
398-
filterRows = append(filterRows, filterRow)
399-
if uint32(len(filterRow)) < params.maxRowLength(layerIndex) {
400-
m.stats.setState(&st, stProcess)
401-
matches := params.potentialMatches(filterRows, mapIndex, m.value)
402-
m.stats.addAmount(st, int64(len(matches)))
403-
results = append(results, matcherResult{
404-
mapIndex: mapIndex,
405-
matches: matches,
406-
})
407-
m.stats.setState(&st, stOther)
408-
delete(m.filterRows, mapIndex)
409-
} else {
410-
m.filterRows[mapIndex] = filterRows
387+
for i := range groupLength {
388+
mapIndex := m.mapIndices[ptr+i]
389+
filterRow := groupRows[i]
390+
filterRows, ok := m.filterRows[mapIndex]
391+
if !ok {
392+
panic("dropped map in mapIndices")
393+
}
394+
if layerIndex == 0 {
395+
matchBaseRowAccessMeter.Mark(1)
396+
matchBaseRowSizeMeter.Mark(int64(len(filterRow)))
397+
} else {
398+
matchExtRowAccessMeter.Mark(1)
399+
matchExtRowSizeMeter.Mark(int64(len(filterRow)))
400+
}
401+
m.stats.addAmount(st, int64(len(filterRow)))
402+
filterRows = append(filterRows, filterRow)
403+
if uint32(len(filterRow)) < params.maxRowLength(layerIndex) {
404+
m.stats.setState(&st, stProcess)
405+
matches := params.potentialMatches(filterRows, mapIndex, m.value)
406+
m.stats.addAmount(st, int64(len(matches)))
407+
results = append(results, matcherResult{
408+
mapIndex: mapIndex,
409+
matches: matches,
410+
})
411+
m.stats.setState(&st, stOther)
412+
delete(m.filterRows, mapIndex)
413+
} else {
414+
m.filterRows[mapIndex] = filterRows
415+
}
411416
}
417+
ptr += groupLength
412418
}
413419
m.cleanMapIndices()
414420
m.stats.setState(&st, stNone)

core/filtermaps/matcher_backend.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,15 @@ func (fm *FilterMapsMatcherBackend) Close() {
6767
delete(fm.f.matchers, fm)
6868
}
6969

70-
// GetFilterMapRow returns the given row of the given map. If the row is empty
70+
// GetFilterMapRows returns the given row of the given map. If the row is empty
7171
// then a non-nil zero length row is returned. If baseLayerOnly is true then
7272
// only the first baseRowLength entries of the row are guaranteed to be
7373
// returned.
7474
// Note that the returned slices should not be modified, they should be copied
7575
// on write.
76-
// GetFilterMapRow implements MatcherBackend.
77-
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
78-
fm.f.indexLock.RLock()
79-
defer fm.f.indexLock.RUnlock()
80-
81-
return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly)
76+
// GetFilterMapRows implements MatcherBackend.
77+
func (fm *FilterMapsMatcherBackend) GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) {
78+
return fm.f.getFilterMapRows(mapIndices, rowIndex, baseLayerOnly)
8279
}
8380

8481
// GetBlockLvPointer returns the starting log value index where the log values

0 commit comments

Comments
 (0)