Commit 6beee4b

zsfelfoldi authored and 0g-wh committed
core/filtermaps: clone cached slices, fix tempRange (ethereum#31680)
This PR ensures that caching a slice, or a slice of slices, never affects the original version: a slice fetched from the cache is always cloned unless it is used in a guaranteed read-only way.
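For background: a Go slice fetched from a cache shares its backing array with the cached entry, so a later in-place write through the returned slice silently mutates the cached version as well. The sketch below is purely illustrative (the rowCache map, the key string, and the getRow helper are hypothetical and not part of this commit); it only demonstrates the aliasing hazard and the slices.Clone pattern applied throughout the diff.

package main

import (
	"fmt"
	"slices"
)

// rowCache is a hypothetical stand-in for the filtermaps row cache,
// used here only to demonstrate slice aliasing.
var rowCache = map[string][]uint32{}

// getRow returns the row for key, cloning any cached value so the caller
// can mutate the result without corrupting the cached copy.
func getRow(key string) []uint32 {
	if row, ok := rowCache[key]; ok {
		return slices.Clone(row) // without Clone the caller would alias the cached backing array
	}
	row := []uint32{1, 2, 3} // pretend this was read from the database
	rowCache[key] = row
	return slices.Clone(row)
}

func main() {
	row := getRow("map0")
	row[0] = 99                   // mutates only the clone
	fmt.Println(rowCache["map0"]) // still [1 2 3]
}

The same reasoning appears to apply one level up for a cached slice of slices: cloning the outer slice lets the caller replace individual rows without overwriting the cached group, which matches the new slices.Clone call in storeFilterMapRowsOfGroup below.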
1 parent fa83020 commit 6beee4b

4 files changed: +65 −5 lines changed


core/filtermaps/filtermaps.go

Lines changed: 6 additions & 3 deletions
@@ -128,6 +128,7 @@ type FilterMaps struct {
 
 	// test hooks
 	testDisableSnapshots, testSnapshotUsed bool
+	testProcessEventsHook                  func()
 }
 
 // filterMap is a full or partial in-memory representation of a filter map where
@@ -573,7 +574,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bo
 		}
 		f.baseRowsCache.Add(baseMapRowIndex, baseRows)
 	}
-	baseRow := baseRows[mapIndex&(f.baseRowGroupLength-1)]
+	baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)])
 	if baseLayerOnly {
 		return baseRow, nil
 	}
@@ -610,7 +611,9 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u
 	if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
 		var ok bool
 		baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
-		if !ok {
+		if ok {
+			baseRows = slices.Clone(baseRows)
+		} else {
 			var err error
 			baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
 			if err != nil {
@@ -656,7 +659,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
 // called from outside the indexerLoop goroutine.
 func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
 	if blockNumber >= f.indexedRange.blocks.AfterLast() && f.indexedRange.headIndexed {
-		return f.indexedRange.headDelimiter, nil
+		return f.indexedRange.headDelimiter + 1, nil
 	}
 	if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
 		return lvPointer, nil

core/filtermaps/indexer.go

Lines changed: 3 additions & 0 deletions
@@ -165,6 +165,9 @@ func (f *FilterMaps) waitForNewHead() {
 // processEvents processes all events, blocking only if a block processing is
 // happening and indexing should be suspended.
 func (f *FilterMaps) processEvents() {
+	if f.testProcessEventsHook != nil {
+		f.testProcessEventsHook()
+	}
 	for f.processSingleEvent(f.blockProcessing) {
 	}
 }

core/filtermaps/indexer_test.go

Lines changed: 52 additions & 0 deletions
@@ -219,6 +219,58 @@ func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
 	}
 }
 
+func TestLogsByIndex(t *testing.T) {
+	ts := newTestSetup(t)
+	defer func() {
+		ts.fm.testProcessEventsHook = nil
+		ts.close()
+	}()
+
+	ts.chain.addBlocks(1000, 10, 3, 4, true)
+	ts.setHistory(0, false)
+	ts.fm.WaitIdle()
+	firstLog := make([]uint64, 1001) // first valid log position per block
+	lastLog := make([]uint64, 1001)  // last valid log position per block
+	for i := uint64(0); i <= ts.fm.indexedRange.headDelimiter; i++ {
+		log, err := ts.fm.getLogByLvIndex(i)
+		if err != nil {
+			t.Fatalf("Error getting log by index %d: %v", i, err)
+		}
+		if log != nil {
+			if firstLog[log.BlockNumber] == 0 {
+				firstLog[log.BlockNumber] = i
+			}
+			lastLog[log.BlockNumber] = i
+		}
+	}
+	var failed bool
+	ts.fm.testProcessEventsHook = func() {
+		if ts.fm.indexedRange.blocks.IsEmpty() {
+			return
+		}
+		if lvi := firstLog[ts.fm.indexedRange.blocks.First()]; lvi != 0 {
+			log, err := ts.fm.getLogByLvIndex(lvi)
+			if log == nil || err != nil {
+				t.Errorf("Error getting first log of indexed block range: %v", err)
+				failed = true
+			}
+		}
+		if lvi := lastLog[ts.fm.indexedRange.blocks.Last()]; lvi != 0 {
+			log, err := ts.fm.getLogByLvIndex(lvi)
+			if log == nil || err != nil {
+				t.Errorf("Error getting last log of indexed block range: %v", err)
+				failed = true
+			}
+		}
+	}
+	chain := ts.chain.getCanonicalChain()
+	for i := 0; i < 1000 && !failed; i++ {
+		head := rand.Intn(len(chain))
+		ts.chain.setCanonicalChain(chain[:head+1])
+		ts.fm.WaitIdle()
+	}
+}
+
 func TestIndexerCompareDb(t *testing.T) {
 	ts := newTestSetup(t)
 	defer ts.close()

core/filtermaps/map_renderer.go

Lines changed: 4 additions & 2 deletions
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"slices"
 	"sort"
 	"time"
 
@@ -107,7 +108,7 @@ func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, erro
 			filterMap:   cp.filterMap.fullCopy(),
 			mapIndex:    cp.mapIndex,
 			lastBlock:   cp.lastBlock,
-			blockLvPtrs: cp.blockLvPtrs,
+			blockLvPtrs: slices.Clone(cp.blockLvPtrs),
 		},
 		finishedMaps: make(map[uint32]*renderedMap),
 		finished:     common.NewRange(cp.mapIndex, 0),
@@ -244,7 +245,7 @@ func (f *FilterMaps) loadHeadSnapshot() error {
 		}
 	}
 	f.renderSnapshots.Add(f.indexedRange.blocks.Last(), &renderedMap{
-		filterMap:   fm,
+		filterMap:   fm.fullCopy(),
 		mapIndex:    f.indexedRange.maps.Last(),
 		lastBlock:   f.indexedRange.blocks.Last(),
 		lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
@@ -536,6 +537,7 @@ func (r *mapRenderer) getTempRange() (filterMapsRange, error) {
 		} else {
 			tempRange.blocks.SetAfterLast(0)
 		}
+		tempRange.headIndexed = false
 		tempRange.headDelimiter = 0
 	}
 	return tempRange, nil
