From ca42c98c60f31afc3db63b74e623fb5e0c5859b0 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 16:43:46 -0700 Subject: [PATCH 01/23] feat: initial api with default timeouts --- eth/api_backend.go | 8 ++++++ internal/ethapi/api.go | 56 ++++++++++++++++++++++++++++++++++++++ internal/ethapi/backend.go | 2 ++ internal/ethapi/errors.go | 13 +++++++++ 4 files changed, 79 insertions(+) diff --git a/eth/api_backend.go b/eth/api_backend.go index 3ae73e78af1..cbcf428d13e 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -486,3 +486,11 @@ func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, re func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (*types.Transaction, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) { return b.eth.stateAtTransaction(ctx, block, txIndex, reexec) } + +func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration { + return 2 * time.Second +} + +func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration { + return 5 * time.Minute +} diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a2cb28d3b2d..8b28bb7092d 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1652,6 +1652,62 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil return SubmitTransaction(ctx, api.b, tx) } +// SendRawTransactionSync will add the signed transaction to the transaction pool +// and wait for the transaction to be mined until the timeout (in milliseconds) is reached. +func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(input); err != nil { + return nil, err + } + hash, err := SubmitTransaction(ctx, api.b, tx) + if err != nil { + return nil, err + } + + // compute effective timeout + max := api.b.RPCTxSyncMaxTimeout() + def := api.b.RPCTxSyncDefaultTimeout() + + eff := def + if timeoutMs != nil && *timeoutMs > 0 { + req := time.Duration(*timeoutMs) * time.Millisecond + if req > max { + eff = max + } else { + eff = req // allow shorter than default + } + } + + // Wait for receipt until timeout + receiptCtx, cancel := context.WithTimeout(ctx, eff) + defer cancel() + + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + receipt, getErr := api.GetTransactionReceipt(receiptCtx, hash) + // If tx-index still building, keep polling + if getErr != nil && !errors.Is(getErr, NewTxIndexingError()) { + // transient or other error: just keep polling + } + if receipt != nil { + return receipt, nil + } + case <-receiptCtx.Done(): + if err := ctx.Err(); err != nil { + return nil, err // context canceled / deadline exceeded upstream + } + return nil, &txSyncTimeoutError{ + msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", eff), + hash: hash, + } + } + } +} + // Sign calculates an ECDSA signature for: // keccak256("\x19Ethereum Signed Message:\n" + len(message) + message). // diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index f709a1fcdcc..af3d592b82b 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -53,6 +53,8 @@ type Backend interface { RPCEVMTimeout() time.Duration // global timeout for eth_call over rpc: DoS protection RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs UnprotectedAllowed() bool // allows only for EIP155 transactions. + RPCTxSyncDefaultTimeout() time.Duration + RPCTxSyncMaxTimeout() time.Duration // Blockchain API SetHead(number uint64) diff --git a/internal/ethapi/errors.go b/internal/ethapi/errors.go index 154938fa0e3..235f5b3fa8d 100644 --- a/internal/ethapi/errors.go +++ b/internal/ethapi/errors.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -33,6 +34,11 @@ type revertError struct { reason string // revert reason hex encoded } +type txSyncTimeoutError struct { + msg string + hash common.Hash +} + // ErrorCode returns the JSON error code for a revert. // See: https://ethereum.org/en/developers/docs/apis/json-rpc/#error-codes func (e *revertError) ErrorCode() int { @@ -108,6 +114,7 @@ const ( errCodeInvalidParams = -32602 errCodeReverted = -32000 errCodeVMError = -32015 + errCodeTxSyncTimeout = 4 ) func txValidationError(err error) *invalidTxError { @@ -168,3 +175,9 @@ type blockGasLimitReachedError struct{ message string } func (e *blockGasLimitReachedError) Error() string { return e.message } func (e *blockGasLimitReachedError) ErrorCode() int { return errCodeBlockGasLimitReached } + +func (e *txSyncTimeoutError) Error() string { return e.msg } +func (e *txSyncTimeoutError) ErrorCode() int { return errCodeTxSyncTimeout } + +// ErrorData should be JSON-safe; return the 0x-hex string. +func (e *txSyncTimeoutError) ErrorData() interface{} { return e.hash.Hex() } From a33a1c91420ad529a5c7a8edf4881c8353870115 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 17:41:47 -0700 Subject: [PATCH 02/23] add happy and timeout unit tests --- internal/ethapi/api_test.go | 142 +++++++++++++++++++++++++++++++++++- 1 file changed, 140 insertions(+), 2 deletions(-) diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index d3278c04e79..04ac249e4b4 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -440,6 +440,14 @@ type testBackend struct { pending *types.Block pendingReceipts types.Receipts + + // test-only fields for SendRawTransactionSync + autoMine bool + mined bool + syncDefaultTO time.Duration + syncMaxTO time.Duration + sentTx *types.Transaction + sentTxHash common.Hash } func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.Engine, generator func(i int, b *core.BlockGen)) *testBackend { @@ -592,14 +600,38 @@ func (b testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscr func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { panic("implement me") } -func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - panic("implement me") +func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { + b.sentTx = tx + b.sentTxHash = tx.Hash() + if b.autoMine { + b.mined = true + } + return nil } func (b testBackend) GetCanonicalTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) { + // in-memory fast path for tests + if b.mined && txHash == b.sentTxHash { + return true, b.sentTx, common.HexToHash("0x01"), 1, 0 + } + // fallback to existing DB-backed path tx, blockHash, blockNumber, index := rawdb.ReadCanonicalTransaction(b.db, txHash) return tx != nil, tx, blockHash, blockNumber, index } func (b testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64) (*types.Receipt, error) { + // In-memory fast path used by tests + if b.mined && tx != nil && tx.Hash() == b.sentTxHash && blockHash == common.HexToHash("0x01") && blockNumber == 1 && blockIndex == 0 { + return &types.Receipt{ + Type: tx.Type(), + Status: types.ReceiptStatusSuccessful, + CumulativeGasUsed: 21000, + GasUsed: 21000, + EffectiveGasPrice: big.NewInt(1), + BlockHash: blockHash, + BlockNumber: new(big.Int).SetUint64(blockNumber), + TransactionIndex: 0, + }, nil + } + // Fallback: use the chain's canonical receipt (DB-backed) return b.chain.GetCanonicalReceipt(tx, blockHash, blockNumber, blockIndex) } func (b testBackend) TxIndexDone() bool { @@ -3889,3 +3921,109 @@ func (b configTimeBackend) HeaderByNumber(_ context.Context, n rpc.BlockNumber) func (b configTimeBackend) CurrentHeader() *types.Header { return &types.Header{Time: b.time} } + +func (b *testBackend) RPCTxSyncDefaultTimeout() time.Duration { + if b.syncDefaultTO != 0 { + return b.syncDefaultTO + } + return 2 * time.Second +} +func (b *testBackend) RPCTxSyncMaxTimeout() time.Duration { + if b.syncMaxTO != 0 { + return b.syncMaxTO + } + return 5 * time.Minute +} +func (b *backendMock) RPCTxSyncDefaultTimeout() time.Duration { return 2 * time.Second } +func (b *backendMock) RPCTxSyncMaxTimeout() time.Duration { return 5 * time.Minute } + +func makeSignedRaw(t *testing.T, api *TransactionAPI, from, to common.Address, value *big.Int) (hexutil.Bytes, *types.Transaction) { + t.Helper() + + fillRes, err := api.FillTransaction(context.Background(), TransactionArgs{ + From: &from, + To: &to, + Value: (*hexutil.Big)(value), + }) + if err != nil { + t.Fatalf("FillTransaction failed: %v", err) + } + signRes, err := api.SignTransaction(context.Background(), argsFromTransaction(fillRes.Tx, from)) + if err != nil { + t.Fatalf("SignTransaction failed: %v", err) + } + return signRes.Raw, signRes.Tx +} + +// makeSelfSignedRaw is a convenience for a 0-ETH self-transfer. +func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) (hexutil.Bytes, *types.Transaction) { + return makeSignedRaw(t, api, addr, addr, big.NewInt(0)) +} + +func TestSendRawTransactionSync_Success(t *testing.T) { + t.Parallel() + genesis := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{}, + } + b := newTestBackend(t, 0, genesis, ethash.NewFaker(), nil) + b.autoMine = true // immediately “mines” the tx in-memory + + api := NewTransactionAPI(b, new(AddrLocker)) + + raw, _ := makeSelfSignedRaw(t, api, b.acc.Address) + + receipt, err := api.SendRawTransactionSync(context.Background(), raw, nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if receipt == nil { + t.Fatalf("expected non-nil receipt") + } + if _, ok := receipt["blockNumber"]; !ok { + t.Fatalf("expected blockNumber in receipt, got %#v", receipt) + } +} + +func TestSendRawTransactionSync_Timeout(t *testing.T) { + t.Parallel() + + genesis := &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{}, + } + b := newTestBackend(t, 0, genesis, ethash.NewFaker(), nil) + b.autoMine = false // don't mine, should time out + + api := NewTransactionAPI(b, new(AddrLocker)) + + raw, _ := makeSelfSignedRaw(t, api, b.acc.Address) + + timeout := hexutil.Uint64(200) // 200ms + receipt, err := api.SendRawTransactionSync(context.Background(), raw, &timeout) + + if receipt != nil { + t.Fatalf("expected nil receipt, got %#v", receipt) + } + if err == nil { + t.Fatalf("expected timeout error, got nil") + } + // assert error shape & data (hash) + var de interface { + ErrorCode() int + ErrorData() interface{} + } + if !errors.As(err, &de) { + t.Fatalf("expected data error with code/data, got %T %v", err, err) + } + if de.ErrorCode() != errCodeTxSyncTimeout { + t.Fatalf("expected code %d, got %d", errCodeTxSyncTimeout, de.ErrorCode()) + } + tx := new(types.Transaction) + if e := tx.UnmarshalBinary(raw); e != nil { + t.Fatal(e) + } + if got, want := de.ErrorData(), tx.Hash().Hex(); got != want { + t.Fatalf("expected ErrorData=%s, got %v", want, got) + } +} From 0684a33a50ee54eeba9b0a8462248f1e8a1733a8 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 21:49:33 -0700 Subject: [PATCH 03/23] add flag --- cmd/geth/main.go | 2 ++ cmd/utils/flags.go | 18 ++++++++++++++++ eth/ethconfig/config.go | 48 +++++++++++++++++++++++------------------ 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2465b52ad1f..2a36c7851c2 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -188,6 +188,8 @@ var ( utils.AllowUnprotectedTxs, utils.BatchRequestLimit, utils.BatchResponseMaxSize, + utils.RPCTxSyncDefaultFlag, + utils.RPCTxSyncMaxFlag, } metricsFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c9da08578c9..346990a2efe 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -615,6 +615,18 @@ var ( Value: ethconfig.Defaults.LogQueryLimit, Category: flags.APICategory, } + RPCTxSyncDefaultFlag = &cli.DurationFlag{ + Name: "rpc.txsync.default", + Usage: "Default timeout for eth_sendRawTransactionSync (e.g. 2s, 500ms)", + Value: ethconfig.Defaults.TxSyncDefaultTimeout, + Category: flags.APICategory, + } + RPCTxSyncMaxFlag = &cli.DurationFlag{ + Name: "rpc.txsync.max", + Usage: "Maximum allowed timeout for eth_sendRawTransactionSync (e.g. 5m)", + Value: ethconfig.Defaults.TxSyncMaxTimeout, + Category: flags.APICategory, + } // Authenticated RPC HTTP settings AuthListenFlag = &cli.StringFlag{ Name: "authrpc.addr", @@ -1717,6 +1729,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(RPCGlobalLogQueryLimit.Name) { cfg.LogQueryLimit = ctx.Int(RPCGlobalLogQueryLimit.Name) } + if ctx.IsSet(RPCTxSyncDefaultFlag.Name) { + cfg.TxSyncDefaultTimeout = ctx.Duration(RPCTxSyncDefaultFlag.Name) + } + if ctx.IsSet(RPCTxSyncMaxFlag.Name) { + cfg.TxSyncMaxTimeout = ctx.Duration(RPCTxSyncMaxFlag.Name) + } if !ctx.Bool(SnapshotFlag.Name) || cfg.SnapshotCache == 0 { // If snap-sync is requested, this flag is also required if cfg.SyncMode == ethconfig.SnapSync { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 6020387bcdb..c4a0956b3b4 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -49,27 +49,29 @@ var FullNodeGPO = gasprice.Config{ // Defaults contains default settings for use on the Ethereum main net. var Defaults = Config{ - HistoryMode: history.KeepAll, - SyncMode: SnapSync, - NetworkId: 0, // enable auto configuration of networkID == chainID - TxLookupLimit: 2350000, - TransactionHistory: 2350000, - LogHistory: 2350000, - StateHistory: params.FullImmutabilityThreshold, - DatabaseCache: 512, - TrieCleanCache: 154, - TrieDirtyCache: 256, - TrieTimeout: 60 * time.Minute, - SnapshotCache: 102, - FilterLogCacheSize: 32, - LogQueryLimit: 1000, - Miner: miner.DefaultConfig, - TxPool: legacypool.DefaultConfig, - BlobPool: blobpool.DefaultConfig, - RPCGasCap: 50000000, - RPCEVMTimeout: 5 * time.Second, - GPO: FullNodeGPO, - RPCTxFeeCap: 1, // 1 ether + HistoryMode: history.KeepAll, + SyncMode: SnapSync, + NetworkId: 0, // enable auto configuration of networkID == chainID + TxLookupLimit: 2350000, + TransactionHistory: 2350000, + LogHistory: 2350000, + StateHistory: params.FullImmutabilityThreshold, + DatabaseCache: 512, + TrieCleanCache: 154, + TrieDirtyCache: 256, + TrieTimeout: 60 * time.Minute, + SnapshotCache: 102, + FilterLogCacheSize: 32, + LogQueryLimit: 1000, + Miner: miner.DefaultConfig, + TxPool: legacypool.DefaultConfig, + BlobPool: blobpool.DefaultConfig, + RPCGasCap: 50000000, + RPCEVMTimeout: 5 * time.Second, + GPO: FullNodeGPO, + RPCTxFeeCap: 1, // 1 ether + TxSyncDefaultTimeout: 20 * time.Second, + TxSyncMaxTimeout: 1 * time.Minute, } //go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go @@ -183,6 +185,10 @@ type Config struct { // OverrideVerkle (TODO: remove after the fork) OverrideVerkle *uint64 `toml:",omitempty"` + + // EIP-7966: eth_sendRawTransactionSync timeouts + TxSyncDefaultTimeout time.Duration `toml:",omitempty"` + TxSyncMaxTimeout time.Duration `toml:",omitempty"` } // CreateConsensusEngine creates a consensus engine for the given chain config. From e7a48a5c08ad2db6bfa171e46407670af0e8430f Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 22:41:01 -0700 Subject: [PATCH 04/23] move to event driven --- internal/ethapi/api.go | 56 ++++++++++++++++++++++++++----------- internal/ethapi/api_test.go | 5 ++-- 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 8b28bb7092d..e35ffcceac6 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/gasestimator" "github.com/ethereum/go-ethereum/eth/tracers/logger" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi/override" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -1653,7 +1654,7 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil } // SendRawTransactionSync will add the signed transaction to the transaction pool -// and wait for the transaction to be mined until the timeout (in milliseconds) is reached. +// and wait until the transaction has been included in a block and return the receipt, or the timeout. func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) { tx := new(types.Transaction) if err := tx.UnmarshalBinary(input); err != nil { @@ -1664,7 +1665,7 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex return nil, err } - // compute effective timeout + // effective timeout: min(requested, max), default if none max := api.b.RPCTxSyncMaxTimeout() def := api.b.RPCTxSyncDefaultTimeout() @@ -1674,40 +1675,63 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex if req > max { eff = max } else { - eff = req // allow shorter than default + eff = req } } - // Wait for receipt until timeout receiptCtx, cancel := context.WithTimeout(ctx, eff) defer cancel() - ticker := time.NewTicker(time.Millisecond * 100) - defer ticker.Stop() + // Fast path: maybe already mined/indexed + if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { + return r, nil + } + + // Subscribe to head changes and re-check on each new block + heads := make(chan core.ChainHeadEvent, 16) + var sub event.Subscription = api.b.SubscribeChainHeadEvent(heads) + if sub == nil { + return nil, errors.New("chain head subscription unavailable") + } + defer sub.Unsubscribe() for { select { - case <-ticker.C: - receipt, getErr := api.GetTransactionReceipt(receiptCtx, hash) - // If tx-index still building, keep polling - if getErr != nil && !errors.Is(getErr, NewTxIndexingError()) { - // transient or other error: just keep polling - } - if receipt != nil { - return receipt, nil - } case <-receiptCtx.Done(): + // Distinguish upstream cancellation from our timeout if err := ctx.Err(); err != nil { - return nil, err // context canceled / deadline exceeded upstream + return nil, err } return nil, &txSyncTimeoutError{ msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", eff), hash: hash, } + + case err := <-sub.Err(): + return nil, err + + case <-heads: + receipt, getErr := api.GetTransactionReceipt(receiptCtx, hash) + // If tx-index still building, keep waiting; ignore transient errors + if getErr != nil && !errors.Is(getErr, NewTxIndexingError()) { + // ignore and wait for next head + continue + } + if receipt != nil { + return receipt, nil + } } } } +func (api *TransactionAPI) tryGetReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { + receipt, err := api.GetTransactionReceipt(ctx, hash) + if err != nil { + return nil, err + } + return receipt, nil +} + // Sign calculates an ECDSA signature for: // keccak256("\x19Ethereum Signed Message:\n" + len(message) + message). // diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 04ac249e4b4..5ee3453a1ac 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -448,6 +448,7 @@ type testBackend struct { syncMaxTO time.Duration sentTx *types.Transaction sentTxHash common.Hash + headFeed *event.Feed } func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.Engine, generator func(i int, b *core.BlockGen)) *testBackend { @@ -474,6 +475,7 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E acc: acc, pending: blocks[n], pendingReceipts: receipts[n], + headFeed: new(event.Feed), } return backend } @@ -598,7 +600,7 @@ func (b testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscr panic("implement me") } func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { - panic("implement me") + return b.headFeed.Subscribe(ch) } func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { b.sentTx = tx @@ -3959,7 +3961,6 @@ func makeSignedRaw(t *testing.T, api *TransactionAPI, from, to common.Address, v func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) (hexutil.Bytes, *types.Transaction) { return makeSignedRaw(t, api, addr, addr, big.NewInt(0)) } - func TestSendRawTransactionSync_Success(t *testing.T) { t.Parallel() genesis := &core.Genesis{ From e3fa487898d01d46338a9be84223b5b5bda9c880 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 22:44:39 -0700 Subject: [PATCH 05/23] use configs in api backend --- eth/api_backend.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/api_backend.go b/eth/api_backend.go index cbcf428d13e..766a99fc1ef 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -488,9 +488,9 @@ func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Blo } func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration { - return 2 * time.Second + return b.eth.config.TxSyncDefaultTimeout } func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration { - return 5 * time.Minute + return b.eth.config.TxSyncMaxTimeout } From 1e697d990c6ff83b7392380aedbcf49ef9daea39 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 1 Oct 2025 22:48:31 -0700 Subject: [PATCH 06/23] rm unused function --- internal/ethapi/api.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index e35ffcceac6..3faad83166f 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1724,14 +1724,6 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex } } -func (api *TransactionAPI) tryGetReceipt(ctx context.Context, hash common.Hash) (map[string]interface{}, error) { - receipt, err := api.GetTransactionReceipt(ctx, hash) - if err != nil { - return nil, err - } - return receipt, nil -} - // Sign calculates an ECDSA signature for: // keccak256("\x19Ethereum Signed Message:\n" + len(message) + message). // From 4c5beb4670ea983a5103d11d7410ff47efdc0660 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 2 Oct 2025 12:32:36 -0700 Subject: [PATCH 07/23] poll after head --- internal/ethapi/api.go | 99 +++++++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 25 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 3faad83166f..c75c8dc563b 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -41,7 +41,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/gasestimator" "github.com/ethereum/go-ethereum/eth/tracers/logger" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi/override" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -1665,61 +1664,111 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex return nil, err } - // effective timeout: min(requested, max), default if none - max := api.b.RPCTxSyncMaxTimeout() - def := api.b.RPCTxSyncDefaultTimeout() + maxTimeout := api.b.RPCTxSyncMaxTimeout() + defaultTimeout := api.b.RPCTxSyncDefaultTimeout() - eff := def + timeout := defaultTimeout if timeoutMs != nil && *timeoutMs > 0 { req := time.Duration(*timeoutMs) * time.Millisecond - if req > max { - eff = max + if req > maxTimeout { + timeout = maxTimeout } else { - eff = req + timeout = req } } - receiptCtx, cancel := context.WithTimeout(ctx, eff) + receiptCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // Fast path: maybe already mined/indexed + // Fast path. if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { return r, nil } - // Subscribe to head changes and re-check on each new block + // Subscribe to new block events and check the receipt on each new block. heads := make(chan core.ChainHeadEvent, 16) - var sub event.Subscription = api.b.SubscribeChainHeadEvent(heads) - if sub == nil { - return nil, errors.New("chain head subscription unavailable") - } + sub := api.b.SubscribeChainHeadEvent(heads) defer sub.Unsubscribe() + subErrCh := sub.Err() + + // calculate poll/settle intervals + const ( + pollFraction = 20 + pollMin = 25 * time.Millisecond + pollMax = 500 * time.Millisecond + ) + settleInterval := timeout / pollFraction + if settleInterval < pollMin { + settleInterval = pollMin + } else if settleInterval > pollMax { + settleInterval = pollMax + } + + // Settle: short delay to bridge receipt-indexing lag after a new head. + // resetSettle re-arms a single timer safely (stop+drain+reset). + // On head: check once immediately, then reset; on timer: re-check; repeat until deadline. + var ( + settle *time.Timer + settleCh <-chan time.Time + ) + resetSettle := func(d time.Duration) { + if settle == nil { + settle = time.NewTimer(d) + settleCh = settle.C + return + } + if !settle.Stop() { + select { + case <-settle.C: + default: + } + } + settle.Reset(d) + } + defer func() { + if settle != nil && !settle.Stop() { + select { + case <-settle.C: + default: + } + } + }() + + // Start a settle cycle immediately to cover a missed-head race. + resetSettle(settleInterval) for { select { case <-receiptCtx.Done(): - // Distinguish upstream cancellation from our timeout if err := ctx.Err(); err != nil { return nil, err } return nil, &txSyncTimeoutError{ - msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", eff), + msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", timeout), hash: hash, } - case err := <-sub.Err(): + case err, ok := <-subErrCh: + if !ok || err == nil { + // subscription closed; disable this case and rely on settle timer + subErrCh = nil + continue + } return nil, err case <-heads: - receipt, getErr := api.GetTransactionReceipt(receiptCtx, hash) - // If tx-index still building, keep waiting; ignore transient errors - if getErr != nil && !errors.Is(getErr, NewTxIndexingError()) { - // ignore and wait for next head - continue + // Immediate re-check on new head, then grace to bridge indexing lag. + if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { + return r, nil } - if receipt != nil { - return receipt, nil + resetSettle(settleInterval) + + case <-settleCh: + r, getErr := api.GetTransactionReceipt(receiptCtx, hash) + if r != nil && getErr == nil { + return r, nil } + resetSettle(settleInterval) } } } From a463b32a4bc0cc762c1571367a53b8ccff98de9d Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 2 Oct 2025 12:40:44 -0700 Subject: [PATCH 08/23] rm unneccessary var --- internal/ethapi/api.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index c75c8dc563b..1e3dc657a7c 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1764,8 +1764,7 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex resetSettle(settleInterval) case <-settleCh: - r, getErr := api.GetTransactionReceipt(receiptCtx, hash) - if r != nil && getErr == nil { + if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { return r, nil } resetSettle(settleInterval) From 3a3c46a39b682a3d6a6d1b70cdf15144bf28c352 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 12:18:06 -0700 Subject: [PATCH 09/23] use SubscribeTransactionReceipts --- internal/ethapi/api.go | 89 ++++++------------ internal/ethapi/api_test.go | 114 +++++++++++++++++++---- internal/ethapi/backend.go | 1 + internal/ethapi/transaction_args_test.go | 3 + 4 files changed, 130 insertions(+), 77 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 1e3dc657a7c..ccc0c3c7a7c 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1652,6 +1652,11 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil return SubmitTransaction(ctx, api.b, tx) } +type ReceiptWithTx struct { + Receipt *types.Receipt + Tx *types.Transaction +} + // SendRawTransactionSync will add the signed transaction to the transaction pool // and wait until the transaction has been included in a block and return the receipt, or the timeout. func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) { @@ -1685,61 +1690,17 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex return r, nil } - // Subscribe to new block events and check the receipt on each new block. - heads := make(chan core.ChainHeadEvent, 16) - sub := api.b.SubscribeChainHeadEvent(heads) + // Subscribe to receipt stream (filtered to this tx) + rcpts := make(chan []*ReceiptWithTx, 1) + sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, rcpts) defer sub.Unsubscribe() - subErrCh := sub.Err() - - // calculate poll/settle intervals - const ( - pollFraction = 20 - pollMin = 25 * time.Millisecond - pollMax = 500 * time.Millisecond - ) - settleInterval := timeout / pollFraction - if settleInterval < pollMin { - settleInterval = pollMin - } else if settleInterval > pollMax { - settleInterval = pollMax - } - // Settle: short delay to bridge receipt-indexing lag after a new head. - // resetSettle re-arms a single timer safely (stop+drain+reset). - // On head: check once immediately, then reset; on timer: re-check; repeat until deadline. - var ( - settle *time.Timer - settleCh <-chan time.Time - ) - resetSettle := func(d time.Duration) { - if settle == nil { - settle = time.NewTimer(d) - settleCh = settle.C - return - } - if !settle.Stop() { - select { - case <-settle.C: - default: - } - } - settle.Reset(d) - } - defer func() { - if settle != nil && !settle.Stop() { - select { - case <-settle.C: - default: - } - } - }() - - // Start a settle cycle immediately to cover a missed-head race. - resetSettle(settleInterval) + subErrCh := sub.Err() for { select { case <-receiptCtx.Done(): + // Upstream cancellation -> bubble it; otherwise emit our timeout error if err := ctx.Err(); err != nil { return nil, err } @@ -1750,24 +1711,30 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex case err, ok := <-subErrCh: if !ok || err == nil { - // subscription closed; disable this case and rely on settle timer + // subscription closed; disable this case subErrCh = nil continue } return nil, err - case <-heads: - // Immediate re-check on new head, then grace to bridge indexing lag. - if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { - return r, nil - } - resetSettle(settleInterval) - - case <-settleCh: - if r, err := api.GetTransactionReceipt(receiptCtx, hash); err == nil && r != nil { - return r, nil + case batch := <-rcpts: + for _, rwt := range batch { + if rwt == nil || rwt.Receipt == nil || rwt.Receipt.TxHash != hash { + continue + } + + if rwt.Receipt.BlockNumber != nil && rwt.Receipt.BlockHash != (common.Hash{}) { + return MarshalReceipt( + rwt.Receipt, + rwt.Receipt.BlockHash, + rwt.Receipt.BlockNumber.Uint64(), + api.signer, + rwt.Tx, + int(rwt.Receipt.TransactionIndex), + ), nil + } + return api.GetTransactionReceipt(receiptCtx, hash) } - resetSettle(settleInterval) } } } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 5ee3453a1ac..94d9982a83d 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -442,13 +442,19 @@ type testBackend struct { pendingReceipts types.Receipts // test-only fields for SendRawTransactionSync - autoMine bool - mined bool + receiptsFeed *event.Feed + headFeed *event.Feed // keep if other tests use it; otherwise remove + autoMine bool + + sentTx *types.Transaction + sentTxHash common.Hash + syncDefaultTO time.Duration syncMaxTO time.Duration - sentTx *types.Transaction - sentTxHash common.Hash - headFeed *event.Feed +} + +func fakeBlockHash(txh common.Hash) common.Hash { + return crypto.Keccak256Hash([]byte("testblock"), txh.Bytes()) } func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.Engine, generator func(i int, b *core.BlockGen)) *testBackend { @@ -476,6 +482,7 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E pending: blocks[n], pendingReceipts: receipts[n], headFeed: new(event.Feed), + receiptsFeed: new(event.Feed), } return backend } @@ -605,23 +612,38 @@ func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) even func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { b.sentTx = tx b.sentTxHash = tx.Hash() - if b.autoMine { - b.mined = true + + if b.autoMine && b.receiptsFeed != nil { + num := b.chain.CurrentHeader().Number.Uint64() + 1 + bh := fakeBlockHash(tx.Hash()) + + r := &types.Receipt{ + TxHash: tx.Hash(), + Status: types.ReceiptStatusSuccessful, + BlockHash: bh, + BlockNumber: new(big.Int).SetUint64(num), + TransactionIndex: 0, + CumulativeGasUsed: 21000, + GasUsed: 21000, + } + b.receiptsFeed.Send([]*ReceiptWithTx{{Receipt: r, Tx: tx}}) } return nil } -func (b testBackend) GetCanonicalTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) { - // in-memory fast path for tests - if b.mined && txHash == b.sentTxHash { - return true, b.sentTx, common.HexToHash("0x01"), 1, 0 +func (b *testBackend) GetCanonicalTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) { + // Treat the auto-mined tx as canonically placed at head+1. + if b.autoMine && txHash == b.sentTxHash { + num := b.chain.CurrentHeader().Number.Uint64() + 1 + return true, b.sentTx, fakeBlockHash(txHash), num, 0 } - // fallback to existing DB-backed path tx, blockHash, blockNumber, index := rawdb.ReadCanonicalTransaction(b.db, txHash) return tx != nil, tx, blockHash, blockNumber, index } -func (b testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64) (*types.Receipt, error) { - // In-memory fast path used by tests - if b.mined && tx != nil && tx.Hash() == b.sentTxHash && blockHash == common.HexToHash("0x01") && blockNumber == 1 && blockIndex == 0 { +func (b *testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common.Hash, blockNumber, blockIndex uint64) (*types.Receipt, error) { + if b.autoMine && tx != nil && tx.Hash() == b.sentTxHash && + blockHash == fakeBlockHash(tx.Hash()) && + blockIndex == 0 && + blockNumber == b.chain.CurrentHeader().Number.Uint64()+1 { return &types.Receipt{ Type: tx.Type(), Status: types.ReceiptStatusSuccessful, @@ -631,9 +653,9 @@ func (b testBackend) GetCanonicalReceipt(tx *types.Transaction, blockHash common BlockHash: blockHash, BlockNumber: new(big.Int).SetUint64(blockNumber), TransactionIndex: 0, + TxHash: tx.Hash(), }, nil } - // Fallback: use the chain's canonical receipt (DB-backed) return b.chain.GetCanonicalReceipt(tx, blockHash, blockNumber, blockIndex) } func (b testBackend) TxIndexDone() bool { @@ -3961,6 +3983,66 @@ func makeSignedRaw(t *testing.T, api *TransactionAPI, from, to common.Address, v func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) (hexutil.Bytes, *types.Transaction) { return makeSignedRaw(t, api, addr, addr, big.NewInt(0)) } + +func (b *testBackend) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { + // If no feed is wired for this test, return a no-op subscription + if b.receiptsFeed == nil { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) + } + + // No filter => forward batches directly + if len(txHashes) == 0 { + return b.receiptsFeed.Subscribe(ch) + } + + // Filtered: wrap the underlying feed and only forward matching receipts + in := make(chan []*ReceiptWithTx, 1) + sub := b.receiptsFeed.Subscribe(in) + + // Build a hash set for quick filtering + wanted := make(map[common.Hash]struct{}, len(txHashes)) + for _, h := range txHashes { + wanted[h] = struct{}{} + } + + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case batch, ok := <-in: + if !ok { + return nil + } + var out []*ReceiptWithTx + for _, r := range batch { + if r != nil && r.Receipt != nil { + if _, ok := wanted[r.Receipt.TxHash]; ok { + out = append(out, r) + } + } + } + if len(out) == 0 { + continue + } + select { + case ch <- out: + case <-quit: + return nil + } + case err, ok := <-sub.Err(): + if !ok || err == nil { + return nil + } + return err + case <-quit: + return nil + } + } + }) +} func TestSendRawTransactionSync_Success(t *testing.T) { t.Parallel() genesis := &core.Genesis{ diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index af3d592b82b..b67ecef3227 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -98,6 +98,7 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription CurrentView() *filtermaps.ChainView NewMatcherBackend() filtermaps.MatcherBackend diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 30791f32b5e..dd75defecf9 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -411,3 +411,6 @@ func (b *backendMock) CurrentView() *filtermaps.ChainView { return nil func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil } func (b *backendMock) HistoryPruningCutoff() uint64 { return 0 } +func (b *backendMock) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { + return nil +} From 3d083d02fd07db7dce6524581b342f16479cc3b6 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 12:27:49 -0700 Subject: [PATCH 10/23] rm redundant headfeed --- internal/ethapi/api.go | 6 +++--- internal/ethapi/api_test.go | 4 +--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index ccc0c3c7a7c..bb3de426fb9 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1691,8 +1691,8 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex } // Subscribe to receipt stream (filtered to this tx) - rcpts := make(chan []*ReceiptWithTx, 1) - sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, rcpts) + receipts := make(chan []*ReceiptWithTx, 1) + sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, receipts) defer sub.Unsubscribe() subErrCh := sub.Err() @@ -1717,7 +1717,7 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex } return nil, err - case batch := <-rcpts: + case batch := <-receipts: for _, rwt := range batch { if rwt == nil || rwt.Receipt == nil || rwt.Receipt.TxHash != hash { continue diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 94d9982a83d..c6da3669a86 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -443,7 +443,6 @@ type testBackend struct { // test-only fields for SendRawTransactionSync receiptsFeed *event.Feed - headFeed *event.Feed // keep if other tests use it; otherwise remove autoMine bool sentTx *types.Transaction @@ -481,7 +480,6 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E acc: acc, pending: blocks[n], pendingReceipts: receipts[n], - headFeed: new(event.Feed), receiptsFeed: new(event.Feed), } return backend @@ -607,7 +605,7 @@ func (b testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscr panic("implement me") } func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { - return b.headFeed.Subscribe(ch) + panic("implement me") } func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { b.sentTx = tx From 862c07ec363aa668aa8672ee2838cc0861997ea9 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 16:55:23 -0700 Subject: [PATCH 11/23] use FilterReceipts, avoid circular deps --- eth/api_backend.go | 29 +++++++++++++++++++++++++++++ eth/filters/filter.go | 10 ++++------ eth/filters/filter_system.go | 2 +- internal/ethapi/api.go | 6 +++--- internal/ethapi/api_test.go | 3 +-- 5 files changed, 38 insertions(+), 12 deletions(-) diff --git a/eth/api_backend.go b/eth/api_backend.go index 766a99fc1ef..984e6ff5f6c 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -36,10 +36,12 @@ import ( "github.com/ethereum/go-ethereum/core/txpool/locals" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -494,3 +496,30 @@ func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration { func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration { return b.eth.config.TxSyncMaxTimeout } + +func (b *EthAPIBackend) SubscribeTransactionReceipts( + txHashes []common.Hash, + out chan<- []*ethapi.ReceiptWithTx, +) event.Subscription { + ch := make(chan core.ChainEvent, 16) + sub := b.eth.blockchain.SubscribeChainEvent(ch) + + go func() { + defer sub.Unsubscribe() + for { + select { + case ev, ok := <-ch: + if !ok { + return + } + batch := filters.FilterReceipts(txHashes, ev) + if len(batch) > 0 { + out <- batch + } + case <-sub.Err(): + return + } + } + }() + return sub +} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 02399bc8018..ee93a985f06 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -554,16 +555,13 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } // ReceiptWithTx contains a receipt and its corresponding transaction -type ReceiptWithTx struct { - Receipt *types.Receipt - Transaction *types.Transaction -} +type ReceiptWithTx = ethapi.ReceiptWithTx -// filterReceipts returns the receipts matching the given criteria +// FilterReceipts returns the receipts matching the given criteria // In addition to returning receipts, it also returns the corresponding transactions. // This is because receipts only contain low-level data, while user-facing data // may require additional information from the Transaction. -func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx { +func FilterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx { var ret []*ReceiptWithTx receipts := ev.Receipts diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 02783fa5ec5..9210dba21cf 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -445,7 +445,7 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) // Handle transaction receipts subscriptions when a new block is added for _, f := range filters[TransactionReceiptsSubscription] { - matchedReceipts := filterReceipts(f.txHashes, ev) + matchedReceipts := FilterReceipts(f.txHashes, ev) if len(matchedReceipts) > 0 { f.receipts <- matchedReceipts } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index bb3de426fb9..a3d6aebd4d3 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1653,8 +1653,8 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil } type ReceiptWithTx struct { - Receipt *types.Receipt - Tx *types.Transaction + Receipt *types.Receipt + Transaction *types.Transaction } // SendRawTransactionSync will add the signed transaction to the transaction pool @@ -1729,7 +1729,7 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex rwt.Receipt.BlockHash, rwt.Receipt.BlockNumber.Uint64(), api.signer, - rwt.Tx, + rwt.Transaction, int(rwt.Receipt.TransactionIndex), ), nil } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index c6da3669a86..3105bba5328 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -441,7 +441,6 @@ type testBackend struct { pending *types.Block pendingReceipts types.Receipts - // test-only fields for SendRawTransactionSync receiptsFeed *event.Feed autoMine bool @@ -624,7 +623,7 @@ func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { CumulativeGasUsed: 21000, GasUsed: 21000, } - b.receiptsFeed.Send([]*ReceiptWithTx{{Receipt: r, Tx: tx}}) + b.receiptsFeed.Send([]*ReceiptWithTx{{Receipt: r, Transaction: tx}}) } return nil } From 91be12e423d1de8704b31fd2c7843d7e195a0ced Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 17:09:47 -0700 Subject: [PATCH 12/23] eliminate complex SubscribeTransactionReceipts on testBackend --- internal/ethapi/api_test.go | 60 +++---------------------------------- 1 file changed, 4 insertions(+), 56 deletions(-) diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 3105bba5328..c28b22e6eec 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -3981,64 +3981,12 @@ func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) ( return makeSignedRaw(t, api, addr, addr, big.NewInt(0)) } -func (b *testBackend) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { - // If no feed is wired for this test, return a no-op subscription +func (b *testBackend) SubscribeTransactionReceipts(_ []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { if b.receiptsFeed == nil { - return event.NewSubscription(func(quit <-chan struct{}) error { - <-quit - return nil - }) - } - - // No filter => forward batches directly - if len(txHashes) == 0 { - return b.receiptsFeed.Subscribe(ch) - } - - // Filtered: wrap the underlying feed and only forward matching receipts - in := make(chan []*ReceiptWithTx, 1) - sub := b.receiptsFeed.Subscribe(in) - - // Build a hash set for quick filtering - wanted := make(map[common.Hash]struct{}, len(txHashes)) - for _, h := range txHashes { - wanted[h] = struct{}{} + return event.NewSubscription(func(quit <-chan struct{}) error { <-quit; return nil }) } - - return event.NewSubscription(func(quit <-chan struct{}) error { - defer sub.Unsubscribe() - for { - select { - case batch, ok := <-in: - if !ok { - return nil - } - var out []*ReceiptWithTx - for _, r := range batch { - if r != nil && r.Receipt != nil { - if _, ok := wanted[r.Receipt.TxHash]; ok { - out = append(out, r) - } - } - } - if len(out) == 0 { - continue - } - select { - case ch <- out: - case <-quit: - return nil - } - case err, ok := <-sub.Err(): - if !ok || err == nil { - return nil - } - return err - case <-quit: - return nil - } - } - }) + // Test will only publish the receipts it wants; we just forward. + return b.receiptsFeed.Subscribe(ch) } func TestSendRawTransactionSync_Success(t *testing.T) { t.Parallel() From 960d6c336eafd633b06b610ad9257c81eb221dea Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 17:18:21 -0700 Subject: [PATCH 13/23] add to ethclient --- ethclient/ethclient.go | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 1195929f7d2..55588d6c3f6 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -18,11 +18,13 @@ package ethclient import ( + "bytes" "context" "encoding/json" "errors" "fmt" "math/big" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -696,6 +698,44 @@ func (ec *Client) SendTransaction(ctx context.Context, tx *types.Transaction) er return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data)) } +// SendRawTransactionSync submits a signed tx and waits for a receipt (or until +// the optional timeout elapses on the server side). If timeout == 0, the server +// uses its default. +func (ec *Client) SendRawTransactionSync( + ctx context.Context, + tx *types.Transaction, + timeout time.Duration, +) (*types.Receipt, error) { + + var buf bytes.Buffer + if err := tx.EncodeRLP(&buf); err != nil { + return nil, err + } + return ec.SendRawTransactionSyncRaw(ctx, buf.Bytes(), timeout) +} + +// SendRawTransactionSyncRaw is the low-level variant that takes the raw RLP. +func (ec *Client) SendRawTransactionSyncRaw( + ctx context.Context, + rawTx []byte, + timeout time.Duration, +) (*types.Receipt, error) { + + var out *types.Receipt + + // Build params: raw bytes as hex, plus optional timeout as hexutil.Uint64 + params := []any{hexutil.Bytes(rawTx)} + if timeout > 0 { + t := hexutil.Uint64(timeout.Milliseconds()) + params = append(params, t) + } + + if err := ec.c.CallContext(ctx, &out, "eth_sendRawTransactionSync", params...); err != nil { + return nil, err + } + return out, nil +} + // RevertErrorData returns the 'revert reason' data of a contract call. // // This can be used with CallContract and EstimateGas, and only when the server is Geth. From 4c075f62cd6b2863fbffeed386edfd305ad67701 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Thu, 9 Oct 2025 17:24:45 -0700 Subject: [PATCH 14/23] fix lint --- ethclient/ethclient.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 55588d6c3f6..fbffe2678fa 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -706,7 +706,6 @@ func (ec *Client) SendRawTransactionSync( tx *types.Transaction, timeout time.Duration, ) (*types.Receipt, error) { - var buf bytes.Buffer if err := tx.EncodeRLP(&buf); err != nil { return nil, err @@ -720,7 +719,6 @@ func (ec *Client) SendRawTransactionSyncRaw( rawTx []byte, timeout time.Duration, ) (*types.Receipt, error) { - var out *types.Receipt // Build params: raw bytes as hex, plus optional timeout as hexutil.Uint64 From 2ed437e36161210977d8d1715f96e1e3fcd800f3 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Sun, 12 Oct 2025 23:08:07 -0700 Subject: [PATCH 15/23] just subscribe directly using SubscribeChainEvent --- eth/api_backend.go | 29 ------------- eth/filters/filter.go | 10 +++-- eth/filters/filter_system.go | 2 +- internal/ethapi/api.go | 54 ++++++++++++------------ internal/ethapi/api_test.go | 33 ++++++++------- internal/ethapi/backend.go | 1 - internal/ethapi/transaction_args_test.go | 3 -- 7 files changed, 51 insertions(+), 81 deletions(-) diff --git a/eth/api_backend.go b/eth/api_backend.go index 984e6ff5f6c..766a99fc1ef 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -36,12 +36,10 @@ import ( "github.com/ethereum/go-ethereum/core/txpool/locals" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -496,30 +494,3 @@ func (b *EthAPIBackend) RPCTxSyncDefaultTimeout() time.Duration { func (b *EthAPIBackend) RPCTxSyncMaxTimeout() time.Duration { return b.eth.config.TxSyncMaxTimeout } - -func (b *EthAPIBackend) SubscribeTransactionReceipts( - txHashes []common.Hash, - out chan<- []*ethapi.ReceiptWithTx, -) event.Subscription { - ch := make(chan core.ChainEvent, 16) - sub := b.eth.blockchain.SubscribeChainEvent(ch) - - go func() { - defer sub.Unsubscribe() - for { - select { - case ev, ok := <-ch: - if !ok { - return - } - batch := filters.FilterReceipts(txHashes, ev) - if len(batch) > 0 { - out <- batch - } - case <-sub.Err(): - return - } - } - }() - return sub -} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index ee93a985f06..02399bc8018 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -555,13 +554,16 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo } // ReceiptWithTx contains a receipt and its corresponding transaction -type ReceiptWithTx = ethapi.ReceiptWithTx +type ReceiptWithTx struct { + Receipt *types.Receipt + Transaction *types.Transaction +} -// FilterReceipts returns the receipts matching the given criteria +// filterReceipts returns the receipts matching the given criteria // In addition to returning receipts, it also returns the corresponding transactions. // This is because receipts only contain low-level data, while user-facing data // may require additional information from the Transaction. -func FilterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx { +func filterReceipts(txHashes []common.Hash, ev core.ChainEvent) []*ReceiptWithTx { var ret []*ReceiptWithTx receipts := ev.Receipts diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 9210dba21cf..02783fa5ec5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -445,7 +445,7 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) // Handle transaction receipts subscriptions when a new block is added for _, f := range filters[TransactionReceiptsSubscription] { - matchedReceipts := FilterReceipts(f.txHashes, ev) + matchedReceipts := filterReceipts(f.txHashes, ev) if len(matchedReceipts) > 0 { f.receipts <- matchedReceipts } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a3d6aebd4d3..bcd1fb7b2a7 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1652,11 +1652,6 @@ func (api *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil return SubmitTransaction(ctx, api.b, tx) } -type ReceiptWithTx struct { - Receipt *types.Receipt - Transaction *types.Transaction -} - // SendRawTransactionSync will add the signed transaction to the transaction pool // and wait until the transaction has been included in a block and return the receipt, or the timeout. func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hexutil.Bytes, timeoutMs *hexutil.Uint64) (map[string]interface{}, error) { @@ -1664,6 +1659,12 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex if err := tx.UnmarshalBinary(input); err != nil { return nil, err } + + ch := make(chan core.ChainEvent, 128) + sub := api.b.SubscribeChainEvent(ch) + subErrCh := sub.Err() + defer sub.Unsubscribe() + hash, err := SubmitTransaction(ctx, api.b, tx) if err != nil { return nil, err @@ -1690,13 +1691,6 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex return r, nil } - // Subscribe to receipt stream (filtered to this tx) - receipts := make(chan []*ReceiptWithTx, 1) - sub := api.b.SubscribeTransactionReceipts([]common.Hash{hash}, receipts) - defer sub.Unsubscribe() - - subErrCh := sub.Err() - for { select { case <-receiptCtx.Done(): @@ -1717,23 +1711,27 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex } return nil, err - case batch := <-receipts: - for _, rwt := range batch { - if rwt == nil || rwt.Receipt == nil || rwt.Receipt.TxHash != hash { - continue - } - - if rwt.Receipt.BlockNumber != nil && rwt.Receipt.BlockHash != (common.Hash{}) { - return MarshalReceipt( - rwt.Receipt, - rwt.Receipt.BlockHash, - rwt.Receipt.BlockNumber.Uint64(), - api.signer, - rwt.Transaction, - int(rwt.Receipt.TransactionIndex), - ), nil + case ev := <-ch: + rs := ev.Receipts + txs := ev.Transactions + if len(rs) == 0 || len(rs) != len(txs) { + continue + } + for i := range rs { + if rs[i].TxHash == hash { + if rs[i].BlockNumber != nil && rs[i].BlockHash != (common.Hash{}) { + signer := types.LatestSigner(api.b.ChainConfig()) + return MarshalReceipt( + rs[i], + rs[i].BlockHash, + rs[i].BlockNumber.Uint64(), + signer, + txs[i], + int(rs[i].TransactionIndex), + ), nil + } + return api.GetTransactionReceipt(receiptCtx, hash) } - return api.GetTransactionReceipt(receiptCtx, hash) } } } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index c28b22e6eec..ad2af1ec115 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -441,8 +441,8 @@ type testBackend struct { pending *types.Block pendingReceipts types.Receipts - receiptsFeed *event.Feed - autoMine bool + chainFeed *event.Feed + autoMine bool sentTx *types.Transaction sentTxHash common.Hash @@ -479,7 +479,7 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E acc: acc, pending: blocks[n], pendingReceipts: receipts[n], - receiptsFeed: new(event.Feed), + chainFeed: new(event.Feed), } return backend } @@ -601,7 +601,7 @@ func (b testBackend) GetEVM(ctx context.Context, state *state.StateDB, header *t return vm.NewEVM(context, state, b.chain.Config(), *vmConfig) } func (b testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { - panic("implement me") + return b.chainFeed.Subscribe(ch) } func (b testBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { panic("implement me") @@ -610,11 +610,12 @@ func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { b.sentTx = tx b.sentTxHash = tx.Hash() - if b.autoMine && b.receiptsFeed != nil { + if b.autoMine { + // Synthesize a "mined" receipt at head+1 num := b.chain.CurrentHeader().Number.Uint64() + 1 - bh := fakeBlockHash(tx.Hash()) - r := &types.Receipt{ + bh := fakeBlockHash(tx.Hash()) + receipt := &types.Receipt{ TxHash: tx.Hash(), Status: types.ReceiptStatusSuccessful, BlockHash: bh, @@ -623,7 +624,16 @@ func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { CumulativeGasUsed: 21000, GasUsed: 21000, } - b.receiptsFeed.Send([]*ReceiptWithTx{{Receipt: r, Transaction: tx}}) + hdr := &types.Header{ + Number: new(big.Int).SetUint64(num), + } + + // Broadcast a ChainEvent that includes the receipts and txs + b.chainFeed.Send(core.ChainEvent{ + Header: hdr, + Receipts: types.Receipts{receipt}, + Transactions: types.Transactions{tx}, + }) } return nil } @@ -3981,13 +3991,6 @@ func makeSelfSignedRaw(t *testing.T, api *TransactionAPI, addr common.Address) ( return makeSignedRaw(t, api, addr, addr, big.NewInt(0)) } -func (b *testBackend) SubscribeTransactionReceipts(_ []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { - if b.receiptsFeed == nil { - return event.NewSubscription(func(quit <-chan struct{}) error { <-quit; return nil }) - } - // Test will only publish the receipts it wants; we just forward. - return b.receiptsFeed.Subscribe(ch) -} func TestSendRawTransactionSync_Success(t *testing.T) { t.Parallel() genesis := &core.Genesis{ diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index b67ecef3227..af3d592b82b 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -98,7 +98,6 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription CurrentView() *filtermaps.ChainView NewMatcherBackend() filtermaps.MatcherBackend diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index dd75defecf9..30791f32b5e 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -411,6 +411,3 @@ func (b *backendMock) CurrentView() *filtermaps.ChainView { return nil func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil } func (b *backendMock) HistoryPruningCutoff() uint64 { return 0 } -func (b *backendMock) SubscribeTransactionReceipts(txHashes []common.Hash, ch chan<- []*ReceiptWithTx) event.Subscription { - return nil -} From 3aa35385074774970f93549d162fc63c4b51a7d1 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Sun, 12 Oct 2025 23:13:14 -0700 Subject: [PATCH 16/23] clean --- internal/ethapi/api.go | 2 +- internal/ethapi/api_test.go | 24 ++++++++++-------------- internal/ethapi/errors.go | 6 ++---- 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index bcd1fb7b2a7..2f24a6c7209 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1694,7 +1694,7 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex for { select { case <-receiptCtx.Done(): - // Upstream cancellation -> bubble it; otherwise emit our timeout error + // Upstream cancellation -> bubble it; otherwise emit the timeout error if err := ctx.Err(); err != nil { return nil, err } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index ad2af1ec115..aaa002b5ec0 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -447,8 +447,8 @@ type testBackend struct { sentTx *types.Transaction sentTxHash common.Hash - syncDefaultTO time.Duration - syncMaxTO time.Duration + syncDefaultTimeout time.Duration + syncMaxTimeout time.Duration } func fakeBlockHash(txh common.Hash) common.Hash { @@ -613,24 +613,20 @@ func (b *testBackend) SendTx(ctx context.Context, tx *types.Transaction) error { if b.autoMine { // Synthesize a "mined" receipt at head+1 num := b.chain.CurrentHeader().Number.Uint64() + 1 - - bh := fakeBlockHash(tx.Hash()) receipt := &types.Receipt{ TxHash: tx.Hash(), Status: types.ReceiptStatusSuccessful, - BlockHash: bh, + BlockHash: fakeBlockHash(tx.Hash()), BlockNumber: new(big.Int).SetUint64(num), TransactionIndex: 0, CumulativeGasUsed: 21000, GasUsed: 21000, } - hdr := &types.Header{ - Number: new(big.Int).SetUint64(num), - } - // Broadcast a ChainEvent that includes the receipts and txs b.chainFeed.Send(core.ChainEvent{ - Header: hdr, + Header: &types.Header{ + Number: new(big.Int).SetUint64(num), + }, Receipts: types.Receipts{receipt}, Transactions: types.Transactions{tx}, }) @@ -3954,14 +3950,14 @@ func (b configTimeBackend) CurrentHeader() *types.Header { } func (b *testBackend) RPCTxSyncDefaultTimeout() time.Duration { - if b.syncDefaultTO != 0 { - return b.syncDefaultTO + if b.syncDefaultTimeout != 0 { + return b.syncDefaultTimeout } return 2 * time.Second } func (b *testBackend) RPCTxSyncMaxTimeout() time.Duration { - if b.syncMaxTO != 0 { - return b.syncMaxTO + if b.syncMaxTimeout != 0 { + return b.syncMaxTimeout } return 5 * time.Minute } diff --git a/internal/ethapi/errors.go b/internal/ethapi/errors.go index 235f5b3fa8d..30711a01679 100644 --- a/internal/ethapi/errors.go +++ b/internal/ethapi/errors.go @@ -176,8 +176,6 @@ type blockGasLimitReachedError struct{ message string } func (e *blockGasLimitReachedError) Error() string { return e.message } func (e *blockGasLimitReachedError) ErrorCode() int { return errCodeBlockGasLimitReached } -func (e *txSyncTimeoutError) Error() string { return e.msg } -func (e *txSyncTimeoutError) ErrorCode() int { return errCodeTxSyncTimeout } - -// ErrorData should be JSON-safe; return the 0x-hex string. +func (e *txSyncTimeoutError) Error() string { return e.msg } +func (e *txSyncTimeoutError) ErrorCode() int { return errCodeTxSyncTimeout } func (e *txSyncTimeoutError) ErrorData() interface{} { return e.hash.Hex() } From 455f09d00a08711cfd44dcdb8400f1f1010b6289 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Sun, 12 Oct 2025 23:19:15 -0700 Subject: [PATCH 17/23] fully qualify flag names --- cmd/geth/main.go | 4 ++-- cmd/utils/flags.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 2a36c7851c2..cc294b2f309 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -188,8 +188,8 @@ var ( utils.AllowUnprotectedTxs, utils.BatchRequestLimit, utils.BatchResponseMaxSize, - utils.RPCTxSyncDefaultFlag, - utils.RPCTxSyncMaxFlag, + utils.RPCTxSyncDefaultTimeoutFlag, + utils.RPCTxSyncMaxTimeoutFlag, } metricsFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 346990a2efe..0c5db9e6d8b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -615,14 +615,14 @@ var ( Value: ethconfig.Defaults.LogQueryLimit, Category: flags.APICategory, } - RPCTxSyncDefaultFlag = &cli.DurationFlag{ - Name: "rpc.txsync.default", + RPCTxSyncDefaultTimeoutFlag = &cli.DurationFlag{ + Name: "rpc.txsync.defaulttimeout", Usage: "Default timeout for eth_sendRawTransactionSync (e.g. 2s, 500ms)", Value: ethconfig.Defaults.TxSyncDefaultTimeout, Category: flags.APICategory, } - RPCTxSyncMaxFlag = &cli.DurationFlag{ - Name: "rpc.txsync.max", + RPCTxSyncMaxTimeoutFlag = &cli.DurationFlag{ + Name: "rpc.txsync.maxtimeout", Usage: "Maximum allowed timeout for eth_sendRawTransactionSync (e.g. 5m)", Value: ethconfig.Defaults.TxSyncMaxTimeout, Category: flags.APICategory, @@ -1729,11 +1729,11 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(RPCGlobalLogQueryLimit.Name) { cfg.LogQueryLimit = ctx.Int(RPCGlobalLogQueryLimit.Name) } - if ctx.IsSet(RPCTxSyncDefaultFlag.Name) { - cfg.TxSyncDefaultTimeout = ctx.Duration(RPCTxSyncDefaultFlag.Name) + if ctx.IsSet(RPCTxSyncDefaultTimeoutFlag.Name) { + cfg.TxSyncDefaultTimeout = ctx.Duration(RPCTxSyncDefaultTimeoutFlag.Name) } - if ctx.IsSet(RPCTxSyncMaxFlag.Name) { - cfg.TxSyncMaxTimeout = ctx.Duration(RPCTxSyncMaxFlag.Name) + if ctx.IsSet(RPCTxSyncMaxTimeoutFlag.Name) { + cfg.TxSyncMaxTimeout = ctx.Duration(RPCTxSyncMaxTimeoutFlag.Name) } if !ctx.Bool(SnapshotFlag.Name) || cfg.SnapshotCache == 0 { // If snap-sync is requested, this flag is also required From 1d662b50a629532707af1668050f39ab76e5fcf2 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Mon, 13 Oct 2025 12:50:24 -0700 Subject: [PATCH 18/23] simplify ethclient --- ethclient/ethclient.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index fbffe2678fa..93d63d4ef36 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -18,7 +18,6 @@ package ethclient import ( - "bytes" "context" "encoding/json" "errors" @@ -698,40 +697,36 @@ func (ec *Client) SendTransaction(ctx context.Context, tx *types.Transaction) er return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", hexutil.Encode(data)) } -// SendRawTransactionSync submits a signed tx and waits for a receipt (or until +// SendTransactionSync submits a signed tx and waits for a receipt (or until // the optional timeout elapses on the server side). If timeout == 0, the server // uses its default. -func (ec *Client) SendRawTransactionSync( +func (ec *Client) SendTransactionSync( ctx context.Context, tx *types.Transaction, - timeout time.Duration, + timeout ...time.Duration, ) (*types.Receipt, error) { - var buf bytes.Buffer - if err := tx.EncodeRLP(&buf); err != nil { + raw, err := tx.MarshalBinary() + if err != nil { return nil, err } - return ec.SendRawTransactionSyncRaw(ctx, buf.Bytes(), timeout) + return ec.SendRawTransactionSync(ctx, raw, timeout...) } -// SendRawTransactionSyncRaw is the low-level variant that takes the raw RLP. -func (ec *Client) SendRawTransactionSyncRaw( +func (ec *Client) SendRawTransactionSync( ctx context.Context, rawTx []byte, - timeout time.Duration, + timeout ...time.Duration, ) (*types.Receipt, error) { - var out *types.Receipt - - // Build params: raw bytes as hex, plus optional timeout as hexutil.Uint64 - params := []any{hexutil.Bytes(rawTx)} - if timeout > 0 { - t := hexutil.Uint64(timeout.Milliseconds()) - params = append(params, t) + var ms *hexutil.Uint64 + if len(timeout) > 0 && timeout[0] > 0 { + v := hexutil.Uint64(timeout[0] / time.Millisecond) + ms = &v } - - if err := ec.c.CallContext(ctx, &out, "eth_sendRawTransactionSync", params...); err != nil { + var receipt types.Receipt + if err := ec.c.CallContext(ctx, &receipt, "eth_sendRawTransactionSync", hexutil.Bytes(rawTx), ms); err != nil { return nil, err } - return out, nil + return &receipt, nil } // RevertErrorData returns the 'revert reason' data of a contract call. From 3ab8ec880bf9db256b469cf33996c01df8e16511 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Mon, 13 Oct 2025 12:57:23 -0700 Subject: [PATCH 19/23] check for DeadlineExceeded --- internal/ethapi/api.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 2f24a6c7209..22e82bea342 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1694,14 +1694,19 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex for { select { case <-receiptCtx.Done(): - // Upstream cancellation -> bubble it; otherwise emit the timeout error + // If server-side wait window elapsed, return the structured timeout. + if errors.Is(receiptCtx.Err(), context.DeadlineExceeded) { + return nil, &txSyncTimeoutError{ + msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", timeout), + hash: hash, + } + } + // Otherwise, bubble the caller's context error (canceled or deadline). if err := ctx.Err(); err != nil { return nil, err } - return nil, &txSyncTimeoutError{ - msg: fmt.Sprintf("The transaction was added to the transaction pool but wasn't processed in %v.", timeout), - hash: hash, - } + // Fallback: return the derived context's error. + return nil, receiptCtx.Err() case err, ok := <-subErrCh: if !ok || err == nil { From 409eea40630086611d7a75e44573a0cbe337f6c1 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Mon, 13 Oct 2025 13:07:10 -0700 Subject: [PATCH 20/23] dont continue on error --- internal/ethapi/api.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 22e82bea342..a96a1c6f47b 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1709,10 +1709,8 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex return nil, receiptCtx.Err() case err, ok := <-subErrCh: - if !ok || err == nil { - // subscription closed; disable this case - subErrCh = nil - continue + if !ok { + return nil, errors.New("chain subscription closed") } return nil, err From eed426c36fbd6eb8219d7d16ea0577d8114a520a Mon Sep 17 00:00:00 2001 From: aodhgan Date: Mon, 13 Oct 2025 13:16:50 -0700 Subject: [PATCH 21/23] handle unwell sub on success --- internal/ethapi/api.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a96a1c6f47b..a440256cb48 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -55,6 +55,7 @@ import ( const estimateGasErrorRatio = 0.015 var errBlobTxNotSupported = errors.New("signing blob transactions not supported") +var errSubClosed = errors.New("chain subscription closed") // EthereumAPI provides an API to access Ethereum related information. type EthereumAPI struct { @@ -1710,11 +1711,14 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex case err, ok := <-subErrCh: if !ok { - return nil, errors.New("chain subscription closed") + return nil, errSubClosed } return nil, err - case ev := <-ch: + case ev, ok := <-ch: + if !ok { + return nil, errSubClosed + } rs := ev.Receipts txs := ev.Transactions if len(rs) == 0 || len(rs) != len(txs) { From b05fbc18689c54f15bc372c28ab200264f02b421 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 15 Oct 2025 10:42:41 -0700 Subject: [PATCH 22/23] use pointer for timeout param --- ethclient/ethclient.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 93d63d4ef36..3d1c4105c47 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -703,24 +703,26 @@ func (ec *Client) SendTransaction(ctx context.Context, tx *types.Transaction) er func (ec *Client) SendTransactionSync( ctx context.Context, tx *types.Transaction, - timeout ...time.Duration, + timeout *time.Duration, ) (*types.Receipt, error) { raw, err := tx.MarshalBinary() if err != nil { return nil, err } - return ec.SendRawTransactionSync(ctx, raw, timeout...) + return ec.SendRawTransactionSync(ctx, raw, timeout) } func (ec *Client) SendRawTransactionSync( ctx context.Context, rawTx []byte, - timeout ...time.Duration, + timeout *time.Duration, ) (*types.Receipt, error) { var ms *hexutil.Uint64 - if len(timeout) > 0 && timeout[0] > 0 { - v := hexutil.Uint64(timeout[0] / time.Millisecond) - ms = &v + if timeout != nil { + if d := timeout.Milliseconds(); d > 0 { + ms = new(hexutil.Uint64) + *ms = hexutil.Uint64(uint64(d)) + } } var receipt types.Receipt if err := ec.c.CallContext(ctx, &receipt, "eth_sendRawTransactionSync", hexutil.Bytes(rawTx), ms); err != nil { From e203026c4dfb797a5d17167eba68e44cb965ea62 Mon Sep 17 00:00:00 2001 From: aodhgan Date: Wed, 15 Oct 2025 10:58:53 -0700 Subject: [PATCH 23/23] simplify deadline error --- internal/ethapi/api.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a440256cb48..482971135bd 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1702,11 +1702,6 @@ func (api *TransactionAPI) SendRawTransactionSync(ctx context.Context, input hex hash: hash, } } - // Otherwise, bubble the caller's context error (canceled or deadline). - if err := ctx.Err(); err != nil { - return nil, err - } - // Fallback: return the derived context's error. return nil, receiptCtx.Err() case err, ok := <-subErrCh: