Skip to content

core/filtermaps: clone cached slices, fix tempRange #31680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type FilterMaps struct {

// test hooks
testDisableSnapshots, testSnapshotUsed bool
testProcessEventsHook func()
}

// filterMap is a full or partial in-memory representation of a filter map where
Expand Down Expand Up @@ -573,7 +574,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bo
}
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
}
baseRow := baseRows[mapIndex&(f.baseRowGroupLength-1)]
baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)])
if baseLayerOnly {
return baseRow, nil
}
Expand Down Expand Up @@ -610,7 +611,9 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
var ok bool
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
if !ok {
if ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also clone the slice in getFilterMapRow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do clone baseRow and if required then we append extRow which is always retrieved from the database so it's guaranteed that the returned result does not share an underlying array with anything.

baseRows = slices.Clone(baseRows)
} else {
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
if err != nil {
Expand Down Expand Up @@ -656,7 +659,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
// called from outside the indexerLoop goroutine.
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
if blockNumber >= f.indexedRange.blocks.AfterLast() && f.indexedRange.headIndexed {
return f.indexedRange.headDelimiter, nil
return f.indexedRange.headDelimiter + 1, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we error out if blockNumber > f.indexedRange.blocks.AfterLast()?

Theoretically, the returned log index is for the next block and it must be consecutive with the latest indexed one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the search works async (the range might change at any time during the process), the underlying functions of the matcher backend do not fail when they are queried out of range; they just return results such that the search will not find any additional items (the head part of the results will be invalidated anyway in this case). getLogByLvIndex also silently returns with no result and no errors.
getBlockLvPointer also just returns the beginning of the next future block, even if queried further in the future, so that the search will just search until the end of the indexed range. The point is that if it's a long search then the majority of results will still be valuable and the head part will be searched again anyways so just do something that lets the operation finish safely.
We could have a different mechanism but this seems to work safely for now and we have to do the fork release today so I would not change it fundamentally right now.

}
if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
return lvPointer, nil
Expand Down
3 changes: 3 additions & 0 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func (f *FilterMaps) waitForNewHead() {
// processEvents processes all events, blocking only if a block processing is
// happening and indexing should be suspended.
func (f *FilterMaps) processEvents() {
if f.testProcessEventsHook != nil {
f.testProcessEventsHook()
}
for f.processSingleEvent(f.blockProcessing) {
}
}
Expand Down
52 changes: 52 additions & 0 deletions core/filtermaps/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,58 @@ func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
}
}

func TestLogsByIndex(t *testing.T) {
ts := newTestSetup(t)
defer func() {
ts.fm.testProcessEventsHook = nil
ts.close()
}()

ts.chain.addBlocks(1000, 10, 3, 4, true)
ts.setHistory(0, false)
ts.fm.WaitIdle()
firstLog := make([]uint64, 1001) // first valid log position per block
lastLog := make([]uint64, 1001) // last valid log position per block
for i := uint64(0); i <= ts.fm.indexedRange.headDelimiter; i++ {
log, err := ts.fm.getLogByLvIndex(i)
if err != nil {
t.Fatalf("Error getting log by index %d: %v", i, err)
}
if log != nil {
if firstLog[log.BlockNumber] == 0 {
firstLog[log.BlockNumber] = i
}
lastLog[log.BlockNumber] = i
}
}
var failed bool
ts.fm.testProcessEventsHook = func() {
if ts.fm.indexedRange.blocks.IsEmpty() {
return
}
if lvi := firstLog[ts.fm.indexedRange.blocks.First()]; lvi != 0 {
log, err := ts.fm.getLogByLvIndex(lvi)
if log == nil || err != nil {
t.Errorf("Error getting first log of indexed block range: %v", err)
failed = true
}
}
if lvi := lastLog[ts.fm.indexedRange.blocks.Last()]; lvi != 0 {
log, err := ts.fm.getLogByLvIndex(lvi)
if log == nil || err != nil {
t.Errorf("Error getting last log of indexed block range: %v", err)
failed = true
}
}
}
chain := ts.chain.getCanonicalChain()
for i := 0; i < 1000 && !failed; i++ {
head := rand.Intn(len(chain))
ts.chain.setCanonicalChain(chain[:head+1])
ts.fm.WaitIdle()
}
}

func TestIndexerCompareDb(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
Expand Down
6 changes: 4 additions & 2 deletions core/filtermaps/map_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -107,7 +108,7 @@ func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, erro
filterMap: cp.filterMap.fullCopy(),
mapIndex: cp.mapIndex,
lastBlock: cp.lastBlock,
blockLvPtrs: cp.blockLvPtrs,
blockLvPtrs: slices.Clone(cp.blockLvPtrs),
},
finishedMaps: make(map[uint32]*renderedMap),
finished: common.NewRange(cp.mapIndex, 0),
Expand Down Expand Up @@ -244,7 +245,7 @@ func (f *FilterMaps) loadHeadSnapshot() error {
}
}
f.renderSnapshots.Add(f.indexedRange.blocks.Last(), &renderedMap{
filterMap: fm,
filterMap: fm.fullCopy(),
mapIndex: f.indexedRange.maps.Last(),
lastBlock: f.indexedRange.blocks.Last(),
lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
Expand Down Expand Up @@ -536,6 +537,7 @@ func (r *mapRenderer) getTempRange() (filterMapsRange, error) {
} else {
tempRange.blocks.SetAfterLast(0)
}
tempRange.headIndexed = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it the real root cause?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

tempRange.headDelimiter = 0
}
return tempRange, nil
Expand Down