Skip to content

Commit 053dcec

Browse files
s1narjl493456442
authored andcommitted
core: reduce load on txindexer from API (ethereum#31752)
Fixes ethereum#31732. This logic was removed in the recent refactoring in the txindexer to handle history cutoff (ethereum#31393). It was first introduced in this PR: ethereum#28908. I have tested it and it works as an alternative to ethereum#31745. This PR packs 3 changes to the flow of fetching txs from the API: - It caches the indexer tail after each run is over to avoid hitting the db all the time as was done originally in ethereum#28908. - Changes `backend.GetTransaction`. It doesn't return an error anymore when tx indexer is in progress. It shifts the responsibility to the caller to check the progress. The reason is that in most cases we anyway check the txpool for the tx. If it was indeed a pending tx we can avoid the indexer progress check. --------- Co-authored-by: Gary Rong <garyrong0905@gmail.com>
1 parent e8709e9 commit 053dcec

File tree

12 files changed

+150
-131
lines changed

12 files changed

+150
-131
lines changed

core/blockchain_reader.go

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -270,42 +270,20 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max
270270
// GetTransactionLookup retrieves the lookup along with the transaction
271271
// itself associate with the given transaction hash.
272272
//
273-
// An error will be returned if the transaction is not found, and background
274-
// indexing for transactions is still in progress. The transaction might be
275-
// reachable shortly once it's indexed.
276-
//
277-
// A null will be returned in the transaction is not found and background
278-
// transaction indexing is already finished. The transaction is not existent
279-
// from the node's perspective.
280-
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) {
273+
// A null will be returned if the transaction is not found. This can be due to
274+
// the transaction indexer not being finished. The caller must explicitly check
275+
// the indexer progress.
276+
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction) {
281277
bc.txLookupLock.RLock()
282278
defer bc.txLookupLock.RUnlock()
283279

284280
// Short circuit if the txlookup already in the cache, retrieve otherwise
285281
if item, exist := bc.txLookupCache.Get(hash); exist {
286-
return item.lookup, item.transaction, nil
282+
return item.lookup, item.transaction
287283
}
288284
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
289285
if tx == nil {
290-
progress, err := bc.TxIndexProgress()
291-
if err != nil {
292-
// No error is returned if the transaction indexing progress is unreachable
293-
// due to unexpected internal errors. In such cases, it is impossible to
294-
// determine whether the transaction does not exist or has simply not been
295-
// indexed yet without a progress marker.
296-
//
297-
// In such scenarios, the transaction is treated as unreachable, though
298-
// this is clearly an unintended and unexpected situation.
299-
return nil, nil, nil
300-
}
301-
// The transaction indexing is not finished yet, returning an
302-
// error to explicitly indicate it.
303-
if !progress.Done() {
304-
return nil, nil, errors.New("transaction indexing still in progress")
305-
}
306-
// The transaction is already indexed, the transaction is either
307-
// not existent or not in the range of index, returning null.
308-
return nil, nil, nil
286+
return nil, nil
309287
}
310288
lookup := &rawdb.LegacyTxLookupEntry{
311289
BlockHash: blockHash,
@@ -316,7 +294,23 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo
316294
lookup: lookup,
317295
transaction: tx,
318296
})
319-
return lookup, tx, nil
297+
return lookup, tx
298+
}
299+
300+
// TxIndexDone returns true if the transaction indexer has finished indexing.
301+
func (bc *BlockChain) TxIndexDone() bool {
302+
progress, err := bc.TxIndexProgress()
303+
if err != nil {
304+
// No error is returned if the transaction indexing progress is unreachable
305+
// due to unexpected internal errors. In such cases, it is impossible to
306+
// determine whether the transaction does not exist or has simply not been
307+
// indexed yet without a progress marker.
308+
//
309+
// In such scenarios, the transaction is treated as unreachable, though
310+
// this is clearly an unintended and unexpected situation.
311+
return true
312+
}
313+
return progress.Done()
320314
}
321315

