From a12c5a20e2197e89715f236e716432bf3c4d80b1 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 31 Jul 2024 10:48:49 +0800 Subject: [PATCH 01/17] core/rawdb, triedb/pathdb: implement history indexer --- core/rawdb/accessors_history.go | 170 +++++++ core/rawdb/accessors_state.go | 31 ++ core/rawdb/database.go | 8 + core/rawdb/schema.go | 31 ++ triedb/pathdb/database.go | 19 + triedb/pathdb/database_test.go | 14 + triedb/pathdb/disklayer.go | 12 + triedb/pathdb/history.go | 42 +- triedb/pathdb/history_index.go | 433 ++++++++++++++++++ triedb/pathdb/history_index_block.go | 403 +++++++++++++++++ triedb/pathdb/history_index_block_test.go | 217 +++++++++ triedb/pathdb/history_index_test.go | 290 ++++++++++++ triedb/pathdb/history_indexer.go | 518 ++++++++++++++++++++++ triedb/pathdb/history_reader.go | 346 +++++++++++++++ triedb/pathdb/history_reader_test.go | 159 +++++++ triedb/pathdb/metrics.go | 6 + triedb/pathdb/reader.go | 109 +++++ 17 files changed, 2795 insertions(+), 13 deletions(-) create mode 100644 core/rawdb/accessors_history.go create mode 100644 triedb/pathdb/history_index.go create mode 100644 triedb/pathdb/history_index_block.go create mode 100644 triedb/pathdb/history_index_block_test.go create mode 100644 triedb/pathdb/history_index_test.go create mode 100644 triedb/pathdb/history_indexer.go create mode 100644 triedb/pathdb/history_reader.go create mode 100644 triedb/pathdb/history_reader_test.go diff --git a/core/rawdb/accessors_history.go b/core/rawdb/accessors_history.go new file mode 100644 index 000000000000..8940a7001327 --- /dev/null +++ b/core/rawdb/accessors_history.go @@ -0,0 +1,170 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "encoding/binary" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// ReadLastStateHistoryIndex retrieves the number of latest indexed state history. +func ReadLastStateHistoryIndex(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(headStateHistoryIndexKey) + if len(data) != 8 { + return nil + } + number := binary.BigEndian.Uint64(data) + return &number +} + +// WriteLastStateHistoryIndex stores the number of latest indexed state history +// into database. +func WriteLastStateHistoryIndex(db ethdb.KeyValueWriter, number uint64) { + if err := db.Put(headStateHistoryIndexKey, encodeBlockNumber(number)); err != nil { + log.Crit("Failed to store the state index tail", "err", err) + } +} + +// DeleteLastStateHistoryIndex removes the number of latest indexed state history. +func DeleteLastStateHistoryIndex(db ethdb.KeyValueWriter) { + if err := db.Delete(headStateHistoryIndexKey); err != nil { + log.Crit("Failed to delete the state index tail", "err", err) + } +} + +// ReadAccountHistoryIndex retrieves the account history index with the provided +// account address. 
+func ReadAccountHistoryIndex(db ethdb.KeyValueReader, address common.Address) []byte { + data, err := db.Get(accountHistoryIndexKey(address)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteAccountHistoryIndex writes the provided account history index into database. +func WriteAccountHistoryIndex(db ethdb.KeyValueWriter, address common.Address, data []byte) { + if err := db.Put(accountHistoryIndexKey(address), data); err != nil { + log.Crit("Failed to store account history index", "err", err) + } +} + +// DeleteAccountHistoryIndex deletes the specified account history index from +// the database. +func DeleteAccountHistoryIndex(db ethdb.KeyValueWriter, address common.Address) { + if err := db.Delete(accountHistoryIndexKey(address)); err != nil { + log.Crit("Failed to delete account history index", "err", err) + } +} + +// ReadStorageHistoryIndex retrieves the storage history index with the provided +// account address and storage key hash. +func ReadStorageHistoryIndex(db ethdb.KeyValueReader, address common.Address, storageHash common.Hash) []byte { + data, err := db.Get(storageHistoryIndexKey(address, storageHash)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteStorageHistoryIndex writes the provided storage history index into database. +func WriteStorageHistoryIndex(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash, data []byte) { + if err := db.Put(storageHistoryIndexKey(address, storageHash), data); err != nil { + log.Crit("Failed to store storage history index", "err", err) + } +} + +// DeleteStorageHistoryIndex deletes the specified state index from the database. 
+func DeleteStorageHistoryIndex(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash) { + if err := db.Delete(storageHistoryIndexKey(address, storageHash)); err != nil { + log.Crit("Failed to delete storage history index", "err", err) + } +} + +// ReadAccountHistoryIndexBlock retrieves the index block with the provided +// account address along with the block id. +func ReadAccountHistoryIndexBlock(db ethdb.KeyValueReader, address common.Address, blockID uint32) []byte { + data, err := db.Get(accountHistoryIndexBlockKey(address, blockID)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteAccountHistoryIndexBlock writes the provided index block into database. +func WriteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, blockID uint32, data []byte) { + if err := db.Put(accountHistoryIndexBlockKey(address, blockID), data); err != nil { + log.Crit("Failed to store account index block", "err", err) + } +} + +// DeleteAccountHistoryIndexBlock deletes the specified index block from the database. +func DeleteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, blockID uint32) { + if err := db.Delete(accountHistoryIndexBlockKey(address, blockID)); err != nil { + log.Crit("Failed to delete account index block", "err", err) + } +} + +// ReadStorageHistoryIndexBlock retrieves the index block with the provided state +// identifier along with the block id. +func ReadStorageHistoryIndexBlock(db ethdb.KeyValueReader, address common.Address, storageHash common.Hash, blockID uint32) []byte { + data, err := db.Get(storageHistoryIndexBlockKey(address, storageHash, blockID)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteStorageHistoryIndexBlock writes the provided index block into database. 
+func WriteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash, id uint32, data []byte) { + if err := db.Put(storageHistoryIndexBlockKey(address, storageHash, id), data); err != nil { + log.Crit("Failed to store storage index block", "err", err) + } +} + +// DeleteStorageHistoryIndexBlock deletes the specified index block from the database. +func DeleteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, state common.Hash, id uint32) { + if err := db.Delete(storageHistoryIndexBlockKey(address, state, id)); err != nil { + log.Crit("Failed to delete storage index block", "err", err) + } +} + +// increaseKey increase the input key by one bit. Return nil if the entire +// addition operation overflows. +func increaseKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]++ + if key[i] != 0x0 { + return key + } + } + return nil +} + +// DeleteHistoryIndex completely removes all history indexing data, including indexes +// for accounts and storages. +// +// Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix` +// is exclusively occupied by the history indexing data! 
+func DeleteHistoryIndex(db ethdb.KeyValueRangeDeleter) { + if err := db.DeleteRange(StateHistoryIndexPrefix, increaseKey(StateHistoryIndexPrefix)); err != nil { + log.Crit("Failed to delete history index range", "err", err) + } +} diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go index 41e15debe94d..7d7b37641bf2 100644 --- a/core/rawdb/accessors_state.go +++ b/core/rawdb/accessors_state.go @@ -18,6 +18,7 @@ package rawdb import ( "encoding/binary" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" @@ -255,6 +256,36 @@ func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []by return meta, accountIndex, storageIndex, accountData, storageData, nil } +// ReadStateHistoryList retrieves a list of state histories from database with +// specific range. Compute the position of state history in freezer by minus one +// since the id of first state history starts from one(zero for initial state). +func ReadStateHistoryList(db ethdb.AncientReaderOp, start uint64, count uint64) ([][]byte, [][]byte, [][]byte, [][]byte, [][]byte, error) { + metaList, err := db.AncientRange(stateHistoryMeta, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + aIndexList, err := db.AncientRange(stateHistoryAccountIndex, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + sIndexList, err := db.AncientRange(stateHistoryStorageIndex, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + aDataList, err := db.AncientRange(stateHistoryAccountData, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + sDataList, err := db.AncientRange(stateHistoryStorageData, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + if len(metaList) != len(aIndexList) || len(metaList) != len(sIndexList) || len(metaList) != len(aDataList) || len(metaList) != len(sDataList) { + return nil, nil, nil, nil, 
nil, errors.New("state history is corrupted") + } + return metaList, aIndexList, sIndexList, aDataList, sDataList, nil +} + // WriteStateHistory writes the provided state history to database. Compute the // position of state history in freezer by minus one since the id of first state // history starts from one(zero for initial state). diff --git a/core/rawdb/database.go b/core/rawdb/database.go index a03dbafb1f4f..7f6c3808adaa 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -387,6 +387,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { filterMapLastBlock stat filterMapBlockLV stat + // Path-mode archive data + stateIndex stat + // Verkle statistics verkleTries stat verkleStateLookups stat @@ -464,6 +467,10 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8: bloomBits.Add(size) + // Path-based historic state indexes + case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.AddressLength: + stateIndex.Add(size) + // Verkle trie data is detected, determine the sub-category case bytes.HasPrefix(key, VerklePrefix): remain := key[len(VerklePrefix):] @@ -519,6 +526,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, {"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()}, {"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()}, + {"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()}, {"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()}, {"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()}, {"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()}, 
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index fa125cecc053..864a65ceeb49 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -76,6 +76,10 @@ var ( // trieJournalKey tracks the in-memory trie node layers across restarts. trieJournalKey = []byte("TrieJournal") + // headStateHistoryIndexKey tracks the ID of the latest state history that has + // been indexed. + headStateHistoryIndexKey = []byte("LastStateHistoryIndex") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. txIndexTailKey = []byte("TransactionIndexTail") @@ -117,6 +121,9 @@ var ( TrieNodeStoragePrefix = []byte("O") // TrieNodeStoragePrefix + accountHash + hexPath -> trie node stateIDPrefix = []byte("L") // stateIDPrefix + state root -> state id + // State history indexing within path-based storage scheme + StateHistoryIndexPrefix = []byte("m") // StateHistoryIndexPrefix + account address or (account address + slotHash) -> index + // VerklePrefix is the database prefix for Verkle trie data, which includes: // (a) Trie nodes // (b) In-memory trie node journal @@ -362,3 +369,27 @@ func filterMapBlockLVKey(number uint64) []byte { binary.BigEndian.PutUint64(key[l:], number) return key } + +// accountHistoryIndexKey = StateHistoryIndexPrefix + address +func accountHistoryIndexKey(address common.Address) []byte { + return append(StateHistoryIndexPrefix, address.Bytes()...) +} + +// storageHistoryIndexKey = StateHistoryIndexPrefix + address + storageHash +func storageHistoryIndexKey(address common.Address, storageHash common.Hash) []byte { + return append(append(StateHistoryIndexPrefix, address.Bytes()...), storageHash.Bytes()...) +} + +// accountHistoryIndexBlockKey = StateHistoryIndexPrefix + address + blockID +func accountHistoryIndexBlockKey(address common.Address, blockID uint32) []byte { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], blockID) + return append(append(StateHistoryIndexPrefix, address.Bytes()...), buf[:]...) 
+} + +// storageHistoryIndexBlockKey = StateHistoryIndexPrefix + address + storageHash + blockID +func storageHistoryIndexBlockKey(address common.Address, storageHash common.Hash, blockID uint32) []byte { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], blockID) + return append(append(append(StateHistoryIndexPrefix, address.Bytes()...), storageHash.Bytes()...), buf[:]...) +} diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 3174a7c964b3..c694775ae3da 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -209,6 +209,7 @@ type Database struct { tree *layerTree // The group for all known layers freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests lock sync.RWMutex // Lock to prevent mutations from happening at the same time + indexer *historyIndexer // History indexer } // New attempts to load an already existing layer from a persistent key-value @@ -258,6 +259,10 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database { if err := db.setStateGenerator(); err != nil { log.Crit("Failed to setup the generator", "err", err) } + // TODO (rjl493456442) disable the background indexing in read-only mode + if db.freezer != nil { + db.indexer = newHistoryIndexer(db.diskdb, db.freezer, db.tree.bottom().stateID()) + } fields := config.fields() if db.isVerkle { fields = append(fields, "verkle", true) @@ -295,6 +300,11 @@ func (db *Database) repairHistory() error { log.Crit("Failed to retrieve head of state history", "err", err) } if frozen != 0 { + // TODO(rjl493456442) would be better to group them into a batch. 
+ // + // Purge all state history indexing data first + rawdb.DeleteLastStateHistoryIndex(db.diskdb) + rawdb.DeleteHistoryIndex(db.diskdb) err := db.freezer.Reset() if err != nil { log.Crit("Failed to reset state histories", "err", err) @@ -477,6 +487,11 @@ func (db *Database) Enable(root common.Hash) error { // mappings can be huge and might take a while to clear // them, just leave them in disk and wait for overwriting. if db.freezer != nil { + // TODO(rjl493456442) would be better to group them into a batch. + // + // Purge all state history indexing data first + rawdb.DeleteLastStateHistoryIndex(db.diskdb) + rawdb.DeleteHistoryIndex(db.diskdb) if err := db.freezer.Reset(); err != nil { return err } @@ -599,6 +614,10 @@ func (db *Database) Close() error { } disk.resetCache() // release the memory held by clean cache + // Terminate the background state history indexer + if db.indexer != nil { + db.indexer.close() + } // Close the attached state history freezer. if db.freezer == nil { return nil diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 3b780c975da9..9f681d308bb5 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -163,6 +163,20 @@ func (t *tester) hashPreimage(hash common.Hash) common.Hash { return common.BytesToHash(t.preimages[hash]) } +func (t *tester) extend(layers int) { + for i := 0; i < layers; i++ { + var parent = types.EmptyRootHash + if len(t.roots) != 0 { + parent = t.roots[len(t.roots)-1] + } + root, nodes, states := t.generate(parent, true) + if err := t.db.Update(root, parent, uint64(i), nodes, states); err != nil { + panic(fmt.Errorf("failed to update state changes, err: %w", err)) + } + t.roots = append(t.roots, root) + } +} + func (t *tester) release() { t.db.Close() t.db.diskdb.Close() diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 1c9efb024bd6..1a845be61ea9 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -325,6 
+325,12 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { overflow = true oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation** } + // Notify the state history indexer for newly created history + if dl.db.indexer != nil { + if err := dl.db.indexer.extend(bottom.stateID()); err != nil { + return nil, err + } + } } // Mark the diskLayer as stale before applying any mutations on top. dl.stale = true @@ -418,6 +424,12 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { dl.stale = true + // Unindex the corresponding state history + if dl.db.indexer != nil { + if err := dl.db.indexer.shorten(dl.id); err != nil { + return nil, err + } + } // State change may be applied to node buffer, or the persistent // state, depends on if node buffer is empty or not. If the node // buffer is not empty, it means that the state transition that diff --git a/triedb/pathdb/history.go b/triedb/pathdb/history.go index aed0296da5bc..9372aef573c6 100644 --- a/triedb/pathdb/history.go +++ b/triedb/pathdb/history.go @@ -506,25 +506,41 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe // readHistory reads and decodes the state history object by the given id. 
func readHistory(reader ethdb.AncientReader, id uint64) (*history, error) { - blob := rawdb.ReadStateHistoryMeta(reader, id) - if len(blob) == 0 { - return nil, fmt.Errorf("state history not found %d", id) + mData, accountIndexes, storageIndexes, accountData, storageData, err := rawdb.ReadStateHistory(reader, id) + if err != nil { + return nil, err } var m meta - if err := m.decode(blob); err != nil { + if err := m.decode(mData); err != nil { return nil, err } - var ( - dec = history{meta: &m} - accountData = rawdb.ReadStateAccountHistory(reader, id) - storageData = rawdb.ReadStateStorageHistory(reader, id) - accountIndexes = rawdb.ReadStateAccountIndex(reader, id) - storageIndexes = rawdb.ReadStateStorageIndex(reader, id) - ) - if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { + h := history{meta: &m} + if err := h.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { return nil, err } - return &dec, nil + return &h, nil +} + +// readHistories reads and decodes a list of state histories with the specific +// history range. +func readHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*history, error) { + var histories []*history + metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count) + if err != nil { + return nil, err + } + for i := 0; i < len(metaList); i++ { + var m meta + if err := m.decode(metaList[i]); err != nil { + return nil, err + } + h := history{meta: &m} + if err := h.decode(aDataList[i], sDataList[i], aIndexList[i], sIndexList[i]); err != nil { + return nil, err + } + histories = append(histories, &h) + } + return histories, nil } // writeHistory persists the state history with the provided state set. 
diff --git a/triedb/pathdb/history_index.go b/triedb/pathdb/history_index.go new file mode 100644 index 000000000000..ed8f1e67d9b7 --- /dev/null +++ b/triedb/pathdb/history_index.go @@ -0,0 +1,433 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see desc.max { + return nil, fmt.Errorf("indexBlockDesc: min %d > max %d", desc.min, desc.max) + } + if lastID != 0 { + if lastID+1 != desc.id { + return nil, fmt.Errorf("index block id is out of order, last-id: %d, this-id: %d", lastID, desc.id) + } + if desc.min <= lastMax { + return nil, fmt.Errorf("index block range is out of order, last-max: %d, this-min: %d", lastMax, desc.min) + } + } + lastID = desc.id + lastMax = desc.max + descList = append(descList, &desc) + } + return descList, nil +} + +// indexReader is the structure to look up the state history index records +// associated with the specific state element. +type indexReader struct { + db ethdb.KeyValueReader + descList []*indexBlockDesc + readers map[uint32]*blockReader + state stateIdent +} + +// loadIndexData loads the index data associated with the specified state. 
+func loadIndexData(db ethdb.KeyValueReader, state stateIdent) ([]*indexBlockDesc, error) { + var blob []byte + if state.account { + blob = rawdb.ReadAccountHistoryIndex(db, state.address) + } else { + blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash) + } + if len(blob) == 0 { + return nil, nil + } + return parseIndex(blob) +} + +// newIndexReader constructs a index reader for the specified state. Reader with +// empty data is allowed. +func newIndexReader(db ethdb.KeyValueReader, state stateIdent) (*indexReader, error) { + descList, err := loadIndexData(db, state) + if err != nil { + return nil, err + } + return &indexReader{ + descList: descList, + readers: make(map[uint32]*blockReader), + db: db, + state: state, + }, nil +} + +// refresh reloads the last section of index data to account for any additional +// elements that may have been written to disk. +func (r *indexReader) refresh() error { + // Release the reader for the last section of index data, as its content + // may have been modified by additional elements written to the disk. + if len(r.descList) != 0 { + last := r.descList[len(r.descList)-1] + if !last.full() { + delete(r.readers, last.id) + } + } + descList, err := loadIndexData(r.db, r.state) + if err != nil { + return err + } + r.descList = descList + return nil +} + +// readGreaterThan locates the first element that is greater than the specified +// id. If no such element is found, MaxUint64 is returned. 
+func (r *indexReader) readGreaterThan(id uint64) (uint64, error) { + index := sort.Search(len(r.descList), func(i int) bool { + return id < r.descList[i].max + }) + if index == len(r.descList) { + return math.MaxUint64, nil + } + desc := r.descList[index] + + br, ok := r.readers[desc.id] + if !ok { + var ( + err error + blob []byte + ) + if r.state.account { + blob = rawdb.ReadAccountHistoryIndexBlock(r.db, r.state.address, desc.id) + } else { + blob = rawdb.ReadStorageHistoryIndexBlock(r.db, r.state.address, r.state.storageHash, desc.id) + } + br, err = newBlockReader(blob) + if err != nil { + return 0, err + } + r.readers[desc.id] = br + } + // The supplied ID is not greater than block.max, ensuring that an element + // satisfying the condition can be found. + return br.readGreaterThan(id) +} + +// indexWriter is responsible for writing index data for a specific state (either +// an account or a storage slot). The state index follows a two-layer structure: +// the first layer consists of a list of fixed-size metadata, each linked to a +// second-layer block. The index data (monotonically increasing list of state +// history ids) is stored in these second-layer index blocks, which are size +// limited. +type indexWriter struct { + descList []*indexBlockDesc // The list of index block descriptions + bw *blockWriter // The live index block writer + frozen []*blockWriter // The finalized index block writers, waiting for flush + lastID uint64 // The ID of the latest tracked history + state stateIdent + db ethdb.KeyValueReader +} + +// newIndexWriter constructs the index writer for the specified state. 
+func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) { + var blob []byte + if state.account { + blob = rawdb.ReadAccountHistoryIndex(db, state.address) + } else { + blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash) + } + if len(blob) == 0 { + desc := newIndexBlockDesc(0) + bw, _ := newBlockWriter(nil, desc) + return &indexWriter{ + descList: []*indexBlockDesc{desc}, + bw: bw, + state: state, + db: db, + }, nil + } + descList, err := parseIndex(blob) + if err != nil { + return nil, err + } + var ( + indexBlock []byte + lastDesc = descList[len(descList)-1] + ) + if state.account { + indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.address, lastDesc.id) + } else { + indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.address, state.storageHash, lastDesc.id) + } + bw, err := newBlockWriter(indexBlock, lastDesc) + if err != nil { + return nil, err + } + return &indexWriter{ + descList: descList, + lastID: lastDesc.max, + bw: bw, + state: state, + db: db, + }, nil +} + +// append adds the new element into the index writer. +func (w *indexWriter) append(id uint64) error { + if id <= w.lastID { + return fmt.Errorf("append element out of order, last: %d, this: %d", w.lastID, id) + } + if w.bw.full() { + if err := w.rotate(); err != nil { + return err + } + } + if err := w.bw.append(id); err != nil { + return err + } + w.lastID = id + + return nil +} + +// rotate creates a new index block for storing index records from scratch +// and caches the current full index block for finalization. 
+func (w *indexWriter) rotate() error { + var ( + err error + desc = newIndexBlockDesc(w.bw.desc.id + 1) + ) + w.frozen = append(w.frozen, w.bw) + w.bw, err = newBlockWriter(nil, desc) + if err != nil { + return err + } + w.descList = append(w.descList, desc) + return nil +} + +// finish finalizes all the frozen index block writers along with the live one +// if it's not empty, committing the index block data and the index meta into +// the supplied batch. +// +// This function is safe to be called multiple times. +func (w *indexWriter) finish(batch ethdb.Batch) { + var ( + writers = append(w.frozen, w.bw) + descList = w.descList + ) + // The live index block writer might be empty if the entire index write + // is created from scratch, remove it from committing. + if w.bw.empty() { + writers = writers[:len(writers)-1] + descList = descList[:len(descList)-1] + } + if len(writers) == 0 { + return // nothing to commit + } + for _, bw := range writers { + if w.state.account { + rawdb.WriteAccountHistoryIndexBlock(batch, w.state.address, bw.desc.id, bw.finish()) + } else { + rawdb.WriteStorageHistoryIndexBlock(batch, w.state.address, w.state.storageHash, bw.desc.id, bw.finish()) + } + } + w.frozen = nil // release all the frozen writers + + buf := make([]byte, 0, indexBlockDescSize*len(descList)) + for _, desc := range descList { + buf = append(buf, desc.encode()...) + } + if w.state.account { + rawdb.WriteAccountHistoryIndex(batch, w.state.address, buf) + } else { + rawdb.WriteStorageHistoryIndex(batch, w.state.address, w.state.storageHash, buf) + } +} + +// indexDeleter is responsible for deleting index data for a specific state. 
+type indexDeleter struct { + descList []*indexBlockDesc // The list of index block descriptions + bw *blockWriter // The live index block writer + dropped []uint32 // The list of index block id waiting for deleting + lastID uint64 // The ID of the latest tracked history + state stateIdent + db ethdb.KeyValueReader +} + +// newIndexDeleter constructs the index deleter for the specified state. +func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) { + var blob []byte + if state.account { + blob = rawdb.ReadAccountHistoryIndex(db, state.address) + } else { + blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash) + } + if len(blob) == 0 { + // TODO(rjl493456442) we can probably return an error here, + // deleter with no data is meaningless. + desc := newIndexBlockDesc(0) + bw, _ := newBlockWriter(nil, desc) + return &indexDeleter{ + descList: []*indexBlockDesc{desc}, + bw: bw, + state: state, + db: db, + }, nil + } + descList, err := parseIndex(blob) + if err != nil { + return nil, err + } + var ( + indexBlock []byte + lastDesc = descList[len(descList)-1] + ) + if state.account { + indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.address, lastDesc.id) + } else { + indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.address, state.storageHash, lastDesc.id) + } + bw, err := newBlockWriter(indexBlock, lastDesc) + if err != nil { + return nil, err + } + return &indexDeleter{ + descList: descList, + lastID: lastDesc.max, + bw: bw, + state: state, + db: db, + }, nil +} + +// empty returns an flag indicating whether the state index is empty. +func (d *indexDeleter) empty() bool { + return d.bw.empty() && len(d.descList) == 1 +} + +// pop removes the last written element from the index writer. 
+func (d *indexDeleter) pop(id uint64) error { + if id == 0 { + return fmt.Errorf("zero history ID is not valid") + } + if id != d.lastID { + return fmt.Errorf("pop element out of order, last: %d, this: %d", d.lastID, id) + } + if err := d.bw.pop(id); err != nil { + return err + } + if !d.bw.empty() { + d.lastID = d.bw.desc.max + return nil + } + // Discarding the last block writer if it becomes empty by popping an element + d.dropped = append(d.dropped, d.descList[len(d.descList)-1].id) + + // Reset the entire index writer if it becomes empty after popping an element + if d.empty() { + d.lastID = 0 + return nil + } + d.descList = d.descList[:len(d.descList)-1] + + // Open the previous block writer for deleting + var ( + indexBlock []byte + lastDesc = d.descList[len(d.descList)-1] + ) + if d.state.account { + indexBlock = rawdb.ReadAccountHistoryIndexBlock(d.db, d.state.address, lastDesc.id) + } else { + indexBlock = rawdb.ReadStorageHistoryIndexBlock(d.db, d.state.address, d.state.storageHash, lastDesc.id) + } + bw, err := newBlockWriter(indexBlock, lastDesc) + if err != nil { + return err + } + d.bw = bw + d.lastID = bw.desc.max + return nil +} + +// finish deletes the empty index blocks and updates the index meta. +// +// This function is safe to be called multiple times. 
+func (d *indexDeleter) finish(batch ethdb.Batch) {
+	for _, id := range d.dropped {
+		if d.state.account {
+			rawdb.DeleteAccountHistoryIndexBlock(batch, d.state.address, id)
+		} else {
+			rawdb.DeleteStorageHistoryIndexBlock(batch, d.state.address, d.state.storageHash, id)
+		}
+	}
+	d.dropped = nil
+
+	// Flush the content of the last block writer, regardless of whether it is dirty or not
+	if !d.bw.empty() {
+		if d.state.account {
+			rawdb.WriteAccountHistoryIndexBlock(batch, d.state.address, d.bw.desc.id, d.bw.finish())
+		} else {
+			rawdb.WriteStorageHistoryIndexBlock(batch, d.state.address, d.state.storageHash, d.bw.desc.id, d.bw.finish())
+		}
+	}
+	// Flush the index metadata into the supplied batch
+	if d.empty() {
+		if d.state.account {
+			rawdb.DeleteAccountHistoryIndex(batch, d.state.address)
+		} else {
+			rawdb.DeleteStorageHistoryIndex(batch, d.state.address, d.state.storageHash)
+		}
+	} else {
+		buf := make([]byte, 0, indexBlockDescSize*len(d.descList))
+		for _, desc := range d.descList {
+			buf = append(buf, desc.encode()...)
+		}
+		if d.state.account {
+			rawdb.WriteAccountHistoryIndex(batch, d.state.address, buf)
+		} else {
+			rawdb.WriteStorageHistoryIndex(batch, d.state.address, d.state.storageHash, buf)
+		}
+	}
+}
diff --git a/triedb/pathdb/history_index_block.go b/triedb/pathdb/history_index_block.go
new file mode 100644
index 000000000000..854029904c4d
--- /dev/null
+++ b/triedb/pathdb/history_index_block.go
@@ -0,0 +1,403 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see = indexBlockEntriesCap +} + +// encode packs index block descriptor into byte stream. +func (d *indexBlockDesc) encode() []byte { + var buf [indexBlockDescSize]byte + binary.BigEndian.PutUint64(buf[:8], d.min) + binary.BigEndian.PutUint64(buf[8:16], d.max) + binary.BigEndian.PutUint32(buf[16:20], d.entries) + binary.BigEndian.PutUint32(buf[20:24], d.id) + return buf[:] +} + +// decode unpacks index block descriptor from byte stream. +func (d *indexBlockDesc) decode(blob []byte) { + d.min = binary.BigEndian.Uint64(blob[:8]) + d.max = binary.BigEndian.Uint64(blob[8:16]) + d.entries = binary.BigEndian.Uint32(blob[16:20]) + d.id = binary.BigEndian.Uint32(blob[20:24]) +} + +// parseIndexBlock parses the index block with the supplied byte stream. +// The index block format can be illustrated as below: +// +// +---->+------------------+ +// | | Chunk1 | +// | +------------------+ +// | | ...... | +// | +-->+------------------+ +// | | | ChunkN | +// | | +------------------+ +// +-|---| Restart1 | +// | | Restart... | 4N bytes +// +---| RestartN | +// +------------------+ +// | Restart count | 4 bytes +// +------------------+ +// +// - Chunk list: A list of data chunks +// - Restart list: A list of 4-byte pointers, each pointing to the start position of a chunk +// - Restart count: The number of restarts in the block, stored at the end of the block (4 bytes) +// +// Each chunk begins with the full value of the first integer, followed by +// subsequent integers representing the differences between the current value +// and the preceding one. 
Integers are encoded with variable-size for best +// storage efficiency. Each chunk can be illustrated as below. +// +// Restart ---> +----------------+ +// | Full integer | +// +----------------+ +// | Diff with prev | +// +----------------+ +// | ... | +// +----------------+ +// | Diff with prev | +// +----------------+ +// +// Empty index block is regarded as invalid. +func parseIndexBlock(blob []byte) ([]uint32, []byte, error) { + if len(blob) < 4 { + return nil, nil, fmt.Errorf("corrupted index block, len: %d", len(blob)) + } + restartLen := binary.BigEndian.Uint32(blob[len(blob)-4:]) + if restartLen == 0 { + return nil, nil, errors.New("corrupted index block, no restart") + } + tailLen := int(restartLen+1) * 4 + if len(blob) < tailLen { + return nil, nil, fmt.Errorf("truncated restarts, size: %d, restarts: %d", len(blob), restartLen) + } + restarts := make([]uint32, 0, restartLen) + for i := restartLen; i > 0; i-- { + restart := binary.BigEndian.Uint32(blob[len(blob)-int(i+1)*4:]) + restarts = append(restarts, restart) + } + // Validate that restart points are strictly ordered and within the valid + // data range. + var prev uint32 + for i := 0; i < len(restarts); i++ { + if i != 0 { + if restarts[i] <= prev { + return nil, nil, fmt.Errorf("restart out of order, prev: %d, next: %d", prev, restarts[i]) + } + } + if int(restarts[i]) >= len(blob)-tailLen { + return nil, nil, fmt.Errorf("invalid restart position, restart: %d, size: %d", restarts[i], len(blob)-tailLen) + } + prev = restarts[i] + } + return restarts, blob[:len(blob)-tailLen], nil +} + +// blockReader is the reader to access the element within a block. +type blockReader struct { + restarts []uint32 + data []byte +} + +// newBlockReader constructs the block reader with the supplied block data. 
+func newBlockReader(blob []byte) (*blockReader, error) { + restarts, data, err := parseIndexBlock(blob) + if err != nil { + return nil, err + } + return &blockReader{ + restarts: restarts, + data: data, // safe to own the slice + }, nil +} + +// readGreaterThan locates the first element in the block that is greater than +// the specified value. If no such element is found, MaxUint64 is returned. +func (br *blockReader) readGreaterThan(id uint64) (uint64, error) { + var err error + index := sort.Search(len(br.restarts), func(i int) bool { + item, n := binary.Uvarint(br.data[br.restarts[i]:]) + if n <= 0 { + err = fmt.Errorf("failed to decode item at restart %d", br.restarts[i]) + } + return item > id + }) + if err != nil { + return 0, err + } + if index == 0 { + item, _ := binary.Uvarint(br.data[br.restarts[0]:]) + return item, nil + } + var ( + start int + limit int + result uint64 + ) + if index == len(br.restarts) { + // The element being searched falls within the last restart section, + // there is no guarantee such element can be found. + start = int(br.restarts[len(br.restarts)-1]) + limit = len(br.data) + } else { + // The element being searched falls within the non-last restart section, + // such element can be found for sure. + start = int(br.restarts[index-1]) + limit = int(br.restarts[index]) + } + pos := start + for pos < limit { + x, n := binary.Uvarint(br.data[pos:]) + if pos == start { + result = x + } else { + result += x + } + if result > id { + return result, nil + } + pos += n + } + // The element which is greater than specified id is not found. + if index == len(br.restarts) { + return math.MaxUint64, nil + } + // The element which is the first one greater than the specified id + // is exactly the one located at the restart point. 
+ item, _ := binary.Uvarint(br.data[br.restarts[index]:]) + return item, nil +} + +type blockWriter struct { + desc *indexBlockDesc // Descriptor of the block + restarts []uint32 // Offsets into the data slice, marking the start of each section + scratch []byte // Buffer used for encoding full integers or value differences + data []byte // Aggregated encoded data slice +} + +func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) { + scratch := make([]byte, binary.MaxVarintLen64) + if len(blob) == 0 { + return &blockWriter{ + desc: desc, + scratch: scratch, + data: make([]byte, 0, 1024), + }, nil + } + restarts, data, err := parseIndexBlock(blob) + if err != nil { + return nil, err + } + return &blockWriter{ + desc: desc, + restarts: restarts, + scratch: scratch, + data: data, // safe to own the slice + }, nil +} + +// append adds a new element to the block. The new element must be greater than +// the previous one. The provided ID is assumed to always be greater than 0. +func (b *blockWriter) append(id uint64) error { + if id == 0 { + return errors.New("invalid zero id") + } + if id <= b.desc.max { + return fmt.Errorf("append element out of order, last: %d, this: %d", b.desc.max, id) + } + // Rotate the current restart section if it's full + if b.desc.entries%indexBlockRestartLen == 0 { + // Save the offset within the data slice as the restart point + // for the next section. + b.restarts = append(b.restarts, uint32(len(b.data))) + + // The restart point item can either be encoded in variable + // size or fixed size. Although variable-size encoding is + // slightly slower (2ns per operation), it is still relatively + // fast, therefore, it's picked for better space efficiency. + // + // The first element in a restart range is encoded using its + // full value. + n := binary.PutUvarint(b.scratch[0:], id) + b.data = append(b.data, b.scratch[:n]...) + } else { + // The current section is not full, append the element. 
+ // The element which is not the first one in the section + // is encoded using the value difference from the preceding + // element. + n := binary.PutUvarint(b.scratch[0:], id-b.desc.max) + b.data = append(b.data, b.scratch[:n]...) + } + b.desc.entries++ + + // The state history ID must be greater than 0. + if b.desc.min == 0 { + b.desc.min = id + } + b.desc.max = id + return nil +} + +// scanSection traverses the specified section and terminates if fn returns true. +func (b *blockWriter) scanSection(section int, fn func(uint64, int) bool) { + var ( + value uint64 + start = int(b.restarts[section]) + pos = start + limit int + ) + if section == len(b.restarts)-1 { + limit = len(b.data) + } else { + limit = int(b.restarts[section+1]) + } + for pos < limit { + x, n := binary.Uvarint(b.data[pos:]) + if pos == start { + value = x + } else { + value += x + } + if fn(value, pos) { + return + } + pos += n + } +} + +// sectionLast returns the last element in the specified section. +func (b *blockWriter) sectionLast(section int) uint64 { + var n uint64 + b.scanSection(section, func(v uint64, _ int) bool { + n = v + return false + }) + return n +} + +// sectionSearch looks up the specified value in the given section, +// the position and the preceding value will be returned if found. +func (b *blockWriter) sectionSearch(section int, n uint64) (found bool, prev uint64, pos int) { + b.scanSection(section, func(v uint64, p int) bool { + if n == v { + pos = p + found = true + return true // terminate iteration + } + prev = v + return false // continue iteration + }) + return found, prev, pos +} + +// pop removes the last element from the block. The assumption is held that block +// writer must be non-empty. 
+func (b *blockWriter) pop(id uint64) error { + if id == 0 { + return errors.New("invalid zero id") + } + if id != b.desc.max { + return fmt.Errorf("pop element out of order, last: %d, this: %d", b.desc.max, id) + } + // If there is only one entry left, the entire block should be reset + if b.desc.entries == 1 { + b.desc.min = 0 + b.desc.max = 0 + b.desc.entries = 0 + b.restarts = nil + b.data = b.data[:0] + return nil + } + // Pop the last restart section if the section becomes empty after removing + // one element. + if b.desc.entries%indexBlockRestartLen == 1 { + b.data = b.data[:b.restarts[len(b.restarts)-1]] + b.restarts = b.restarts[:len(b.restarts)-1] + b.desc.max = b.sectionLast(len(b.restarts) - 1) + b.desc.entries -= 1 + return nil + } + // Look up the element preceding the one to be popped, in order to update + // the maximum element in the block. + found, prev, pos := b.sectionSearch(len(b.restarts)-1, id) + if !found { + return fmt.Errorf("pop element is not found, last: %d, this: %d", b.desc.max, id) + } + b.desc.max = prev + b.data = b.data[:pos] + b.desc.entries -= 1 + return nil +} + +func (b *blockWriter) empty() bool { + return b.desc.empty() +} + +func (b *blockWriter) full() bool { + return b.desc.full() +} + +// finish finalizes the index block encoding by appending the encoded restart points +// and the restart counter to the end of the block. +// +// This function is safe to be called multiple times. +func (b *blockWriter) finish() []byte { + var buf []byte + for _, number := range append(b.restarts, uint32(len(b.restarts))) { + binary.BigEndian.PutUint32(b.scratch[:4], number) + buf = append(buf, b.scratch[:4]...) + } + return append(b.data, buf...) 
+} diff --git a/triedb/pathdb/history_index_block_test.go b/triedb/pathdb/history_index_block_test.go new file mode 100644 index 000000000000..32bc3eda7358 --- /dev/null +++ b/triedb/pathdb/history_index_block_test.go @@ -0,0 +1,217 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see value + }) + got, err := br.readGreaterThan(value) + if err != nil { + t.Fatalf("Unexpected error, got %v", err) + } + if pos == len(elements) { + if got != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got) + } + } else if got != elements[pos] { + t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos]) + } + } +} + +func TestBlockWriterBasic(t *testing.T) { + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + if !bw.empty() { + t.Fatal("expected empty block") + } + bw.append(2) + if err := bw.append(1); err == nil { + t.Fatal("out-of-order insertion is not expected") + } + for i := 0; i < 10; i++ { + bw.append(uint64(i + 3)) + } + + bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0)) + if err != nil { + t.Fatalf("Failed to construct the block writer, %v", err) + } + for i := 0; i < 10; i++ { + if err := bw.append(uint64(i + 100)); err != nil { + t.Fatalf("Failed to append value %d: %v", i, err) + } + } + bw.finish() 
+}
+
+func TestBlockWriterDelete(t *testing.T) {
+	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
+	for i := 0; i < 10; i++ {
+		bw.append(uint64(i + 1))
+	}
+	// Pop unknown id, the request should be rejected
+	if err := bw.pop(100); err == nil {
+		t.Fatal("Expect error to occur for unknown id")
+	}
+	for i := 10; i >= 1; i-- {
+		if err := bw.pop(uint64(i)); err != nil {
+			t.Fatalf("Unexpected error for element popping, %v", err)
+		}
+		empty := i == 1
+		if empty != bw.empty() {
+			t.Fatalf("Emptiness is not matched, want: %T, got: %T", empty, bw.empty())
+		}
+		newMax := uint64(i - 1)
+		if bw.desc.max != newMax {
+			t.Fatalf("Maximum element is not matched, want: %d, got: %d", newMax, bw.desc.max)
+		}
+	}
+}
+
+func TestBlockWriterDeleteWithData(t *testing.T) {
+	elements := []uint64{
+		1, 5, 10, 11, 20,
+	}
+	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
+	for i := 0; i < len(elements); i++ {
+		bw.append(elements[i])
+	}
+
+	// Re-construct the block writer with data
+	desc := &indexBlockDesc{
+		id:      0,
+		min:     1,
+		max:     20,
+		entries: 5,
+	}
+	bw, err := newBlockWriter(bw.finish(), desc)
+	if err != nil {
+		t.Fatalf("Failed to construct block writer %v", err)
+	}
+	for i := len(elements) - 1; i > 0; i-- {
+		if err := bw.pop(elements[i]); err != nil {
+			t.Fatalf("Failed to pop element, %v", err)
+		}
+		newTail := elements[i-1]
+
+		// Ensure the element can still be queried with no issue
+		br, err := newBlockReader(bw.finish())
+		if err != nil {
+			t.Fatalf("Failed to construct the block reader, %v", err)
+		}
+		cases := []struct {
+			value  uint64
+			result uint64
+		}{
+			{0, 1},
+			{1, 5},
+			{10, 11},
+			{19, 20},
+			{20, math.MaxUint64},
+			{21, math.MaxUint64},
+		}
+		for _, c := range cases {
+			want := c.result
+			if c.value >= newTail {
+				want = math.MaxUint64
+			}
+			got, err := br.readGreaterThan(c.value)
+			if err != nil {
+				t.Fatalf("Unexpected error, got %v", err)
+			}
+			if got != want {
+				t.Fatalf("Unexpected result, got %v, wanted %v", got, want)
+			}
+		}
+	}
+}
+
+func TestCorruptedIndexBlock(t *testing.T) { + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + for i := 0; i < 10; i++ { + bw.append(uint64(i + 1)) + } + buf := bw.finish() + + // Mutate the buffer manually + buf[len(buf)-1]++ + _, err := newBlockWriter(buf, newIndexBlockDesc(0)) + if err == nil { + t.Fatal("Corrupted index block data is not detected") + } +} diff --git a/triedb/pathdb/history_index_test.go b/triedb/pathdb/history_index_test.go new file mode 100644 index 000000000000..b54194400e67 --- /dev/null +++ b/triedb/pathdb/history_index_test.go @@ -0,0 +1,290 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. 
If not, see value + }) + got, err := br.readGreaterThan(value) + if err != nil { + t.Fatalf("Unexpected error, got %v", err) + } + if pos == len(elements) { + if got != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got) + } + } else if got != elements[pos] { + t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos]) + } + } +} + +func TestEmptyIndexReader(t *testing.T) { + br, err := newIndexReader(rawdb.NewMemoryDatabase(), newAccountIdent(common.Address{0xa})) + if err != nil { + t.Fatalf("Failed to construct the index reader, %v", err) + } + res, err := br.readGreaterThan(100) + if err != nil { + t.Fatalf("Failed to query, %v", err) + } + if res != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", res) + } +} + +func TestIndexWriterBasic(t *testing.T) { + db := rawdb.NewMemoryDatabase() + iw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa})) + iw.append(2) + if err := iw.append(1); err == nil { + t.Fatal("out-of-order insertion is not expected") + } + for i := 0; i < 10; i++ { + iw.append(uint64(i + 3)) + } + batch := db.NewBatch() + iw.finish(batch) + batch.Write() + + iw, err := newIndexWriter(db, newAccountIdent(common.Address{0xa})) + if err != nil { + t.Fatalf("Failed to construct the block writer, %v", err) + } + for i := 0; i < 10; i++ { + if err := iw.append(uint64(i + 100)); err != nil { + t.Fatalf("Failed to append item, %v", err) + } + } + iw.finish(db.NewBatch()) +} + +func TestIndexWriterDelete(t *testing.T) { + db := rawdb.NewMemoryDatabase() + iw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa})) + for i := 0; i < indexBlockEntriesCap*4; i++ { + iw.append(uint64(i + 1)) + } + batch := db.NewBatch() + iw.finish(batch) + batch.Write() + + // Delete unknown id, the request should be rejected + id, _ := newIndexDeleter(db, newAccountIdent(common.Address{0xa})) + if err := id.pop(indexBlockEntriesCap * 5); err == nil { + t.Fatal("Expect error to occur 
for unknown id") + } + for i := indexBlockEntriesCap * 4; i >= 1; i-- { + if err := id.pop(uint64(i)); err != nil { + t.Fatalf("Unexpected error for element popping, %v", err) + } + if id.lastID != uint64(i-1) { + t.Fatalf("Unexpected lastID, want: %d, got: %d", uint64(i-1), iw.lastID) + } + if rand.Intn(10) == 0 { + batch := db.NewBatch() + id.finish(batch) + batch.Write() + } + } +} + +func TestBatchIndexerWrite(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + batch = newBatchIndexer(db, false) + histories = makeHistories(10) + ) + for i, h := range histories { + if err := batch.process(h, uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := batch.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + indexed := rawdb.ReadLastStateHistoryIndex(db) + if indexed == nil || *indexed != uint64(10) { + t.Fatal("Unexpected index position") + } + var ( + accounts = make(map[common.Address][]uint64) + storages = make(map[common.Address]map[common.Hash][]uint64) + ) + for i, h := range histories { + for _, addr := range h.accountList { + accounts[addr] = append(accounts[addr], uint64(i+1)) + + if _, ok := storages[addr]; !ok { + storages[addr] = make(map[common.Hash][]uint64) + } + for _, slot := range h.storageList[addr] { + storages[addr][slot] = append(storages[addr][slot], uint64(i+1)) + } + } + } + for addr, indexes := range accounts { + ir, _ := newIndexReader(db, newAccountIdent(addr)) + for i := 0; i < len(indexes)-1; i++ { + n, err := ir.readGreaterThan(indexes[i]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != indexes[i+1] { + t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n) + } + } + n, err := ir.readGreaterThan(indexes[len(indexes)-1]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != math.MaxUint64 { + t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n) + } + } + for addr, slots := range storages { 
+ for slotHash, indexes := range slots { + ir, _ := newIndexReader(db, newStorageIdent(addr, slotHash)) + for i := 0; i < len(indexes)-1; i++ { + n, err := ir.readGreaterThan(indexes[i]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != indexes[i+1] { + t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n) + } + } + n, err := ir.readGreaterThan(indexes[len(indexes)-1]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != math.MaxUint64 { + t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n) + } + } + } +} + +func TestBatchIndexerDelete(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + bw = newBatchIndexer(db, false) + histories = makeHistories(10) + ) + // Index histories + for i, h := range histories { + if err := bw.process(h, uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := bw.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + + // Unindex histories + bd := newBatchIndexer(db, true) + for i := len(histories) - 1; i >= 0; i-- { + if err := bd.process(histories[i], uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := bd.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + + indexed := rawdb.ReadLastStateHistoryIndex(db) + if indexed != nil { + t.Fatal("Unexpected index position") + } + it := db.NewIterator(rawdb.StateHistoryIndexPrefix, nil) + for it.Next() { + t.Fatal("Leftover history index data") + } + it.Release() +} diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go new file mode 100644 index 000000000000..ade4f1a41cf8 --- /dev/null +++ b/triedb/pathdb/history_indexer.go @@ -0,0 +1,518 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see = tailID { + return *lastIndexed + 1, nil + } + // History has been shortened without indexing. Discard the gapped segment + // in the history and shift to the first available element. + // + // The missing indexes corresponding to the gapped histories won't be visible. + // It's fine to leave them unindexed. + log.Info("History gap detected, discard old segment", "oldHead", *lastIndexed, "newHead", tailID) + return tailID, nil +} + +func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) { + defer close(done) + + beginID, err := i.next() + if err != nil { + log.Error("Failed to find next state history for indexing", "err", err) + return + } + // All available state histories have been indexed, and the last indexed one + // exceeds the most recent available state history. This situation may occur + // when the state is reverted manually (chain.SetHead) or the deep reorg is + // encountered. In such cases, no indexing should be scheduled. 
+	if beginID > lastID {
+		return
+	}
+	log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
+
+	var (
+		current = beginID
+		start   = time.Now()
+		logged  = time.Now()
+		batch   = newBatchIndexer(i.disk, false)
+	)
+	for current <= lastID {
+		count := lastID - current + 1
+		if count > historyReadBatch {
+			count = historyReadBatch
+		}
+		histories, err := readHistories(i.freezer, current, count)
+		if err != nil {
+			// The history read might fail if the history is truncated from
+			// head due to revert operation.
+			log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
+			return
+		}
+		for _, h := range histories {
+			if err := batch.process(h, current); err != nil {
+				log.Error("Failed to index history", "err", err)
+				return
+			}
+			current += 1
+
+			// Occasionally report the indexing progress
+			if time.Since(logged) > time.Second*8 {
+				logged = time.Now()
+
+				var (
+					left  = lastID - current + 1
+					done  = current - beginID
+					speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1 to avoid division by zero
+				)
+				// Estimate the remaining time based on the average indexing speed so far
+				eta := time.Duration(left/speed) * time.Millisecond
+				log.Info("Indexing state history", "processed", done, "left", left, "eta", common.PrettyDuration(eta))
+			}
+		}
+		// Check interruption signal and abort process if it's fired
+		if interrupt != nil {
+			if signal := interrupt.Load(); signal != 0 {
+				if err := batch.finish(true); err != nil {
+					log.Error("Failed to flush index", "err", err)
+				}
+				log.Info("State indexing interrupted")
+				return
+			}
+		}
+	}
+	if err := batch.finish(true); err != nil {
+		log.Error("Failed to flush index", "err", err)
+	}
+	log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
+}
+
+// historyIndexer manages the indexing and unindexing of state histories,
+// providing access to historical states.
+// +// Upon initialization, historyIndexer starts a one-time background process +// to complete the indexing of any remaining state histories. Once this +// process is finished, all state histories are marked as fully indexed, +// enabling handling of requests for historical states. Thereafter, any new +// state histories must be indexed or unindexed synchronously, ensuring that +// the history index is created or removed along with the corresponding +// state history. +type historyIndexer struct { + initer *indexIniter + disk ethdb.KeyValueStore + freezer ethdb.AncientStore +} + +// newHistoryIndexer constructs the history indexer and launches the background +// initer to complete the indexing of any remaining state histories. +func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { + return &historyIndexer{ + initer: newIndexIniter(disk, freezer, lastHistoryID), + disk: disk, + freezer: freezer, + } +} + +func (i *historyIndexer) close() { + i.initer.close() +} + +func (i *historyIndexer) inited() bool { + return i.initer.inited() +} + +// extend sends the notification that new state history with specified ID +// has been written into the database and is ready for indexing. +func (i *historyIndexer) extend(historyID uint64) error { + signal := &interruptSignal{ + newLastID: historyID, + result: make(chan error, 1), + } + select { + case <-i.initer.closed: + return errors.New("indexer is closed") + case <-i.initer.done: + return indexSingle(historyID, i.disk, i.freezer) + case i.initer.interrupt <- signal: + return <-signal.result + } +} + +// shorten sends the notification that state history with specified ID +// is about to be deleted from the database and should be unindexed. 
+func (i *historyIndexer) shorten(historyID uint64) error { + signal := &interruptSignal{ + newLastID: historyID - 1, + result: make(chan error, 1), + } + select { + case <-i.initer.closed: + return errors.New("indexer is closed") + case <-i.initer.done: + return unindexSingle(historyID, i.disk, i.freezer) + case i.initer.interrupt <- signal: + return <-signal.result + } +} diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go new file mode 100644 index 000000000000..b7aaf5e34f80 --- /dev/null +++ b/triedb/pathdb/history_reader.go @@ -0,0 +1,346 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see lastID { + return 0, fmt.Errorf("index reader is stale, limit: %d, last-state-id: %d", r.limit, lastID) + } + // Try to find the element which is greater than the specified target + res, err := r.reader.readGreaterThan(id) + if err != nil { + return 0, err + } + // Short circuit if the element is found within the current index + if res != math.MaxUint64 { + return res, nil + } + // The element was not found, and no additional histories have been indexed. + // Return a not-found result. 
+ if r.limit == lastID { + return res, nil + } + // Refresh the index reader and give another attempt + indexed := rawdb.ReadLastStateHistoryIndex(r.db) + if indexed == nil || *indexed < lastID { + return 0, errors.New("state history hasn't been indexed yet") + } + if err := r.reader.refresh(); err != nil { + return 0, err + } + r.limit = *indexed + + return r.reader.readGreaterThan(id) +} + +// historyReader is the structure to access historic state data. +type historyReader struct { + disk ethdb.KeyValueReader + freezer ethdb.AncientReader + readers map[string]*indexReaderWithLimitTag +} + +// newHistoryReader constructs the history reader with the supplied db. +func newHistoryReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader) *historyReader { + return &historyReader{ + disk: disk, + freezer: freezer, + readers: make(map[string]*indexReaderWithLimitTag), + } +} + +// readAccountMetadata resolves the account metadata within the specified +// state history. +func (r *historyReader) readAccountMetadata(address common.Address, historyID uint64) ([]byte, error) { + blob := rawdb.ReadStateAccountIndex(r.freezer, historyID) + if len(blob)%accountIndexSize != 0 { + return nil, fmt.Errorf("account index is corrupted, historyID: %d", historyID) + } + n := len(blob) / accountIndexSize + + pos := sort.Search(n, func(i int) bool { + h := blob[accountIndexSize*i : accountIndexSize*i+common.HashLength] + return bytes.Compare(h, address.Bytes()) >= 0 + }) + if pos == n { + return nil, fmt.Errorf("account %#x is not found", address) + } + offset := accountIndexSize * pos + if address != common.BytesToAddress(blob[offset:offset+common.AddressLength]) { + return nil, fmt.Errorf("account %#x is not found", address) + } + return blob[offset : accountIndexSize*(pos+1)], nil +} + +// readStorageMetadata resolves the storage slot metadata within the specified +// state history. 
+func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) { + // TODO(rj493456442) optimize it with partial read + blob := rawdb.ReadStateStorageIndex(r.freezer, historyID) + if len(blob)%slotIndexSize != 0 { + return nil, fmt.Errorf("storage indices is corrupted, historyID: %d", historyID) + } + if slotIndexSize*(slotOffset+slotNumber) > len(blob) { + return nil, errors.New("out of slice") + } + subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)] + + // TODO(rj493456442) get rid of the metadata resolution + var ( + m meta + target common.Hash + ) + blob = rawdb.ReadStateHistoryMeta(r.freezer, historyID) + if err := m.decode(blob); err != nil { + return nil, err + } + if m.version == stateHistoryV0 { + target = storageHash + } else { + target = storageKey + } + pos := sort.Search(slotNumber, func(i int) bool { + slotID := subSlice[slotIndexSize*i : slotIndexSize*i+common.HashLength] + return bytes.Compare(slotID, target.Bytes()) >= 0 + }) + if pos == slotNumber { + return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID) + } + offset := slotIndexSize * pos + if target != common.BytesToHash(subSlice[offset:offset+common.HashLength]) { + return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID) + } + return subSlice[offset : slotIndexSize*(pos+1)], nil +} + +// readAccount retrieves the account data from the specified state history. 
+func (r *historyReader) readAccount(address common.Address, historyID uint64) ([]byte, error) { + metadata, err := r.readAccountMetadata(address, historyID) + if err != nil { + return nil, err + } + length := int(metadata[common.AddressLength]) // one byte for account data length + offset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+1 : common.AddressLength+5])) // four bytes for the account data offset + + // TODO(rj493456442) optimize it with partial read + data := rawdb.ReadStateAccountHistory(r.freezer, historyID) + if len(data) < length+offset { + return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d", address, historyID) + } + return data[offset : offset+length], nil +} + +// readStorage retrieves the storage slot data from the specified state history. +func (r *historyReader) readStorage(address common.Address, storageKey common.Hash, storageHash common.Hash, historyID uint64) ([]byte, error) { + metadata, err := r.readAccountMetadata(address, historyID) + if err != nil { + return nil, err + } + // slotIndexOffset: + // The offset of storage indices associated with the specified account. + // slotIndexNumber: + // The number of storage indices associated with the specified account. 
+ slotIndexOffset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+5 : common.AddressLength+9])) + slotIndexNumber := int(binary.BigEndian.Uint32(metadata[common.AddressLength+9 : common.AddressLength+13])) + + slotMetadata, err := r.readStorageMetadata(storageKey, storageHash, historyID, slotIndexOffset, slotIndexNumber) + if err != nil { + return nil, err + } + length := int(slotMetadata[common.HashLength]) // one byte for slot data length + offset := int(binary.BigEndian.Uint32(slotMetadata[common.HashLength+1 : common.HashLength+5])) // four bytes for slot data offset + + // TODO(rj493456442) optimize it with partial read + data := rawdb.ReadStateStorageHistory(r.freezer, historyID) + if len(data) < offset+length { + return nil, errors.New("corrupted storage data") + } + return data[offset : offset+length], nil +} + +// read retrieves the state element data associated with the stateID. +// stateID: represents the ID of the state of the specified version; +// lastID: represents the ID of the latest/newest state history; +// latestValue: represents the state value at the current disk layer with ID == lastID; +func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint64, latestValue []byte) ([]byte, error) { + tail, err := r.freezer.Tail() + if err != nil { + return nil, err + } + // stateID == tail is allowed, as the first history object preserved + // is tail+1 + if stateID < tail { + return nil, errors.New("historical state has been pruned") + } + lastIndexedID := rawdb.ReadLastStateHistoryIndex(r.disk) + + // To serve the request, all state histories from stateID+1 to lastID + // must be indexed + if lastIndexedID == nil || *lastIndexedID < lastID { + indexed := "null" + if lastIndexedID != nil { + indexed = fmt.Sprintf("%d", *lastIndexedID) + } + return nil, fmt.Errorf("state history is not fully indexed, requested: %d, indexed: %s", stateID, indexed) + } + + // Construct the index reader to locate the corresponding history 
for + // state retrieval + ir, ok := r.readers[state.String()] + if !ok { + ir, err = newIndexReaderWithLimitTag(r.disk, state.stateIdent) + if err != nil { + return nil, err + } + r.readers[state.String()] = ir + } + historyID, err := ir.readGreaterThan(stateID, lastID) + if err != nil { + return nil, err + } + // The state was not found in the state histories, as it has not been modified + // since stateID. Use the data from the associated disk layer instead. + if historyID == math.MaxUint64 { + return latestValue, nil + } + if state.account { + return r.readAccount(state.address, historyID) + } + return r.readStorage(state.address, state.storageKey, state.storageHash, historyID) +} diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go new file mode 100644 index 000000000000..7e2489e85d6a --- /dev/null +++ b/triedb/pathdb/history_reader_test.go @@ -0,0 +1,159 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. 
If not, see = db.tree.bottom().stateID() { + return + } + time.Sleep(100 * time.Millisecond) + } +} + +func checkHistoricState(env *tester, root common.Hash, hr *historyReader) error { + // Short circuit if the historical state is no longer available + if rawdb.ReadStateID(env.db.diskdb, root) == nil { + return nil + } + var ( + dl = env.db.tree.bottom() + stateID = rawdb.ReadStateID(env.db.diskdb, root) + accounts = env.snapAccounts[root] + storages = env.snapStorages[root] + ) + for addrHash, accountData := range accounts { + latest, _ := dl.account(addrHash, 0) + blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash)), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if !bytes.Equal(accountData, blob) { + return fmt.Errorf("wrong account data, expected %x, got %x", accountData, blob) + } + } + for i := 0; i < len(env.roots); i++ { + if env.roots[i] == root { + break + } + // Find all accounts deleted in the past, ensure the associated data is null + for addrHash := range env.snapAccounts[env.roots[i]] { + if _, ok := accounts[addrHash]; !ok { + latest, _ := dl.account(addrHash, 0) + blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash)), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if len(blob) != 0 { + return fmt.Errorf("wrong account data, expected null, got %x", blob) + } + } + } + } + for addrHash, slots := range storages { + for slotHash, slotData := range slots { + latest, _ := dl.storage(addrHash, slotHash, 0) + blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if !bytes.Equal(slotData, blob) { + return fmt.Errorf("wrong storage data, expected %x, got %x", slotData, blob) + } + } + } + for i := 0; i < len(env.roots); i++ { + if env.roots[i] == root { + break + } + // Find all storage slots deleted in the past, ensure the associated data is null + 
for addrHash, slots := range env.snapStorages[env.roots[i]] { + for slotHash := range slots { + _, ok := storages[addrHash] + if ok { + _, ok = storages[addrHash][slotHash] + } + if !ok { + latest, _ := dl.storage(addrHash, slotHash, 0) + blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if len(blob) != 0 { + return fmt.Errorf("wrong storage data, expected null, got %x", blob) + } + } + } + } + } + return nil +} + +func TestHistoryReader(t *testing.T) { + testHistoryReader(t, 0) // with all histories reserved + testHistoryReader(t, 10) // with latest 10 histories reserved +} + +func testHistoryReader(t *testing.T, historyLimit uint64) { + maxDiffLayers = 4 + defer func() { + maxDiffLayers = 128 + }() + //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + + env := newTester(t, historyLimit, false, 64) + defer env.release() + waitIndexing(env.db) + + var ( + roots = env.roots + dRoot = env.db.tree.bottom().rootHash() + hr = newHistoryReader(env.db.diskdb, env.db.freezer) + ) + for _, root := range roots { + if root == dRoot { + break + } + if err := checkHistoricState(env, root, hr); err != nil { + t.Fatal(err) + } + } + + // Pile up more histories on top, ensuring the historic reader is not affected + env.extend(4) + waitIndexing(env.db) + + for _, root := range roots { + if root == dRoot { + break + } + if err := checkHistoricState(env, root, hr); err != nil { + t.Fatal(err) + } + } +} diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 6d40c5713b4f..779f9d813ffe 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -73,8 +73,14 @@ var ( historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) + indexHistoryTimer = 
metrics.NewRegisteredResettingTimer("pathdb/history/index/time", nil) + unindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/unindex/time", nil) + lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil) lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil) + + historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil) + historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil) ) // Metrics in generation diff --git a/triedb/pathdb/reader.go b/triedb/pathdb/reader.go index bc72db34e34c..e53f364b2680 100644 --- a/triedb/pathdb/reader.go +++ b/triedb/pathdb/reader.go @@ -19,10 +19,13 @@ package pathdb import ( "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/triedb/database" @@ -192,3 +195,109 @@ func (db *Database) StateReader(root common.Hash) (database.StateReader, error) layer: layer, }, nil } + +// HistoricalStateReader is a wrapper over history reader, providing access to +// historical state. +type HistoricalStateReader struct { + db *Database + reader *historyReader + id uint64 +} + +// HistoricReader constructs a reader for accessing the requested historic state. +func (db *Database) HistoricReader(root common.Hash) (*HistoricalStateReader, error) { + // Bail out if the state history hasn't been fully indexed + if db.indexer == nil || !db.indexer.inited() { + return nil, errors.New("state histories haven't been fully indexed yet") + } + // States older than current disk layer (disk layer is included) are available + // for accessing. 
+ id := rawdb.ReadStateID(db.diskdb, root) + if id == nil { + return nil, fmt.Errorf("state %#x is not available", root) + } + return &HistoricalStateReader{ + id: *id, + db: db, + reader: newHistoryReader(db.diskdb, db.freezer), + }, nil +} + +// AccountRLP directly retrieves the account RLP associated with a particular +// address in the slim data format. An error will be returned if the read +// operation exits abnormally. Specifically, if the layer is already stale. +// +// Note: +// - the returned account is not a copy, please don't modify it. +// - no error will be returned if the requested account is not found in database. +func (r *HistoricalStateReader) AccountRLP(address common.Address) ([]byte, error) { + defer func(start time.Time) { + historicalAccountReadTimer.UpdateSince(start) + }(time.Now()) + + // TODO(rjl493456442): Theoretically, the obtained disk layer could become stale + // within a very short time window. + // + // While reading the account data while holding `db.tree.lock` can resolve + // this issue, but it will introduce a heavy contention over the lock. + // + // Let's optimistically assume the situation is very unlikely to happen, + // and try to define a low granularity lock if the current approach doesn't + // work later. + dl := r.db.tree.bottom() + latest, err := dl.account(crypto.Keccak256Hash(address.Bytes()), 0) + if err != nil { + return nil, err + } + return r.reader.read(newAccountIdentQuery(address), r.id, dl.stateID(), latest) +} + +// Account directly retrieves the account associated with a particular address in +// the slim data format. An error will be returned if the read operation exits +// abnormally. Specifically, if the layer is already stale. 
+// +// No error will be returned if the requested account is not found in database +func (r *HistoricalStateReader) Account(address common.Address) (*types.SlimAccount, error) { + blob, err := r.AccountRLP(address) + if err != nil { + return nil, err + } + if len(blob) == 0 { + return nil, nil + } + account := new(types.SlimAccount) + if err := rlp.DecodeBytes(blob, account); err != nil { + panic(err) + } + return account, nil +} + +// Storage directly retrieves the storage data associated with a particular key, +// within a particular account. An error will be returned if the read operation +// exits abnormally. Specifically, if the layer is already stale. +// +// Note: +// - the returned storage data is not a copy, please don't modify it. +// - no error will be returned if the requested slot is not found in database. +func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash) ([]byte, error) { + defer func(start time.Time) { + historicalStorageReadTimer.UpdateSince(start) + }(time.Now()) + + // TODO(rjl493456442): Theoretically, the obtained disk layer could become stale + // within a very short time window. + // + // While reading the account data while holding `db.tree.lock` can resolve + // this issue, but it will introduce a heavy contention over the lock. + // + // Let's optimistically assume the situation is very unlikely to happen, + // and try to define a low granularity lock if the current approach doesn't + // work later. 
+ dl := r.db.tree.bottom() + keyHash := crypto.Keccak256Hash(key.Bytes()) + latest, err := dl.storage(crypto.Keccak256Hash(address.Bytes()), keyHash, 0) + if err != nil { + return nil, err + } + return r.reader.read(newStorageIdentQuery(address, key, keyHash), r.id, dl.stateID(), latest) +} From ec4e8037277a863b86ef463fcf9809ef7114920a Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 12:26:11 +0800 Subject: [PATCH 02/17] triedb/pathdb: only enable history indexing in archive mode --- triedb/pathdb/database.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index c694775ae3da..bb8e7bc0a714 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -114,12 +114,13 @@ type layer interface { // Config contains the settings for database. type Config struct { - StateHistory uint64 // Number of recent blocks to maintain state history for - TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes - StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data - WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer - ReadOnly bool // Flag whether the database is opened in read only mode - SnapshotNoBuild bool // Flag Whether the background generation is allowed + StateHistory uint64 // Number of recent blocks to maintain state history for + EnableStateIndexing bool // Whether to enable state history indexing for external state access + TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes + StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data + WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer + ReadOnly bool // Flag whether the database is opened in read only mode + SnapshotNoBuild bool // Flag Whether the background generation is allowed } // sanitize checks the provided user 
configurations and changes anything that's @@ -145,7 +146,12 @@ func (c *Config) fields() []interface{} { list = append(list, "triecache", common.StorageSize(c.TrieCleanSize)) list = append(list, "statecache", common.StorageSize(c.StateCleanSize)) list = append(list, "buffer", common.StorageSize(c.WriteBufferSize)) - list = append(list, "history", c.StateHistory) + + if c.StateHistory == 0 { + list = append(list, "history", "entire chain") + } else { + list = append(list, "history", fmt.Sprintf("last %d blocks", c.StateHistory)) + } return list } @@ -260,8 +266,9 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database { log.Crit("Failed to setup the generator", "err", err) } // TODO (rjl493456442) disable the background indexing in read-only mode - if db.freezer != nil { + if db.freezer != nil && db.config.EnableStateIndexing { db.indexer = newHistoryIndexer(db.diskdb, db.freezer, db.tree.bottom().stateID()) + log.Info("Enabled state history indexing") } fields := config.fields() if db.isVerkle { From 66cbec9e23b7f113aaf45f134d46f595e532205d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 12:49:22 +0800 Subject: [PATCH 03/17] core/rawdb: track the metadata key --- core/rawdb/database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 7f6c3808adaa..f87b931e5553 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -574,7 +574,7 @@ var knownMetadataKeys = [][]byte{ snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey, - filterMapsRangeKey, + filterMapsRangeKey, headStateHistoryIndexKey, } // printChainMetadata prints out chain metadata to stderr. 
From 884fba95e79fe321d388ff73ee5b9b309da2ebdb Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 15:22:34 +0800 Subject: [PATCH 04/17] triedb/pathdb: update comment --- triedb/pathdb/history_indexer.go | 3 +++ triedb/pathdb/history_reader.go | 3 ++- triedb/pathdb/reader.go | 15 +++++++++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index ade4f1a41cf8..0a7d76f98926 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -479,6 +479,9 @@ func (i *historyIndexer) close() { i.initer.close() } +// inited returns a flag indicating whether the existing state histories +// have been fully indexed, in other words, whether they are available +// for external access. func (i *historyIndexer) inited() bool { return i.initer.inited() } diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go index b7aaf5e34f80..259540e8cf3e 100644 --- a/triedb/pathdb/history_reader.go +++ b/triedb/pathdb/history_reader.go @@ -311,7 +311,8 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6 lastIndexedID := rawdb.ReadLastStateHistoryIndex(r.disk) // To serve the request, all state histories from stateID+1 to lastID - // must be indexed + // must be indexed. It's not supposed to happen unless system is very + // wrong. if lastIndexedID == nil || *lastIndexedID < lastID { indexed := "null" if lastIndexedID != nil { diff --git a/triedb/pathdb/reader.go b/triedb/pathdb/reader.go index e53f364b2680..b1a7fdd92681 100644 --- a/triedb/pathdb/reader.go +++ b/triedb/pathdb/reader.go @@ -210,8 +210,19 @@ func (db *Database) HistoricReader(root common.Hash) (*HistoricalStateReader, er if db.indexer == nil || !db.indexer.inited() { return nil, errors.New("state histories haven't been fully indexed yet") } - // States older than current disk layer (disk layer is included) are available - // for accessing. 
+ if db.freezer == nil { + return nil, errors.New("state histories are not available") + } + // States at the current disk layer or above are directly accessible via + // db.StateReader. + // + // States older than the current disk layer (including the disk layer + // itself) are available through historic state access. + // + // Note: the requested state may refer to a stale historic state that has + // already been pruned. This function does not validate availability, as + // underlying states may be pruned dynamically. Validity is checked during + // each actual state retrieval. id := rawdb.ReadStateID(db.diskdb, root) if id == nil { return nil, fmt.Errorf("state %#x is not available", root) From dd76ad2bab3a6a414c593db03fc97ec8f0f7fc78 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 17:10:31 +0800 Subject: [PATCH 05/17] cmd/utils: remove the archive restriction on path --- cmd/utils/flags.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 81c4172a53d2..301daf550e8a 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1646,11 +1646,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") } - - if cfg.StateScheme != rawdb.HashScheme { - cfg.StateScheme = rawdb.HashScheme - log.Warn("Forcing hash state-scheme for archive mode") - } } if ctx.IsSet(LogHistoryFlag.Name) { cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name) From 08c0094c921d67af99c60a81fb2cd92878a47513 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 17:14:48 +0800 Subject: [PATCH 06/17] core: enable history indexing in archive mode --- core/blockchain.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 64345bc1a351..c393452e7f95 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -183,9 +183,10 @@ func (c *CacheConfig) 
triedbConfig(isVerkle bool) *triedb.Config { } if c.StateScheme == rawdb.PathScheme { config.PathDB = &pathdb.Config{ - StateHistory: c.StateHistory, - TrieCleanSize: c.TrieCleanLimit * 1024 * 1024, - StateCleanSize: c.SnapshotLimit * 1024 * 1024, + StateHistory: c.StateHistory, + EnableStateIndexing: c.TrieDirtyDisabled, + TrieCleanSize: c.TrieCleanLimit * 1024 * 1024, + StateCleanSize: c.SnapshotLimit * 1024 * 1024, // TODO(rjl493456442): The write buffer represents the memory limit used // for flushing both trie data and state data to disk. The config name From 669cde5154580c61cc647fef87b0981d0e8efa7f Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 29 May 2025 20:28:37 +0800 Subject: [PATCH 07/17] triedb/pathdb: add logs --- triedb/pathdb/history_indexer.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 0a7d76f98926..2bdbd3f1ca55 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -305,6 +305,7 @@ func (i *indexIniter) run(lastID uint64) { if signal.newLastID == lastID+1 { lastID = signal.newLastID signal.result <- nil + log.Debug("Extended state history range", "last", lastID) continue } // The index limit is shortened by one, interrupt the current background @@ -315,22 +316,26 @@ func (i *indexIniter) run(lastID uint64) { // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. 
if checkDone() { + log.Info("Truncate the extra history", "id", lastID) if err := unindexSingle(lastID, i.disk, i.freezer); err != nil { signal.result <- err return } close(i.done) signal.result <- nil + log.Info("State histories have been fully indexed", "last", lastID-1) return } // Adjust the indexing target and relaunch the process lastID = signal.newLastID done, interrupt = make(chan struct{}), new(atomic.Int32) go i.index(done, interrupt, lastID) + log.Debug("Shortened state history range", "last", lastID) case <-done: if checkDone() { close(i.done) + log.Info("State histories have been fully indexed", "last", lastID) return } // Relaunch the background runner if some tasks are left @@ -361,10 +366,12 @@ func (i *indexIniter) next() (uint64, error) { // Start indexing from scratch if nothing has been indexed lastIndexed := rawdb.ReadLastStateHistoryIndex(i.disk) if lastIndexed == nil { + log.Debug("Initialize state history indexing from scratch", "id", tailID) return tailID, nil } // Resume indexing from the last interrupted position if *lastIndexed+1 >= tailID { + log.Debug("Resume state history indexing", "id", *lastIndexed+1, "tail", tailID) return *lastIndexed + 1, nil } // History has been shortened without indexing. Discard the gapped segment @@ -389,6 +396,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // when the state is reverted manually (chain.SetHead) or the deep reorg is // encountered. In such cases, no indexing should be scheduled. 
if beginID > lastID { + log.Debug("State history is fully indexed", "last", lastID) return } log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) From e97b54548d02e71b412f8569abe3da4f3fbb2ec0 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 30 May 2025 10:24:13 +0800 Subject: [PATCH 08/17] triedb/pathdb: fix tests --- triedb/pathdb/database_test.go | 25 +++++++++++++------------ triedb/pathdb/history_reader_test.go | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 9f681d308bb5..2fd10dd08783 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -121,14 +121,15 @@ type tester struct { snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte // Keyed by the hash of account address and the hash of storage key } -func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *tester { +func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool) *tester { var ( disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false) db = New(disk, &Config{ - StateHistory: historyLimit, - TrieCleanSize: 256 * 1024, - StateCleanSize: 256 * 1024, - WriteBufferSize: 256 * 1024, + StateHistory: historyLimit, + EnableStateIndexing: enableIndex, + TrieCleanSize: 256 * 1024, + StateCleanSize: 256 * 1024, + WriteBufferSize: 256 * 1024, }, isVerkle) obj = &tester{ @@ -464,7 +465,7 @@ func TestDatabaseRollback(t *testing.T) { }() // Verify state histories - tester := newTester(t, 0, false, 32) + tester := newTester(t, 0, false, 32, false) defer tester.release() if err := tester.verifyHistory(); err != nil { @@ -498,7 +499,7 @@ func TestDatabaseRecoverable(t *testing.T) { }() var ( - tester = newTester(t, 0, false, 12) + tester = newTester(t, 0, false, 12, false) index = tester.bottomIndex() ) defer tester.release() @@ -542,7 +543,7 @@ func TestDisable(t *testing.T) 
{ maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 32) + tester := newTester(t, 0, false, 32, false) defer tester.release() stored := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)) @@ -584,7 +585,7 @@ func TestCommit(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Commit(tester.lastHash(), false); err != nil { @@ -614,7 +615,7 @@ func TestJournal(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { @@ -644,7 +645,7 @@ func TestCorruptedJournal(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { @@ -692,7 +693,7 @@ func TestTailTruncateHistory(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 10, false, 12) + tester := newTester(t, 10, false, 12, false) defer tester.release() tester.db.Close() diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go index 7e2489e85d6a..04bf4f643a70 100644 --- a/triedb/pathdb/history_reader_test.go +++ b/triedb/pathdb/history_reader_test.go @@ -126,7 +126,7 @@ func testHistoryReader(t *testing.T, historyLimit uint64) { }() //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) - env := newTester(t, historyLimit, false, 64) + env := newTester(t, historyLimit, false, 64, true) defer env.release() waitIndexing(env.db) From 4d019101ce6253196b33f34689814c91d23b6188 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 09:38:36 +0800 Subject: [PATCH 09/17] triedb/pathdb: use uint16 for entries field --- triedb/pathdb/history_index_block.go | 12 ++++++------ 1 file changed, 6 
insertions(+), 6 deletions(-) diff --git a/triedb/pathdb/history_index_block.go b/triedb/pathdb/history_index_block.go index 854029904c4d..5551302682a0 100644 --- a/triedb/pathdb/history_index_block.go +++ b/triedb/pathdb/history_index_block.go @@ -25,7 +25,7 @@ import ( ) const ( - indexBlockDescSize = 24 // The size of index block descriptor + indexBlockDescSize = 22 // The size of index block descriptor indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block indexBlockRestartLen = 256 // The restart interval length of index block historyIndexBatch = 65536 // The number of state histories for constructing or deleting indexes together @@ -37,7 +37,7 @@ const ( type indexBlockDesc struct { min uint64 // The minimum state ID retained within the block max uint64 // The maximum state ID retained within the block - entries uint32 // The number of state mutation records retained within the block + entries uint16 // The number of state mutation records retained within the block id uint32 // The id of the index block } @@ -61,8 +61,8 @@ func (d *indexBlockDesc) encode() []byte { var buf [indexBlockDescSize]byte binary.BigEndian.PutUint64(buf[:8], d.min) binary.BigEndian.PutUint64(buf[8:16], d.max) - binary.BigEndian.PutUint32(buf[16:20], d.entries) - binary.BigEndian.PutUint32(buf[20:24], d.id) + binary.BigEndian.PutUint16(buf[16:18], d.entries) + binary.BigEndian.PutUint32(buf[18:22], d.id) return buf[:] } @@ -70,8 +70,8 @@ func (d *indexBlockDesc) encode() []byte { func (d *indexBlockDesc) decode(blob []byte) { d.min = binary.BigEndian.Uint64(blob[:8]) d.max = binary.BigEndian.Uint64(blob[8:16]) - d.entries = binary.BigEndian.Uint32(blob[16:20]) - d.id = binary.BigEndian.Uint32(blob[20:24]) + d.entries = binary.BigEndian.Uint16(blob[16:18]) + d.id = binary.BigEndian.Uint32(blob[18:22]) } // parseIndexBlock parses the index block with the supplied byte stream. 
From 59a9ef560b2a225318f448f8965b6f92ef759f43 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 10:21:53 +0800 Subject: [PATCH 10/17] triedb/pathdb: introduce metadata and version tracking --- core/rawdb/accessors_history.go | 32 +++++------ triedb/pathdb/database.go | 8 +-- triedb/pathdb/history_index_test.go | 8 +-- triedb/pathdb/history_indexer.go | 84 +++++++++++++++++++++------- triedb/pathdb/history_reader.go | 20 +++---- triedb/pathdb/history_reader_test.go | 4 +- 6 files changed, 96 insertions(+), 60 deletions(-) diff --git a/core/rawdb/accessors_history.go b/core/rawdb/accessors_history.go index 8940a7001327..8fbec95faaa7 100644 --- a/core/rawdb/accessors_history.go +++ b/core/rawdb/accessors_history.go @@ -17,35 +17,29 @@ package rawdb import ( - "encoding/binary" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" ) -// ReadLastStateHistoryIndex retrieves the number of latest indexed state history. -func ReadLastStateHistoryIndex(db ethdb.KeyValueReader) *uint64 { +// ReadStateHistoryIndexMetadata retrieves the metadata of state history index. +func ReadStateHistoryIndexMetadata(db ethdb.KeyValueReader) []byte { data, _ := db.Get(headStateHistoryIndexKey) - if len(data) != 8 { - return nil - } - number := binary.BigEndian.Uint64(data) - return &number + return data } -// WriteLastStateHistoryIndex stores the number of latest indexed state history +// WriteStateHistoryIndexMetadata stores the metadata of state history index // into database. 
-func WriteLastStateHistoryIndex(db ethdb.KeyValueWriter, number uint64) { - if err := db.Put(headStateHistoryIndexKey, encodeBlockNumber(number)); err != nil { - log.Crit("Failed to store the state index tail", "err", err) +func WriteStateHistoryIndexMetadata(db ethdb.KeyValueWriter, blob []byte) { + if err := db.Put(headStateHistoryIndexKey, blob); err != nil { + log.Crit("Failed to store the metadata of state history index", "err", err) } } -// DeleteLastStateHistoryIndex removes the number of latest indexed state history. -func DeleteLastStateHistoryIndex(db ethdb.KeyValueWriter) { +// DeleteStateHistoryIndexMetadata removes the metadata of state history index. +func DeleteStateHistoryIndexMetadata(db ethdb.KeyValueWriter) { if err := db.Delete(headStateHistoryIndexKey); err != nil { - log.Crit("Failed to delete the state index tail", "err", err) + log.Crit("Failed to delete the metadata of state history index", "err", err) } } @@ -158,12 +152,12 @@ func increaseKey(key []byte) []byte { return nil } -// DeleteHistoryIndex completely removes all history indexing data, including indexes -// for accounts and storages. +// DeleteStateHistoryIndex completely removes all history indexing data, including +// indexes for accounts and storages. // // Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix` // is exclusively occupied by the history indexing data! -func DeleteHistoryIndex(db ethdb.KeyValueRangeDeleter) { +func DeleteStateHistoryIndex(db ethdb.KeyValueRangeDeleter) { if err := db.DeleteRange(StateHistoryIndexPrefix, increaseKey(StateHistoryIndexPrefix)); err != nil { log.Crit("Failed to delete history index range", "err", err) } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index bb8e7bc0a714..f5edcc43b785 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -310,8 +310,8 @@ func (db *Database) repairHistory() error { // TODO(rjl493456442) would be better to group them into a batch. 
// // Purge all state history indexing data first - rawdb.DeleteLastStateHistoryIndex(db.diskdb) - rawdb.DeleteHistoryIndex(db.diskdb) + rawdb.DeleteStateHistoryIndexMetadata(db.diskdb) + rawdb.DeleteStateHistoryIndex(db.diskdb) err := db.freezer.Reset() if err != nil { log.Crit("Failed to reset state histories", "err", err) @@ -497,8 +497,8 @@ func (db *Database) Enable(root common.Hash) error { // TODO(rjl493456442) would be better to group them into a batch. // // Purge all state history indexing data first - rawdb.DeleteLastStateHistoryIndex(db.diskdb) - rawdb.DeleteHistoryIndex(db.diskdb) + rawdb.DeleteStateHistoryIndexMetadata(db.diskdb) + rawdb.DeleteStateHistoryIndex(db.diskdb) if err := db.freezer.Reset(); err != nil { return err } diff --git a/triedb/pathdb/history_index_test.go b/triedb/pathdb/history_index_test.go index b54194400e67..84f4a5cd41cc 100644 --- a/triedb/pathdb/history_index_test.go +++ b/triedb/pathdb/history_index_test.go @@ -189,8 +189,8 @@ func TestBatchIndexerWrite(t *testing.T) { if err := batch.finish(true); err != nil { t.Fatalf("Failed to finish batch indexer, %v", err) } - indexed := rawdb.ReadLastStateHistoryIndex(db) - if indexed == nil || *indexed != uint64(10) { + metadata := loadIndexMetadata(db) + if metadata == nil || metadata.Last != uint64(10) { t.Fatal("Unexpected index position") } var ( @@ -278,8 +278,8 @@ func TestBatchIndexerDelete(t *testing.T) { t.Fatalf("Failed to finish batch indexer, %v", err) } - indexed := rawdb.ReadLastStateHistoryIndex(db) - if indexed != nil { + metadata := loadIndexMetadata(db) + if metadata != nil { t.Fatal("Unexpected index position") } it := db.NewIterator(rawdb.StateHistoryIndexPrefix, nil) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 2bdbd3f1ca55..8e3f187c99da 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -28,10 +28,44 @@ import ( "github.com/ethereum/go-ethereum/crypto" 
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" ) -// The batch size for reading state histories -const historyReadBatch = 1000 +const ( + // The batch size for reading state histories + historyReadBatch = 1000 + + stateIndexV0 = uint8(0) // initial version of state index structure + stateIndexVersion = stateIndexV0 // the current state index version +) + +type indexMetadata struct { + Version uint8 + Last uint64 +} + +func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata { + blob := rawdb.ReadStateHistoryIndexMetadata(db) + if len(blob) == 0 { + return nil + } + var m indexMetadata + if err := rlp.DecodeBytes(blob, &m); err != nil { + return nil + } + return &m +} + +func storeIndexMetadata(db ethdb.KeyValueWriter, last uint64) { + var m indexMetadata + m.Version = stateIndexVersion + m.Last = last + blob, err := rlp.EncodeToBytes(m) + if err != nil { + log.Crit("Failed to encode index metadata", "err", err) + } + rawdb.WriteStateHistoryIndexMetadata(db, blob) +} // batchIndexer is a structure designed to perform batch indexing or unindexing // of state histories atomically. 
@@ -144,12 +178,12 @@ func (b *batchIndexer) finish(force bool) error { } // Update the position of last indexed state history if !b.delete { - rawdb.WriteLastStateHistoryIndex(batch, b.lastID) + storeIndexMetadata(batch, b.lastID) } else { if b.lastID == 1 { - rawdb.DeleteLastStateHistoryIndex(batch) + rawdb.DeleteStateHistoryIndexMetadata(batch) } else { - rawdb.WriteLastStateHistoryIndex(batch, b.lastID-1) + storeIndexMetadata(batch, b.lastID-1) } } if err := batch.Write(); err != nil { @@ -167,11 +201,11 @@ func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancient indexHistoryTimer.UpdateSince(start) }(time.Now()) - indexed := rawdb.ReadLastStateHistoryIndex(db) - if indexed == nil || *indexed+1 != historyID { + metadata := loadIndexMetadata(db) + if metadata == nil || metadata.Last+1 != historyID { last := "null" - if indexed != nil { - last = fmt.Sprintf("%v", *indexed) + if metadata != nil { + last = fmt.Sprintf("%v", metadata.Last) } return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID) } @@ -196,11 +230,11 @@ func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancie unindexHistoryTimer.UpdateSince(start) }(time.Now()) - indexed := rawdb.ReadLastStateHistoryIndex(db) - if indexed == nil || *indexed != historyID { + metadata := loadIndexMetadata(db) + if metadata == nil || metadata.Last != historyID { last := "null" - if indexed != nil { - last = fmt.Sprintf("%v", *indexed) + if metadata != nil { + last = fmt.Sprintf("%v", metadata.Last) } return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID) } @@ -286,8 +320,8 @@ func (i *indexIniter) run(lastID uint64) { // checkDone indicates whether all requested state histories // have been fully indexed. 
checkDone = func() bool { - indexed := rawdb.ReadLastStateHistoryIndex(i.disk) - return indexed != nil && *indexed == lastID + metadata := loadIndexMetadata(i.disk) + return metadata != nil && metadata.Last == lastID } ) go i.index(done, interrupt, lastID) @@ -364,22 +398,22 @@ func (i *indexIniter) next() (uint64, error) { tailID := tail + 1 // compute the id of the oldest history // Start indexing from scratch if nothing has been indexed - lastIndexed := rawdb.ReadLastStateHistoryIndex(i.disk) - if lastIndexed == nil { + metadata := loadIndexMetadata(i.disk) + if metadata == nil { log.Debug("Initialize state history indexing from scratch", "id", tailID) return tailID, nil } // Resume indexing from the last interrupted position - if *lastIndexed+1 >= tailID { - log.Debug("Resume state history indexing", "id", *lastIndexed+1, "tail", tailID) - return *lastIndexed + 1, nil + if metadata.Last+1 >= tailID { + log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID) + return metadata.Last + 1, nil } // History has been shortened without indexing. Discard the gapped segment // in the history and shift to the first available element. // // The missing indexes corresponding to the gapped histories won't be visible. // It's fine to leave them unindexed. - log.Info("History gap detected, discard old segment", "oldHead", *lastIndexed, "newHead", tailID) + log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) return tailID, nil } @@ -476,6 +510,14 @@ type historyIndexer struct { // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { + // Purge the obsolete index data from the database. 
+ metadata := loadIndexMetadata(disk) + if metadata != nil && metadata.Version != stateIndexVersion { + // TODO(rjl493456442) would be better to group them into a batch. + rawdb.DeleteStateHistoryIndexMetadata(disk) + rawdb.DeleteStateHistoryIndex(disk) + log.Info("Cleaned up obsolete state history index", "version", metadata.Version, "want", stateIndexVersion) + } return &historyIndexer{ initer: newIndexIniter(disk, freezer, lastHistoryID), disk: disk, diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go index 259540e8cf3e..6ec5de63fa07 100644 --- a/triedb/pathdb/history_reader.go +++ b/triedb/pathdb/history_reader.go @@ -113,8 +113,8 @@ type indexReaderWithLimitTag struct { // newIndexReaderWithLimitTag constructs a index reader with indexing position. func newIndexReaderWithLimitTag(db ethdb.KeyValueReader, state stateIdent) (*indexReaderWithLimitTag, error) { // Read the last indexed ID before the index reader construction - indexed := rawdb.ReadLastStateHistoryIndex(db) - if indexed == nil { + metadata := loadIndexMetadata(db) + if metadata == nil { return nil, errors.New("state history hasn't been indexed yet") } r, err := newIndexReader(db, state) @@ -123,7 +123,7 @@ func newIndexReaderWithLimitTag(db ethdb.KeyValueReader, state stateIdent) (*ind } return &indexReaderWithLimitTag{ reader: r, - limit: *indexed, + limit: metadata.Last, db: db, }, nil } @@ -157,14 +157,14 @@ func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uin return res, nil } // Refresh the index reader and give another attempt - indexed := rawdb.ReadLastStateHistoryIndex(r.db) - if indexed == nil || *indexed < lastID { + metadata := loadIndexMetadata(r.db) + if metadata == nil || metadata.Last < lastID { return 0, errors.New("state history hasn't been indexed yet") } if err := r.reader.refresh(); err != nil { return 0, err } - r.limit = *indexed + r.limit = metadata.Last return r.reader.readGreaterThan(id) } @@ -308,15 +308,15 @@ func (r 
*historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6 if stateID < tail { return nil, errors.New("historical state has been pruned") } - lastIndexedID := rawdb.ReadLastStateHistoryIndex(r.disk) // To serve the request, all state histories from stateID+1 to lastID // must be indexed. It's not supposed to happen unless system is very // wrong. - if lastIndexedID == nil || *lastIndexedID < lastID { + metadata := loadIndexMetadata(r.disk) + if metadata == nil || metadata.Last < lastID { indexed := "null" - if lastIndexedID != nil { - indexed = fmt.Sprintf("%d", *lastIndexedID) + if metadata != nil { + indexed = fmt.Sprintf("%d", metadata.Last) } return nil, fmt.Errorf("state history is not fully indexed, requested: %d, indexed: %s", stateID, indexed) } diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go index 04bf4f643a70..f58263f618f9 100644 --- a/triedb/pathdb/history_reader_test.go +++ b/triedb/pathdb/history_reader_test.go @@ -28,8 +28,8 @@ import ( func waitIndexing(db *Database) { for { - id := rawdb.ReadLastStateHistoryIndex(db.diskdb) - if id != nil && *id >= db.tree.bottom().stateID() { + metadata := loadIndexMetadata(db.diskdb) + if metadata != nil && metadata.Last >= db.tree.bottom().stateID() { return } time.Sleep(100 * time.Millisecond) From 4625b43ee787bf42de7d07e937419dc3fabfb173 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 10:38:05 +0800 Subject: [PATCH 11/17] triedb/pathdb: use uint16 for restart and uint8 for restartLen --- triedb/pathdb/history_index_block.go | 41 ++++++++++++++++------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/triedb/pathdb/history_index_block.go b/triedb/pathdb/history_index_block.go index 5551302682a0..d22629ef2309 100644 --- a/triedb/pathdb/history_index_block.go +++ b/triedb/pathdb/history_index_block.go @@ -85,15 +85,19 @@ func (d *indexBlockDesc) decode(blob []byte) { // | | | ChunkN | // | | +------------------+ // +-|---| 
Restart1 | -// | | Restart... | 4N bytes +// | | Restart... | 2N bytes // +---| RestartN | // +------------------+ -// | Restart count | 4 bytes +// | Restart count | 1 byte // +------------------+ // // - Chunk list: A list of data chunks -// - Restart list: A list of 4-byte pointers, each pointing to the start position of a chunk -// - Restart count: The number of restarts in the block, stored at the end of the block (4 bytes) +// - Restart list: A list of 2-byte pointers, each pointing to the start position of a chunk +// - Restart count: The number of restarts in the block, stored at the end of the block (1 byte) +// +// Note: the pointer is encoded as a uint16, which is sufficient within a chunk. +// A uint16 can cover offsets in the range [0, 65536), which is more than enough +// to store 4096 integers. // // Each chunk begins with the full value of the first integer, followed by // subsequent integers representing the differences between the current value @@ -111,26 +115,26 @@ func (d *indexBlockDesc) decode(blob []byte) { // +----------------+ // // Empty index block is regarded as invalid. 
-func parseIndexBlock(blob []byte) ([]uint32, []byte, error) { - if len(blob) < 4 { +func parseIndexBlock(blob []byte) ([]uint16, []byte, error) { + if len(blob) < 1 { return nil, nil, fmt.Errorf("corrupted index block, len: %d", len(blob)) } - restartLen := binary.BigEndian.Uint32(blob[len(blob)-4:]) + restartLen := blob[len(blob)-1] if restartLen == 0 { return nil, nil, errors.New("corrupted index block, no restart") } - tailLen := int(restartLen+1) * 4 + tailLen := int(restartLen)*2 + 1 if len(blob) < tailLen { return nil, nil, fmt.Errorf("truncated restarts, size: %d, restarts: %d", len(blob), restartLen) } - restarts := make([]uint32, 0, restartLen) - for i := restartLen; i > 0; i-- { - restart := binary.BigEndian.Uint32(blob[len(blob)-int(i+1)*4:]) + restarts := make([]uint16, 0, restartLen) + for i := int(restartLen); i > 0; i-- { + restart := binary.BigEndian.Uint16(blob[len(blob)-1-2*i:]) restarts = append(restarts, restart) } // Validate that restart points are strictly ordered and within the valid // data range. - var prev uint32 + var prev uint16 for i := 0; i < len(restarts); i++ { if i != 0 { if restarts[i] <= prev { @@ -147,7 +151,7 @@ func parseIndexBlock(blob []byte) ([]uint32, []byte, error) { // blockReader is the reader to access the element within a block. 
type blockReader struct { - restarts []uint32 + restarts []uint16 data []byte } @@ -222,7 +226,7 @@ func (br *blockReader) readGreaterThan(id uint64) (uint64, error) { type blockWriter struct { desc *indexBlockDesc // Descriptor of the block - restarts []uint32 // Offsets into the data slice, marking the start of each section + restarts []uint16 // Offsets into the data slice, marking the start of each section scratch []byte // Buffer used for encoding full integers or value differences data []byte // Aggregated encoded data slice } @@ -261,7 +265,7 @@ func (b *blockWriter) append(id uint64) error { if b.desc.entries%indexBlockRestartLen == 0 { // Save the offset within the data slice as the restart point // for the next section. - b.restarts = append(b.restarts, uint32(len(b.data))) + b.restarts = append(b.restarts, uint16(len(b.data))) // The restart point item can either be encoded in variable // size or fixed size. Although variable-size encoding is @@ -395,9 +399,10 @@ func (b *blockWriter) full() bool { // This function is safe to be called multiple times. func (b *blockWriter) finish() []byte { var buf []byte - for _, number := range append(b.restarts, uint32(len(b.restarts))) { - binary.BigEndian.PutUint32(b.scratch[:4], number) - buf = append(buf, b.scratch[:4]...) + for _, number := range b.restarts { + binary.BigEndian.PutUint16(b.scratch[:2], number) + buf = append(buf, b.scratch[:2]...) } + buf = append(buf, byte(len(b.restarts))) return append(b.data, buf...) 
} From e33f0d84364790836097b700455da0a44257486d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 11:00:59 +0800 Subject: [PATCH 12/17] triedb/pathdb: get rid of min field --- triedb/pathdb/history_index.go | 17 ++++++++------- triedb/pathdb/history_index_block.go | 25 ++++++++++------------- triedb/pathdb/history_index_block_test.go | 1 - 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/triedb/pathdb/history_index.go b/triedb/pathdb/history_index.go index ed8f1e67d9b7..0f79fe671cf7 100644 --- a/triedb/pathdb/history_index.go +++ b/triedb/pathdb/history_index.go @@ -37,7 +37,6 @@ func parseIndex(blob []byte) ([]*indexBlockDesc, error) { } var ( lastID uint32 - lastMax uint64 descList []*indexBlockDesc ) for i := 0; i < len(blob)/indexBlockDescSize; i++ { @@ -46,19 +45,23 @@ func parseIndex(blob []byte) ([]*indexBlockDesc, error) { if desc.empty() { return nil, errors.New("empty state history index block") } - if desc.min > desc.max { - return nil, fmt.Errorf("indexBlockDesc: min %d > max %d", desc.min, desc.max) - } if lastID != 0 { if lastID+1 != desc.id { return nil, fmt.Errorf("index block id is out of order, last-id: %d, this-id: %d", lastID, desc.id) } - if desc.min <= lastMax { + // Theoretically, order should be validated between consecutive index blocks, + // ensuring that elements within them are strictly ordered. However, since + // tracking the minimum element in each block has non-trivial storage overhead, + // this check is optimistically omitted. + // + // TODO(rjl493456442) the minimal element can be resolved from the index block, + // evaluate the check cost (mostly IO overhead). 
+ + /* if desc.min <= lastMax { return nil, fmt.Errorf("index block range is out of order, last-max: %d, this-min: %d", lastMax, desc.min) - } + }*/ } lastID = desc.id - lastMax = desc.max descList = append(descList, &desc) } return descList, nil diff --git a/triedb/pathdb/history_index_block.go b/triedb/pathdb/history_index_block.go index d22629ef2309..930357f58749 100644 --- a/triedb/pathdb/history_index_block.go +++ b/triedb/pathdb/history_index_block.go @@ -25,7 +25,7 @@ import ( ) const ( - indexBlockDescSize = 22 // The size of index block descriptor + indexBlockDescSize = 14 // The size of index block descriptor indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block indexBlockRestartLen = 256 // The restart interval length of index block historyIndexBatch = 65536 // The number of state histories for constructing or deleting indexes together @@ -35,7 +35,6 @@ const ( // list of state mutation records associated with a specific state (either an // account or a storage slot). type indexBlockDesc struct { - min uint64 // The minimum state ID retained within the block max uint64 // The maximum state ID retained within the block entries uint16 // The number of state mutation records retained within the block id uint32 // The id of the index block @@ -59,19 +58,17 @@ func (d *indexBlockDesc) full() bool { // encode packs index block descriptor into byte stream. func (d *indexBlockDesc) encode() []byte { var buf [indexBlockDescSize]byte - binary.BigEndian.PutUint64(buf[:8], d.min) - binary.BigEndian.PutUint64(buf[8:16], d.max) - binary.BigEndian.PutUint16(buf[16:18], d.entries) - binary.BigEndian.PutUint32(buf[18:22], d.id) + binary.BigEndian.PutUint64(buf[0:8], d.max) + binary.BigEndian.PutUint16(buf[8:10], d.entries) + binary.BigEndian.PutUint32(buf[10:14], d.id) return buf[:] } // decode unpacks index block descriptor from byte stream. 
func (d *indexBlockDesc) decode(blob []byte) { - d.min = binary.BigEndian.Uint64(blob[:8]) - d.max = binary.BigEndian.Uint64(blob[8:16]) - d.entries = binary.BigEndian.Uint16(blob[16:18]) - d.id = binary.BigEndian.Uint32(blob[18:22]) + d.max = binary.BigEndian.Uint64(blob[:8]) + d.entries = binary.BigEndian.Uint16(blob[8:10]) + d.id = binary.BigEndian.Uint32(blob[10:14]) } // parseIndexBlock parses the index block with the supplied byte stream. @@ -287,9 +284,9 @@ func (b *blockWriter) append(id uint64) error { b.desc.entries++ // The state history ID must be greater than 0. - if b.desc.min == 0 { - b.desc.min = id - } + //if b.desc.min == 0 { + // b.desc.min = id + //} b.desc.max = id return nil } @@ -357,7 +354,7 @@ func (b *blockWriter) pop(id uint64) error { } // If there is only one entry left, the entire block should be reset if b.desc.entries == 1 { - b.desc.min = 0 + //b.desc.min = 0 b.desc.max = 0 b.desc.entries = 0 b.restarts = nil diff --git a/triedb/pathdb/history_index_block_test.go b/triedb/pathdb/history_index_block_test.go index 32bc3eda7358..173387b44781 100644 --- a/triedb/pathdb/history_index_block_test.go +++ b/triedb/pathdb/history_index_block_test.go @@ -155,7 +155,6 @@ func TestBlcokWriterDeleteWithData(t *testing.T) { // Re-construct the block writer with data desc := &indexBlockDesc{ id: 0, - min: 1, max: 20, entries: 5, } From 0c37548e680bf31fb612f8c4dd252424b7bebdf4 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 12:28:06 +0800 Subject: [PATCH 13/17] triedb/pathdb: improve error handling --- triedb/pathdb/history_reader.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go index 6ec5de63fa07..486df7d86c78 100644 --- a/triedb/pathdb/history_reader.go +++ b/triedb/pathdb/history_reader.go @@ -156,7 +156,13 @@ func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uin if r.limit == lastID { 
return res, nil } - // Refresh the index reader and give another attempt + // Refresh the index reader and attempt again. If the latest indexed position + // is even below the ID of the disk layer, it indicates that state histories + // are being removed. In this case, it would theoretically be better to block + // the state rollback operation synchronously until all readers are released. + // Given that it's very unlikely to occur and users try to perform historical + // state queries while reverting the states at the same time. Simply returning + // an error should be sufficient for now. metadata := loadIndexMetadata(r.db) if metadata == nil || metadata.Last < lastID { return 0, errors.New("state history hasn't been indexed yet") @@ -189,8 +195,11 @@ func newHistoryReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader) *h // state history. func (r *historyReader) readAccountMetadata(address common.Address, historyID uint64) ([]byte, error) { blob := rawdb.ReadStateAccountIndex(r.freezer, historyID) + if len(blob) == 0 { + return nil, fmt.Errorf("account index is truncated, historyID: %d", historyID) + } if len(blob)%accountIndexSize != 0 { - return nil, fmt.Errorf("account index is corrupted, historyID: %d", historyID) + return nil, fmt.Errorf("account index is corrupted, historyID: %d, size: %d", historyID, len(blob)) } n := len(blob) / accountIndexSize @@ -213,11 +222,14 @@ func (r *historyReader) readAccountMetadata(address common.Address, historyID ui func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) { // TODO(rj493456442) optimize it with partial read blob := rawdb.ReadStateStorageIndex(r.freezer, historyID) + if len(blob) == 0 { + return nil, fmt.Errorf("storage index is truncated, historyID: %d", historyID) + } if len(blob)%slotIndexSize != 0 { - return nil, fmt.Errorf("storage indices is corrupted, historyID: %d", historyID) + return nil, 
fmt.Errorf("storage indices is corrupted, historyID: %d, size: %d", historyID, len(blob)) } if slotIndexSize*(slotOffset+slotNumber) > len(blob) { - return nil, errors.New("out of slice") + return nil, fmt.Errorf("storage indices is truncated, historyID: %d, size: %d, offset: %d, length: %d", historyID, len(blob), slotOffset, slotNumber) } subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)] @@ -261,7 +273,7 @@ func (r *historyReader) readAccount(address common.Address, historyID uint64) ([ // TODO(rj493456442) optimize it with partial read data := rawdb.ReadStateAccountHistory(r.freezer, historyID) if len(data) < length+offset { - return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d", address, historyID) + return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, historyID, len(data), offset, length) } return data[offset : offset+length], nil } @@ -289,7 +301,7 @@ func (r *historyReader) readStorage(address common.Address, storageKey common.Ha // TODO(rj493456442) optimize it with partial read data := rawdb.ReadStateStorageHistory(r.freezer, historyID) if len(data) < offset+length { - return nil, errors.New("corrupted storage data") + return nil, fmt.Errorf("storage data is truncated, address: %#x, key: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, storageKey, historyID, len(data), offset, length) } return data[offset : offset+length], nil } @@ -340,6 +352,11 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6 if historyID == math.MaxUint64 { return latestValue, nil } + // Resolve data from the specified state history object. Notably, since the history + // reader operates completely asynchronously with the indexer/unindexer, it's possible + // that the associated state histories are no longer available due to a rollback. 
+ // Such truncation should be captured by the state resolver below, rather than returning + // invalid data. if state.account { return r.readAccount(state.address, historyID) } From c074cfa41f7c1e1bc2282fca6f2fff74d9732874 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 14:30:47 +0800 Subject: [PATCH 14/17] triedb/pathdb: improve legacy data handling --- triedb/pathdb/history_indexer.go | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 8e3f187c99da..e32550500fdd 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -51,6 +51,7 @@ func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata { } var m indexMetadata if err := rlp.DecodeBytes(blob, &m); err != nil { + log.Error("Failed to decode index metadata", "err", err) return nil } return &m @@ -507,17 +508,32 @@ type historyIndexer struct { freezer ethdb.AncientStore } +// checkVersion checks whether the index data in the database matches the version. +func checkVersion(disk ethdb.KeyValueStore) { + blob := rawdb.ReadStateHistoryIndexMetadata(disk) + if len(blob) == 0 { + return + } + var m indexMetadata + err := rlp.DecodeBytes(blob, &m) + if err == nil && m.Version == stateIndexVersion { + return + } + // TODO(rjl493456442) would be better to group them into a batch. + rawdb.DeleteStateHistoryIndexMetadata(disk) + rawdb.DeleteStateHistoryIndex(disk) + + version := "unknown" + if err == nil { + version = fmt.Sprintf("%d", m.Version) + } + log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion) +} + // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. 
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { - // Purge the obsolete index data from the database. - metadata := loadIndexMetadata(disk) - if metadata != nil && metadata.Version != stateIndexVersion { - // TODO(rjl493456442) would be better to group them into a batch. - rawdb.DeleteStateHistoryIndexMetadata(disk) - rawdb.DeleteStateHistoryIndex(disk) - log.Info("Cleaned up obsolete state history index", "version", metadata.Version, "want", stateIndexVersion) - } + checkVersion(disk) return &historyIndexer{ initer: newIndexIniter(disk, freezer, lastHistoryID), disk: disk, From ca2ebe2b624d6b018a1681a118930d2661f33119 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 15:11:03 +0800 Subject: [PATCH 15/17] core/rawdb: fix range deletion --- core/rawdb/accessors_history.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/rawdb/accessors_history.go b/core/rawdb/accessors_history.go index 8fbec95faaa7..50512b7d1ad3 100644 --- a/core/rawdb/accessors_history.go +++ b/core/rawdb/accessors_history.go @@ -17,6 +17,8 @@ package rawdb import ( + "bytes" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" @@ -158,7 +160,9 @@ func increaseKey(key []byte) []byte { // Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix` // is exclusively occupied by the history indexing data! 
func DeleteStateHistoryIndex(db ethdb.KeyValueRangeDeleter) { - if err := db.DeleteRange(StateHistoryIndexPrefix, increaseKey(StateHistoryIndexPrefix)); err != nil { + start := StateHistoryIndexPrefix + limit := increaseKey(bytes.Clone(StateHistoryIndexPrefix)) + if err := db.DeleteRange(start, limit); err != nil { log.Crit("Failed to delete history index range", "err", err) } } From be31bb9804872862a66d780e7f4429d183ccd53f Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 2 Jun 2025 15:34:30 +0800 Subject: [PATCH 16/17] triedb/pathdb: bump batch size --- triedb/pathdb/history_index_block.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/triedb/pathdb/history_index_block.go b/triedb/pathdb/history_index_block.go index 930357f58749..7b85667e9ddf 100644 --- a/triedb/pathdb/history_index_block.go +++ b/triedb/pathdb/history_index_block.go @@ -25,10 +25,10 @@ import ( ) const ( - indexBlockDescSize = 14 // The size of index block descriptor - indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block - indexBlockRestartLen = 256 // The restart interval length of index block - historyIndexBatch = 65536 // The number of state histories for constructing or deleting indexes together + indexBlockDescSize = 14 // The size of index block descriptor + indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block + indexBlockRestartLen = 256 // The restart interval length of index block + historyIndexBatch = 1_000_000 // The number of state history indexes for constructing or deleting as batch ) // indexBlockDesc represents a descriptor for an index block, which contains a From 71e283f41352c127d4fb5becc4a57b18f8a78755 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 5 Jun 2025 18:55:33 +0800 Subject: [PATCH 17/17] ethdb/pebble: track iterator number --- ethdb/pebble/pebble.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index 
5c851af91015..e2e7b7a8418d 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -84,6 +84,7 @@ type Database struct { estimatedCompDebtGauge *metrics.Gauge // Gauge for tracking the number of bytes that need to be compacted liveCompGauge *metrics.Gauge // Gauge for tracking the number of in-progress compactions liveCompSizeGauge *metrics.Gauge // Gauge for tracking the size of in-progress compactions + liveIterGauge *metrics.Gauge // Gauge for tracking the number of live database iterators levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag @@ -313,6 +314,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool) ( db.estimatedCompDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/estimateDebt", nil) db.liveCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/count", nil) db.liveCompSizeGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/size", nil) + db.liveIterGauge = metrics.GetOrRegisterGauge(namespace+"iter/count", nil) // Start up the metrics gathering and return go db.meter(metricsGatheringInterval, namespace) @@ -562,6 +564,7 @@ func (d *Database) meter(refresh time.Duration, namespace string) { d.seekCompGauge.Update(stats.Compact.ReadCount) d.liveCompGauge.Update(stats.Compact.NumInProgress) d.liveCompSizeGauge.Update(stats.Compact.InProgressBytes) + d.liveIterGauge.Update(stats.TableIters) d.liveMemTablesGauge.Update(stats.MemTable.Count) d.zombieMemTablesGauge.Update(stats.MemTable.ZombieCount)