
core/filtermaps: fix map renderer reorg issue #31642


Merged: 7 commits, Apr 16, 2025
Changes from 5 commits
28 changes: 23 additions & 5 deletions core/filtermaps/filtermaps.go
@@ -50,6 +50,7 @@ var (
)

const (
databaseVersion = 1 // reindexed if database version does not match
cachedLastBlocks = 1000 // last block of map pointers
cachedLvPointers = 1000 // first log value pointer of block pointers
cachedBaseRows = 100 // groups of base layer filter row data
@@ -138,16 +139,31 @@ type FilterMaps struct {
// as transparent (uncached/unchanged).
type filterMap []FilterRow

// copy returns a copy of the given filter map. Note that the row slices are
// copied but their contents are not. This permits extending the rows further
// fastCopy returns a copy of the given filter map. Note that the row slices are
// copied but their contents are not. This permits appending to the rows further
// (which happens during map rendering) without affecting the validity of
// copies made for snapshots during rendering.
func (fm filterMap) copy() filterMap {
// Appending to the rows of both the original map and the fast copy, or two fast
// copies of the same map, would result in data corruption, therefore a fast copy
// should always be used in a read-only way.
func (fm filterMap) fastCopy() filterMap {
c := make(filterMap, len(fm))
copy(c, fm)
return c
}

Member:
Modification to the fast copy will affect the original map; appending to the fast copy won't affect the original map, will it?

Contributor (author):
Technically true, but the point is that you should not append to both, as they share the underlying array and would corrupt each other. In my use case the original keeps being appended to, as it is the map currently being rendered, and snapshots are made during the process. So I defined the semantics of fastCopy so that the original can be appended to and the copy is used as read-only. This rule guarantees safety.

// fullCopy returns a copy of the given filter map, also making a copy of each
// individual filter row, ensuring that a modification to either one will never
// affect the other.
func (fm filterMap) fullCopy() filterMap {
c := make(filterMap, len(fm))
for i, row := range fm {
c[i] = make(FilterRow, len(row))
copy(c[i], row)
}
return c
}
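
To make the aliasing rule concrete, here is a minimal, self-contained sketch of the hazard described in the discussion above, with plain uint32 slices standing in for FilterRow (illustrative code, not part of the PR): appending through both a map and its shallow (fast) copy clobbers the shared backing storage, while a deep (full) copy is unaffected.

package main

import "fmt"

func main() {
	// One row with spare capacity, as during map rendering.
	orig := [][]uint32{append(make([]uint32, 0, 4), 1)}

	// Shallow copy, like fastCopy: row headers are copied, backing
	// arrays are shared.
	shallow := make([][]uint32, len(orig))
	copy(shallow, orig)

	// Deep copy, like fullCopy: every row gets its own backing array.
	deep := make([][]uint32, len(orig))
	for i, row := range orig {
		deep[i] = make([]uint32, len(row))
		copy(deep[i], row)
	}

	orig[0] = append(orig[0], 2)       // writes slot 1 of the shared array
	shallow[0] = append(shallow[0], 3) // also writes slot 1, clobbering the 2

	fmt.Println(orig[0])    // [1 3] -- corrupted: the appended 2 is gone
	fmt.Println(shallow[0]) // [1 3]
	fmt.Println(deep[0])    // [1]   -- unaffected
}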

// FilterRow encodes a single row of a filter map as a list of column indices.
// Note that the values are always stored in the same order as they were added
// and if the same column index is added twice, it is also stored twice.
@@ -207,8 +223,9 @@ type Config struct {
// NewFilterMaps creates a new FilterMaps and starts the indexer.
func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, finalBlock uint64, params Params, config Config) *FilterMaps {
rs, initialized, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
if err != nil || rs.Version != databaseVersion {
rs, initialized = rawdb.FilterMapsRange{}, false
log.Warn("Invalid log index database version; resetting log index")
}
params.deriveFields()
f := &FilterMaps{
@@ -437,6 +454,7 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, ne
f.updateMatchersValidRange()
if newRange.initialized {
rs := rawdb.FilterMapsRange{
Version: databaseVersion,
HeadIndexed: newRange.headIndexed,
HeadDelimiter: newRange.headDelimiter,
BlocksFirst: newRange.blocks.First(),
110 changes: 110 additions & 0 deletions core/filtermaps/indexer_test.go
@@ -17,8 +17,10 @@
package filtermaps

import (
"context"
crand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"math/big"
"math/rand"
"sync"
@@ -31,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)

var testParams = Params{
@@ -104,6 +107,7 @@ func TestIndexerRandomRange(t *testing.T) {
fork, head = rand.Intn(len(forks)), rand.Intn(1001)
ts.chain.setCanonicalChain(forks[fork][:head+1])
case 2:
checkSnapshot = false
if head < 1000 {
checkSnapshot = !noHistory && head != 0 // no snapshot generated for block 0
// add blocks after the current head
@@ -158,6 +162,63 @@ }
}
}

func TestIndexerMatcherView(t *testing.T) {
testIndexerMatcherView(t, false)
}

func TestIndexerMatcherViewWithConcurrentRead(t *testing.T) {
testIndexerMatcherView(t, true)
}

func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
ts := newTestSetup(t)
defer ts.close()

forks := make([][]common.Hash, 20)
hashes := make([]common.Hash, 20)
ts.chain.addBlocks(100, 5, 2, 4, true)
ts.setHistory(0, false)
for i := range forks {
if i != 0 {
ts.chain.setHead(100 - i)
ts.chain.addBlocks(i, 5, 2, 4, true)
}
ts.fm.WaitIdle()
forks[i] = ts.chain.getCanonicalChain()
hashes[i] = ts.matcherViewHash()
}
fork := len(forks) - 1
for i := 0; i < 5000; i++ {
oldFork := fork
fork = rand.Intn(len(forks))
stopCh := make(chan chan struct{})
if concurrentRead {
go func() {
for {
ts.matcherViewHash()
select {
case ch := <-stopCh:
close(ch)
return
default:
}
}
}()
}
ts.chain.setCanonicalChain(forks[fork])
ts.fm.WaitIdle()
if concurrentRead {
ch := make(chan struct{})
stopCh <- ch
<-ch
}
hash := ts.matcherViewHash()
if hash != hashes[fork] {
t.Fatalf("Matcher view hash mismatch when switching from for %d to %d", oldFork, fork)
}
}
}

func TestIndexerCompareDb(t *testing.T) {
ts := newTestSetup(t)
defer ts.close()
@@ -291,6 +352,55 @@ func (ts *testSetup) fmDbHash() common.Hash {
return result
}

func (ts *testSetup) matcherViewHash() common.Hash {
mb := ts.fm.NewMatcherBackend()
defer mb.Close()

ctx := context.Background()
params := mb.GetParams()
hasher := sha256.New()
var headPtr uint64
for b := uint64(0); ; b++ {
lvptr, err := mb.GetBlockLvPointer(ctx, b)
if err != nil || (b > 0 && lvptr == headPtr) {
break
}
var enc [8]byte
binary.LittleEndian.PutUint64(enc[:], lvptr)
hasher.Write(enc[:])
headPtr = lvptr
}
headMap := uint32(headPtr >> params.logValuesPerMap)
var enc [12]byte
for r := uint32(0); r < params.mapHeight; r++ {
binary.LittleEndian.PutUint32(enc[:4], r)
for m := uint32(0); m <= headMap; m++ {
binary.LittleEndian.PutUint32(enc[4:8], m)
row, _ := mb.GetFilterMapRow(ctx, m, r, false)
for _, v := range row {
binary.LittleEndian.PutUint32(enc[8:], v)
hasher.Write(enc[:])
}
}
}
var hash common.Hash
hasher.Sum(hash[:0])
for i := 0; i < 50; i++ {
hasher.Reset()
hasher.Write(hash[:])
lvptr := binary.LittleEndian.Uint64(hash[:8]) % headPtr
if log, _ := mb.GetLogByLvIndex(ctx, lvptr); log != nil {
enc, err := rlp.EncodeToBytes(log)
if err != nil {
panic(err)
}
hasher.Write(enc)
}
hasher.Sum(hash[:0])
}
return hash
}

func (ts *testSetup) close() {
if ts.fm != nil {
ts.fm.Stop()
43 changes: 17 additions & 26 deletions core/filtermaps/map_renderer.go
@@ -84,7 +84,7 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
if err != nil {
return nil, err
}
if snapshot := f.lastCanonicalSnapshotBefore(renderBefore); snapshot != nil && snapshot.mapIndex >= nextMap {
if snapshot := f.lastCanonicalSnapshotOfMap(nextMap); snapshot != nil {
return f.renderMapsFromSnapshot(snapshot)
}
if nextMap >= renderBefore {
@@ -97,14 +97,14 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
// snapshot made at a block boundary.
func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, error) {
f.testSnapshotUsed = true
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock)
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock, cp.headDelimiter)
if err != nil {
return nil, fmt.Errorf("failed to create log iterator from block delimiter %d: %v", cp.lastBlock, err)
}
return &mapRenderer{
f: f,
currentMap: &renderedMap{
filterMap: cp.filterMap.copy(),
filterMap: cp.filterMap.fullCopy(),
mapIndex: cp.mapIndex,
lastBlock: cp.lastBlock,
blockLvPtrs: cp.blockLvPtrs,
@@ -137,14 +137,14 @@ func (f *FilterMaps) renderMapsFromMapBoundary(firstMap, renderBefore uint32, st
}, nil
}

// lastCanonicalSnapshotBefore returns the latest cached snapshot that matches
// the current targetView.
func (f *FilterMaps) lastCanonicalSnapshotBefore(renderBefore uint32) *renderedMap {
// lastCanonicalSnapshotOfMap returns the latest cached snapshot of the given map
// that is also consistent with the current targetView.
func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap {
var best *renderedMap
for _, blockNumber := range f.renderSnapshots.Keys() {
if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() &&
blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId &&
cp.mapIndex < renderBefore && (best == nil || blockNumber > best.lastBlock) {
cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) {
best = cp
}
}
@@ -171,10 +171,9 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err)
}
if lastBlock >= f.indexedView.headNumber || lastBlock >= f.targetView.headNumber ||
lastBlockId != f.targetView.getBlockId(lastBlock) {
// map is not full or inconsistent with targetView; roll back
continue
if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) ||
lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) {
continue // map is not full or inconsistent with targetView; roll back
}
lvPtr, err := f.getBlockLvPointer(lastBlock)
if err != nil {
@@ -257,10 +256,13 @@ func (f *FilterMaps) loadHeadSnapshot() error {

// makeSnapshot creates a snapshot of the current state of the rendered map.
func (r *mapRenderer) makeSnapshot() {
r.f.renderSnapshots.Add(r.iterator.blockNumber, &renderedMap{
filterMap: r.currentMap.filterMap.copy(),
if r.iterator.blockNumber != r.currentMap.lastBlock {
panic("iterator state inconsistent with last block")
}
r.f.renderSnapshots.Add(r.currentMap.lastBlock, &renderedMap{
filterMap: r.currentMap.filterMap.fastCopy(),
mapIndex: r.currentMap.mapIndex,
lastBlock: r.iterator.blockNumber,
lastBlock: r.currentMap.lastBlock,
lastBlockId: r.f.targetView.getBlockId(r.currentMap.lastBlock),
blockLvPtrs: r.currentMap.blockLvPtrs,
finished: true,

Member:
Honestly it's a bit weird to get the block id with r.f.targetView; r.iterator.chainView might be a better candidate.

Contributor (author):
You are right, it does look better. It is an assumption that the two should always be the same, guaranteed by iterator.updateChainView, which is always called after r.f.targetView could have been changed. But it is better to explicitly check rather than quietly assume, so I also added a check.
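
The explicit check the author mentions is not part of the five commits rendered in this view; a hedged, self-contained sketch of the "check, don't assume" pattern being described (all names here are illustrative, not the actual FilterMaps internals):

package main

// Two views that are supposed to stay in sync are compared explicitly,
// and divergence fails loudly instead of silently corrupting snapshots.

type chainView struct{ head uint64 }

type renderer struct {
	targetView   *chainView
	iteratorView *chainView // kept in sync by an updateChainView-like call
}

func (r *renderer) makeSnapshot() {
	if r.iteratorView != r.targetView {
		panic("iterator chain view inconsistent with target view")
	}
	// ... snapshot the current map here ...
}

func main() {
	v := &chainView{head: 100}
	r := &renderer{targetView: v, iteratorView: v}
	r.makeSnapshot() // ok: both aliases point at the same view
}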
@@ -661,24 +663,13 @@ var errUnindexedRange = errors.New("unindexed range")
// newLogIteratorFromBlockDelimiter creates a logIterator starting at the
// given block's first log value entry (the block delimiter), according to the
// current targetView.
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber uint64) (*logIterator, error) {
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) {
if blockNumber > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber)
}
if !f.indexedRange.blocks.Includes(blockNumber) {
return nil, errUnindexedRange
}
var lvIndex uint64
if f.indexedRange.headIndexed && blockNumber+1 == f.indexedRange.blocks.AfterLast() {
lvIndex = f.indexedRange.headDelimiter
} else {
var err error
lvIndex, err = f.getBlockLvPointer(blockNumber + 1)
if err != nil {
return nil, fmt.Errorf("failed to retrieve log value pointer of block %d after delimiter: %v", blockNumber+1, err)
}
lvIndex--
}
finished := blockNumber == f.targetView.headNumber
l := &logIterator{
chainView: f.targetView,
3 changes: 3 additions & 0 deletions core/filtermaps/matcher_backend.go
@@ -75,6 +75,9 @@ func (fm *FilterMapsMatcherBackend) Close() {
// on write.
// GetFilterMapRow implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
fm.f.indexLock.RLock()
defer fm.f.indexLock.RUnlock()

return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly)
}
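
Taking the index read lock here makes row reads atomic with respect to the indexer's updates. As a generic illustration of the pattern (a stand-in type, not the actual FilterMaps internals):

package main

import (
	"fmt"
	"sync"
)

// index is a simplified stand-in: readers take the read lock so they
// never observe rows mid-mutation, while writers take the write lock.
type index struct {
	mu   sync.RWMutex
	rows map[uint32][]uint32
}

func (ix *index) getRow(r uint32) []uint32 {
	ix.mu.RLock()
	defer ix.mu.RUnlock()
	return ix.rows[r]
}

func (ix *index) setRow(r uint32, row []uint32) {
	ix.mu.Lock()
	defer ix.mu.Unlock()
	ix.rows[r] = row
}

func main() {
	ix := &index{rows: make(map[uint32][]uint32)}
	ix.setRow(1, []uint32{7, 8})
	fmt.Println(ix.getRow(1)) // [7 8]
}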

1 change: 1 addition & 0 deletions core/rawdb/accessors_indexes.go
@@ -434,6 +434,7 @@ func DeleteBlockLvPointers(db ethdb.KeyValueStore, blocks common.Range[uint64],
// FilterMapsRange is a storage representation of the block range covered by the
// filter maps structure and the corresponding log value index range.
type FilterMapsRange struct {
Version uint32
HeadIndexed bool
HeadDelimiter uint64
BlocksFirst, BlocksAfterLast uint64
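
The new Version field above enables a simple reset-on-mismatch scheme: a range persisted by an older schema fails the version check in NewFilterMaps and the whole log index is rebuilt. A minimal, self-contained sketch of the pattern (names like dbVersion and load are illustrative, not go-ethereum API):

package main

import "fmt"

const dbVersion = 1 // bump on incompatible schema changes to force a rebuild

type storedRange struct {
	Version     uint32
	Initialized bool
}

// load returns the persisted range, or a zero value (forcing a reindex)
// when the stored version does not match the version expected by the code.
func load(r storedRange, err error) (storedRange, bool) {
	if err != nil || r.Version != dbVersion {
		return storedRange{}, false // discard and reindex from scratch
	}
	return r, r.Initialized
}

func main() {
	r, ok := load(storedRange{Version: 0, Initialized: true}, nil)
	fmt.Println(r, ok) // {0 false} false -- old version rejected
}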