322316
// HasState checks if state trie is fully present in the database or not.
@@ -412,7 +406,7 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
412406
if bc.txIndexer == nil {
413407
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
414408
}
415-
return bc.txIndexer.txIndexProgress()
409+
return bc.txIndexer.txIndexProgress(), nil
416410
}
417411

418412
// HistoryPruningCutoff returns the configured history pruning point.

core/txindexer.go

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package core
1818

1919
import (
20-
"errors"
2120
"fmt"
21+
"sync/atomic"
2222

2323
"github.com/ethereum/go-ethereum/common"
2424
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -47,26 +47,38 @@ type txIndexer struct {
4747
// and all others shouldn't.
4848
limit uint64
4949

50+
// The current head of blockchain for transaction indexing. This field
51+
// is accessed by both the indexer and the indexing progress queries.
52+
head atomic.Uint64
53+
54+
// The current tail of the indexed transactions, null indicates
55+
// that no transactions have been indexed yet.
56+
//
57+
// This field is accessed by both the indexer and the indexing
58+
// progress queries.
59+
tail atomic.Pointer[uint64]
60+
5061
// cutoff denotes the block number before which the chain segment should
5162
// be pruned and not available locally.
52-
cutoff uint64
53-
db ethdb.Database
54-
progress chan chan TxIndexProgress
55-
term chan chan struct{}
56-
closed chan struct{}
63+
cutoff uint64
64+
db ethdb.Database
65+
term chan chan struct{}
66+
closed chan struct{}
5767
}
5868

5969
// newTxIndexer initializes the transaction indexer.
6070
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
6171
cutoff, _ := chain.HistoryPruningCutoff()
6272
indexer := &txIndexer{
63-
limit: limit,
64-
cutoff: cutoff,
65-
db: chain.db,
66-
progress: make(chan chan TxIndexProgress),
67-
term: make(chan chan struct{}),
68-
closed: make(chan struct{}),
73+
limit: limit,
74+
cutoff: cutoff,
75+
db: chain.db,
76+
term: make(chan chan struct{}),
77+
closed: make(chan struct{}),
6978
}
79+
indexer.head.Store(indexer.resolveHead())
80+
indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))
81+
7082
go indexer.loop(chain)
7183

7284
var msg string
@@ -154,6 +166,7 @@ func (indexer *txIndexer) repair(head uint64) {
154166
// A crash may occur between the two delete operations,
155167
// potentially leaving dangling indexes in the database.
156168
// However, this is considered acceptable.
169+
indexer.tail.Store(nil)
157170
rawdb.DeleteTxIndexTail(indexer.db)
158171
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
159172
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
@@ -174,6 +187,7 @@ func (indexer *txIndexer) repair(head uint64) {
174187
// Traversing the database directly within the transaction
175188
// index namespace might be slow and expensive, but we
176189
// have no choice.
190+
indexer.tail.Store(nil)
177191
rawdb.DeleteTxIndexTail(indexer.db)
178192
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
179193
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
@@ -187,6 +201,7 @@ func (indexer *txIndexer) repair(head uint64) {
187201
// A crash may occur between the two delete operations,
188202
// potentially leaving dangling indexes in the database.
189203
// However, this is considered acceptable.
204+
indexer.tail.Store(&indexer.cutoff)
190205
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
191206
rawdb.DeleteAllTxLookupEntries(indexer.db, func(txhash common.Hash, blob []byte) bool {
192207
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
@@ -216,16 +231,15 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
216231

217232
// Listening to chain events and manipulate the transaction indexes.
218233
var (
219-
stop chan struct{} // Non-nil if background routine is active
220-
done chan struct{} // Non-nil if background routine is active
221-
head = indexer.resolveHead() // The latest announced chain head
222-
234+
stop chan struct{} // Non-nil if background routine is active
235+
done chan struct{} // Non-nil if background routine is active
223236
headCh = make(chan ChainHeadEvent)
224237
sub = chain.SubscribeChainHeadEvent(headCh)
225238
)
226239
defer sub.Unsubscribe()
227240

228241
// Validate the transaction indexes and repair if necessary
242+
head := indexer.head.Load()
229243
indexer.repair(head)
230244

231245
// Launch the initial processing if chain is not empty (head != genesis).
@@ -238,17 +252,18 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
238252
for {
239253
select {
240254
case h := <-headCh:
255+
indexer.head.Store(h.Header.Number.Uint64())
241256
if done == nil {
242257
stop = make(chan struct{})
243258
done = make(chan struct{})
244259
go indexer.run(h.Header.Number.Uint64(), stop, done)
245260
}
246-
head = h.Header.Number.Uint64()
261+
247262
case <-done:
248263
stop = nil
249264
done = nil
250-
case ch := <-indexer.progress:
251-
ch <- indexer.report(head)
265+
indexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))
266+
252267
case ch := <-indexer.term:
253268
if stop != nil {
254269
close(stop)
@@ -264,7 +279,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
264279
}
265280

266281
// report returns the tx indexing progress.
267-
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
282+
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
268283
// Special case if the head is even below the cutoff,
269284
// nothing to index.
270285
if head < indexer.cutoff {
@@ -284,7 +299,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
284299
}
285300
// Compute how many blocks have been indexed
286301
var indexed uint64
287-
tail := rawdb.ReadTxIndexTail(indexer.db)
288302
if tail != nil {
289303
indexed = head - *tail + 1
290304
}
@@ -300,16 +314,12 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
300314
}
301315
}
302316

303-
// txIndexProgress retrieves the tx indexing progress, or an error if the
304-
// background tx indexer is already stopped.
305-
func (indexer *txIndexer) txIndexProgress() (TxIndexProgress, error) {
306-
ch := make(chan TxIndexProgress, 1)
307-
select {
308-
case indexer.progress <- ch:
309-
return <-ch, nil
310-
case <-indexer.closed:
311-
return TxIndexProgress{}, errors.New("indexer is closed")
312-
}
317+
// txIndexProgress retrieves the transaction indexing progress. The reported
318+
// progress may slightly lag behind the actual indexing state, as the tail is
319+
// only updated at the end of each indexing operation. However, this delay is
320+
// considered acceptable.
321+
func (indexer *txIndexer) txIndexProgress() TxIndexProgress {
322+
return indexer.report(indexer.head.Load(), indexer.tail.Load())
313323
}
314324

315325
// close shutdown the indexer. Safe to be called for multiple times.

core/txindexer_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,8 @@ func TestTxIndexer(t *testing.T) {
121121

122122
// Index the initial blocks from ancient store
123123
indexer := &txIndexer{
124-
limit: 0,
125-
db: db,
126-
progress: make(chan chan TxIndexProgress),
124+
limit: 0,
125+
db: db,
127126
}
128127
for i, limit := range c.limits {
129128
indexer.limit = limit
@@ -241,9 +240,8 @@ func TestTxIndexerRepair(t *testing.T) {
241240

242241
// Index the initial blocks from ancient store
243242
indexer := &txIndexer{
244-
limit: c.limit,
245-
db: db,
246-
progress: make(chan chan TxIndexProgress),
243+
limit: c.limit,
244+
db: db,
247245
}
248246
indexer.run(chainHead, make(chan struct{}), make(chan struct{}))
249247

@@ -432,15 +430,11 @@ func TestTxIndexerReport(t *testing.T) {
432430

433431
// Index the initial blocks from ancient store
434432
indexer := &txIndexer{
435-
limit: c.limit,
436-
cutoff: c.cutoff,
437-
db: db,
438-
progress: make(chan chan TxIndexProgress),
433+
limit: c.limit,
434+
cutoff: c.cutoff,
435+
db: db,
439436
}
440-
if c.tail != nil {
441-
rawdb.WriteTxIndexTail(db, *c.tail)
442-
}
443-
p := indexer.report(c.head)
437+
p := indexer.report(c.head, c.tail)
444438
if p.Indexed != c.expIndexed {
445439
t.Fatalf("Unexpected indexed: %d, expected: %d", p.Indexed, c.expIndexed)
446440
}

eth/api_backend.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -349,22 +349,20 @@ func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
349349
// GetTransaction retrieves the lookup along with the transaction itself associate
350350
// with the given transaction hash.
351351
//
352-
// An error will be returned if the transaction is not found, and background
353-
// indexing for transactions is still in progress. The error is used to indicate the
354-
// scenario explicitly that the transaction might be reachable shortly.
355-
//
356-
// A null will be returned in the transaction is not found and background transaction
357-
// indexing is already finished. The transaction is not existent from the perspective
358-
// of node.
359-
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
360-
lookup, tx, err := b.eth.blockchain.GetTransactionLookup(txHash)
361-
if err != nil {
362-
return false, nil, common.Hash{}, 0, 0, err
363-
}
352+
// A null will be returned if the transaction is not found. The transaction is not
353+
// existent from the node's perspective. This can be due to the transaction indexer
354+
// not being finished. The caller must explicitly check the indexer progress.
355+
func (b *EthAPIBackend) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
356+
lookup, tx := b.eth.blockchain.GetTransactionLookup(txHash)
364357
if lookup == nil || tx == nil {
365-
return false, nil, common.Hash{}, 0, 0, nil
358+
return false, nil, common.Hash{}, 0, 0
366359
}
367-
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil
360+
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index
361+
}
362+
363+
// TxIndexDone returns true if the transaction indexer has finished indexing.
364+
func (b *EthAPIBackend) TxIndexDone() bool {
365+
return b.eth.blockchain.TxIndexDone()
368366
}
369367

370368
func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
@@ -391,7 +389,7 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S
391389
return b.eth.txPool.SubscribeTransactions(ch, true)
392390
}
393391

394-
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
392+
func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress {
395393
prog := b.eth.Downloader().Progress()
396394
if txProg, err := b.eth.blockchain.TxIndexProgress(); err == nil {
397395
prog.TxIndexFinishedBlocks = txProg.Indexed

eth/tracers/api.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ type Backend interface {
8282
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
8383
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
8484
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
85-
GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error)
85+
GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64)
86+
TxIndexDone() bool
8687
RPCGasCap() uint64
8788
ChainConfig() *params.ChainConfig
8889
Engine() consensus.Engine
@@ -858,12 +859,13 @@ func containsTx(block *types.Block, hash common.Hash) bool {
858859
// TraceTransaction returns the structured logs created during the execution of EVM
859860
// and returns them as a JSON object.
860861
func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (interface{}, error) {
861-
found, _, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash)
862-
if err != nil {
863-
return nil, ethapi.NewTxIndexingError()
864-
}
865-
// Only mined txes are supported
862+
found, _, blockHash, blockNumber, index := api.backend.GetTransaction(hash)
866863
if !found {
864+
// Warn in case tx indexer is not done.
865+
if !api.backend.TxIndexDone() {
866+
return nil, ethapi.NewTxIndexingError()
867+
}
868+
// Only mined txes are supported
867869
return nil, errTxNotFound
868870
}
869871
// It shouldn't happen in practice.

eth/tracers/api_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,13 @@ func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber)
116116
return b.chain.GetBlockByNumber(uint64(number)), nil
117117
}
118118

119-
func (b *testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) {
119+
func (b *testBackend) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
120120
tx, hash, blockNumber, index := rawdb.ReadTransaction(b.chaindb, txHash)
121-
return tx != nil, tx, hash, blockNumber, index, nil
121+
return tx != nil, tx, hash, blockNumber, index
122+
}
123+
124+
func (b *testBackend) TxIndexDone() bool {
125+
return true
122126
}
123127

124128
func (b *testBackend) RPCGasCap() uint64 {

0 commit comments

Comments
 (0)