From 32232953cdab9a2607ef22a7633bd775ba16947e Mon Sep 17 00:00:00 2001 From: kaladinlight <35275952+kaladinlight@users.noreply.github.com> Date: Wed, 22 Jan 2025 09:22:57 -0700 Subject: [PATCH 1/2] publish new block transactions by address --- api/worker.go | 6 +- bchain/basechain.go | 4 + bchain/coins/blockchain.go | 5 ++ bchain/coins/eth/ethrpc.go | 14 +++- bchain/types.go | 3 +- blockbook-api.ts | 1 + blockbook.go | 8 +- db/sync.go | 4 +- docs/api.md | 15 +++- server/public.go | 6 +- server/websocket.go | 154 ++++++++++++++++++++++++++----------- server/ws_types.go | 3 +- static/test-websocket.html | 12 +++ 13 files changed, 173 insertions(+), 62 deletions(-) diff --git a/api/worker.go b/api/worker.go index 135417a609..eb6de23c88 100644 --- a/api/worker.go +++ b/api/worker.go @@ -221,7 +221,7 @@ func (w *Worker) getTransaction(txid string, spendingTxs bool, specificJSON bool } return nil, NewAPIError(fmt.Sprintf("Transaction '%v' not found (%v)", txid, err), true) } - return w.getTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON, addresses) + return w.GetTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON, addresses) } func (w *Worker) getParsedEthereumInputData(data string) *bchain.EthereumParsedInputData { @@ -284,8 +284,8 @@ func (w *Worker) getConfirmationETA(tx *Tx) (int64, uint32) { return etaSeconds, etaBlocks } -// getTransactionFromBchainTx reads transaction data from txid -func (w *Worker) getTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spendingTxs bool, specificJSON bool, addresses map[string]struct{}) (*Tx, error) { +// GetTransactionFromBchainTx reads transaction data from txid +func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spendingTxs bool, specificJSON bool, addresses map[string]struct{}) (*Tx, error) { var err error var ta *db.TxAddresses var tokens []TokenTransfer diff --git a/bchain/basechain.go b/bchain/basechain.go index 7e34c988ca..e703d422ea 100644 --- a/bchain/basechain.go +++ b/bchain/basechain.go @@ -95,3 +95,7 @@ func (b *BaseChain) EthereumTypeRpcCall(data, to, from string) (string, error) { func (b *BaseChain) EthereumTypeGetRawTransaction(txid string) (string, error) { return "", errors.New("not supported") } + +func (b *BaseChain) EthereumTypeGetTransactionReceipt(txid string) (*RpcReceipt, error) { + return nil, errors.New("not supported") +} diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index 65b65d41a4..ba63533147 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -374,6 +374,11 @@ func (c *blockChainWithMetrics) EthereumTypeGetRawTransaction(txid string) (v st return c.b.EthereumTypeGetRawTransaction(txid) } +func (c *blockChainWithMetrics) EthereumTypeGetTransactionReceipt(txid string) (v *bchain.RpcReceipt, err error) { + defer func(s time.Time) { c.observeRPCLatency("EthereumTypeGetTransactionReceipt", s, err) }(time.Now()) + return c.b.EthereumTypeGetTransactionReceipt(txid) +} + type mempoolWithMetrics struct { mempool bchain.Mempool m *common.Metrics diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index c054f41526..01beea8b1b 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -944,8 +944,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { return nil, errors.Annotatef(err, "txid %v", txid) } tx.BaseFeePerGas = ht.BaseFeePerGas - var receipt bchain.RpcReceipt - err = b.RPC.CallContext(ctx, &receipt, "eth_getTransactionReceipt", hash) + receipt, err := b.EthereumTypeGetTransactionReceipt(txid) if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } @@ -957,7 +956,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } - btx, err = b.Parser.ethTxToTx(tx, &receipt, nil, time, confirmations, true) + btx, err = b.Parser.ethTxToTx(tx, receipt, nil, time, confirmations, true) if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } @@ -1190,6 +1189,15 @@ func (b *EthereumRPC) callRpcStringResult(rpcMethod string, args ...interface{}) return result, nil } +// EthereumTypeGetTransactionReceipt returns the transaction receipt by the transaction ID. +func (b *EthereumRPC) EthereumTypeGetTransactionReceipt(txid string) (*bchain.RpcReceipt, error) { + ctx, cancel := context.WithTimeout(context.Background(), b.Timeout) + defer cancel() + var r *bchain.RpcReceipt + err := b.RPC.CallContext(ctx, &r, "eth_getTransactionReceipt", ethcommon.HexToHash(txid)) + return r, err +} + // EthereumTypeGetBalance returns current balance of an address func (b *EthereumRPC) EthereumTypeGetBalance(addrDesc bchain.AddressDescriptor) (*big.Int, error) { ctx, cancel := context.WithTimeout(context.Background(), b.Timeout) diff --git a/bchain/types.go b/bchain/types.go index 8e214ae1b1..38f822132d 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -288,7 +288,7 @@ type MempoolTxidFilterEntries struct { } // OnNewBlockFunc is used to send notification about a new block -type OnNewBlockFunc func(hash string, height uint32) +type OnNewBlockFunc func(block *Block) // OnNewTxAddrFunc is used to send notification about a new transaction/address type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor) @@ -346,6 +346,7 @@ type BlockChain interface { EthereumTypeGetStakingPoolsData(addrDesc AddressDescriptor) ([]StakingPoolData, error) EthereumTypeRpcCall(data, to, from string) (string, error) EthereumTypeGetRawTransaction(txid string) (string, error) + EthereumTypeGetTransactionReceipt(txid string) (*RpcReceipt, error) GetTokenURI(contractDesc AddressDescriptor, tokenID *big.Int) (string, error) } diff --git a/blockbook-api.ts b/blockbook-api.ts index 56ec8ae73c..f3600c62bb 100644 --- a/blockbook-api.ts +++ b/blockbook-api.ts @@ -760,6 +760,7 @@ export interface WsSendTransactionReq { export interface WsSubscribeAddressesReq { /** List of addresses to subscribe for updates (e.g., new transactions). */ addresses: string[]; + newBlockTxs?: boolean; } export interface WsSubscribeFiatRatesReq { /** Fiat currency code (e.g. 'USD'). */ diff --git a/blockbook.go b/blockbook.go index fa0cfdd7e8..6ce8765ea7 100644 --- a/blockbook.go +++ b/blockbook.go @@ -520,11 +520,11 @@ func syncIndexLoop() { glog.Info("syncIndexLoop starting") // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() { - if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil { + if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil { glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...") // retry once in case of random network error, after a slight delay time.Sleep(time.Millisecond * 2500) - if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil { + if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil { glog.Error("syncIndexLoop ", errors.ErrorStack(err)) } } @@ -532,14 +532,14 @@ func syncIndexLoop() { glog.Info("syncIndexLoop stopped") } -func onNewBlockHash(hash string, height uint32) { +func onNewBlock(block *bchain.Block) { defer func() { if r := recover(); r != nil { glog.Error("onNewBlockHash recovered from panic: ", r) } }() for _, c := range callbacksOnNewBlock { - c(hash, height) + c(block) } } diff --git a/db/sync.go b/db/sync.go index e0ba75fc38..6816cb6d89 100644 --- a/db/sync.go +++ b/db/sync.go @@ -243,7 +243,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync return err } if onNewBlock != nil { - onNewBlock(res.block.Hash, res.block.Height) + onNewBlock(res.block) } w.metrics.BlockbookBestHeight.Set(float64(res.block.Height)) if res.block.Height > 0 && res.block.Height%1000 == 0 { @@ -325,7 +325,7 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low } if onNewBlock != nil { - onNewBlock(b.Hash, b.Height) + onNewBlock(b) } w.metrics.BlockbookBestHeight.Set(float64(b.Height)) diff --git a/docs/api.md b/docs/api.md index d0fb05fab3..7aedc41d40 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1004,7 +1004,7 @@ The client can subscribe to the following events: - `subscribeNewBlock` - new block added to blockchain - `subscribeNewTransaction` - new transaction added to blockchain (all addresses) -- `subscribeAddresses` - new transaction for a given address (list of addresses) added to mempool +- `subscribeAddresses` - new transaction for a given address (list of addresses) added to mempool (and optionally confirmed in a new block) - `subscribeFiatRates` - new currency rate ticker There can be always only one subscription of given event per connection, i.e. new list of addresses replaces previous list of addresses. @@ -1035,6 +1035,19 @@ Example for subscribing to an address (or multiple addresses) } ``` +Example for subscribing to an address (or multiple addresses) including new block (confirmed) transactions + +```javascript +{ + "id":"1", + "method":"subscribeAddresses", + "params":{ + "addresses":["mnYYiDCb2JZXnqEeXta1nkt5oCVe2RVhJj", "tb1qp0we5epypgj4acd2c4au58045ruud2pd6heuee"] + "newBlockTxs" true, + } +} +``` + ## Legacy API V1 The legacy API is a compatible subset of API provided by **Bitcore Insight**. It is supported only for Bitcoin-type coins. The details of the REST/socket.io requests can be found in the Insight's documentation. diff --git a/server/public.go b/server/public.go index 82650d9b50..6f43a19f5f 100644 --- a/server/public.go +++ b/server/public.go @@ -233,9 +233,9 @@ func (s *PublicServer) Shutdown(ctx context.Context) error { } // OnNewBlock notifies users subscribed to bitcoind/hashblock about new block -func (s *PublicServer) OnNewBlock(hash string, height uint32) { - s.socketio.OnNewBlockHash(hash) - s.websocket.OnNewBlock(hash, height) +func (s *PublicServer) OnNewBlock(block *bchain.Block) { + s.socketio.OnNewBlockHash(block.Hash) + s.websocket.OnNewBlock(block) } // OnNewFiatRatesTicker notifies users subscribed to bitcoind/fiatrates about new ticker diff --git a/server/websocket.go b/server/websocket.go index 9a6d09cba3..9444dea073 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -49,6 +49,11 @@ type websocketChannel struct { getAddressInfoDescriptors map[string]struct{} } +type addressDetails struct { + requestID string + publishNewBlockTxs bool +} + // WebsocketServer is a handle to websocket server type WebsocketServer struct { upgrader *websocket.Upgrader @@ -66,8 +71,9 @@ type WebsocketServer struct { newTransactionEnabled bool newTransactionSubscriptions map[*websocketChannel]string newTransactionSubscriptionsLock sync.Mutex - addressSubscriptions map[string]map[*websocketChannel]string + addressSubscriptions map[string]map[*websocketChannel]*addressDetails addressSubscriptionsLock sync.Mutex + newBlockTxsSubscriptionCount int fiatRatesSubscriptions map[string]map[*websocketChannel]string fiatRatesTokenSubscriptions map[*websocketChannel][]string fiatRatesSubscriptionsLock sync.Mutex @@ -103,7 +109,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. newBlockSubscriptions: make(map[*websocketChannel]string), newTransactionEnabled: is.EnableSubNewTx, newTransactionSubscriptions: make(map[*websocketChannel]string), - addressSubscriptions: make(map[string]map[*websocketChannel]string), + addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails), fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string), } @@ -426,9 +432,9 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe return s.unsubscribeNewTransaction(c) }, "subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { - ad, err := s.unmarshalAddresses(req.Params) + ad, nbtxs, err := s.unmarshalAddresses(req.Params) if err == nil { - rv, err = s.subscribeAddresses(c, ad, req) + rv, err = s.subscribeAddresses(c, ad, nbtxs, req) } return }, @@ -884,21 +890,21 @@ func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res in return &subscriptionResponse{false}, nil } -func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, error) { +func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, error) { r := WsSubscribeAddressesReq{} err := json.Unmarshal(params, &r) if err != nil { - return nil, err + return nil, false, err } rv := make([]string, len(r.Addresses)) for i, a := range r.Addresses { ad, err := s.chainParser.GetAddrDescFromAddress(a) if err != nil { - return nil, err + return nil, false, err } rv[i] = string(ad) } - return rv, nil + return rv, r.NewBlockTxs, nil } // doUnsubscribeAddresses addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses @@ -906,8 +912,11 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { for _, ads := range c.addrDescs { sa, e := s.addressSubscriptions[ads] if e { - for sc := range sa { + for sc, details := range sa { if sc == c { + if details.publishNewBlockTxs { + s.newBlockTxsSubscriptionCount-- + } delete(sa, c) } } @@ -919,7 +928,7 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { c.addrDescs = nil } -func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, req *WsReq) (res interface{}, err error) { +func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() // unsubscribe all previous subscriptions @@ -927,10 +936,16 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []str for _, ads := range addrDesc { as, ok := s.addressSubscriptions[ads] if !ok { - as = make(map[*websocketChannel]string) + as = make(map[*websocketChannel]*addressDetails) s.addressSubscriptions[ads] = as } - as[c] = req.ID + as[c] = &addressDetails{ + requestID: req.ID, + publishNewBlockTxs: newBlockTxs, + } + if newBlockTxs { + s.newBlockTxsSubscriptionCount++ + } } c.addrDescs = addrDesc s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions))) @@ -1014,9 +1029,54 @@ func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) { glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } +func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { + for _, tx := range block.Txs { + var tokenTransfers bchain.TokenTransfers + var internalTransfers []bchain.EthereumInternalTransfer + if s.chainParser.GetChainType() == bchain.ChainEthereumType { + tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx) + esd := tx.CoinSpecificData.(bchain.EthereumSpecificData) + if esd.InternalData != nil { + internalTransfers = esd.InternalData.Transfers + } + } + vins := make([]bchain.MempoolVin, len(tx.Vin)) + for i, vin := range tx.Vin { + vins[i] = bchain.MempoolVin{Vin: vin} + } + subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers) + if len(subscribed) > 0 { + go func(tx bchain.Tx, subscribed map[string]struct{}) { + if csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData); ok { + receipt, err := s.chain.EthereumTypeGetTransactionReceipt(tx.Txid) + if err != nil { + glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid) + return + } + csd.Receipt = receipt + tx.CoinSpecificData = csd + } + atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil) + if err != nil { + glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) + return + } + for stringAddressDescriptor := range subscribed { + s.sendOnNewTxAddr(stringAddressDescriptor, atx, true) + } + }(tx, subscribed) + } + } +} + // OnNewBlock is a callback that broadcasts info about new block to subscribed clients -func (s *WebsocketServer) OnNewBlock(hash string, height uint32) { - go s.onNewBlockAsync(hash, height) +func (s *WebsocketServer) OnNewBlock(block *bchain.Block) { + s.addressSubscriptionsLock.Lock() + defer s.addressSubscriptionsLock.Unlock() + go s.onNewBlockAsync(block.Hash, block.Height) + if s.newBlockTxsSubscriptionCount > 0 { + go s.publishNewBlockTxsByAddr(block) + } } func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) { @@ -1031,7 +1091,7 @@ func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) { glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels") } -func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) { +func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx, newBlockTx bool) { addrDesc := bchain.AddressDescriptor(stringAddressDescriptor) addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) if err != nil { @@ -1050,9 +1110,12 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap defer s.addressSubscriptionsLock.Unlock() as, ok := s.addressSubscriptions[stringAddressDescriptor] if ok { - for c, id := range as { + for c, details := range as { + if newBlockTx && !details.publishNewBlockTxs { + continue + } c.DataOut(&WsRes{ - ID: id, + ID: details.requestID, Data: &data, }) } @@ -1061,48 +1124,51 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap } } -func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string]struct{} { - // check if there is any subscription in inputs, outputs and token transfers +func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts []bchain.Vout, tokenTransfers bchain.TokenTransfers, internalTransfers []bchain.EthereumInternalTransfer) map[string]struct{} { + // check if there is any subscription in inputs, outputs and transfers s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() subscribed := make(map[string]struct{}) - for i := range tx.Vin { - sad := string(tx.Vin[i].AddrDesc) - if len(sad) > 0 { - as, ok := s.addressSubscriptions[sad] - if ok && len(as) > 0 { + processAddress := func(address string) { + if addrDesc, err := s.chainParser.GetAddrDescFromAddress(address); err == nil && len(addrDesc) > 0 { + sad := string(addrDesc) + if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } } } - for i := range tx.Vout { - addrDesc, err := s.chainParser.GetAddrDescFromVout(&tx.Vout[i]) - if err == nil && len(addrDesc) > 0 { + processVout := func(vout bchain.Vout) { + if addrDesc, err := s.chainParser.GetAddrDescFromVout(&vout); err == nil && len(addrDesc) > 0 { sad := string(addrDesc) - as, ok := s.addressSubscriptions[sad] - if ok && len(as) > 0 { + if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } } } - for i := range tx.TokenTransfers { - addrDesc, err := s.chainParser.GetAddrDescFromAddress(tx.TokenTransfers[i].From) - if err == nil && len(addrDesc) > 0 { - sad := string(addrDesc) - as, ok := s.addressSubscriptions[sad] - if ok && len(as) > 0 { + for i := range vins { + if sad := string(vins[i].AddrDesc); len(sad) > 0 { + if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } - } - addrDesc, err = s.chainParser.GetAddrDescFromAddress(tx.TokenTransfers[i].To) - if err == nil && len(addrDesc) > 0 { - sad := string(addrDesc) - as, ok := s.addressSubscriptions[sad] - if ok && len(as) > 0 { - subscribed[sad] = struct{}{} + } else if s.chainParser.GetChainType() == bchain.ChainBitcoinType { + processVout(vouts[vins[i].Vout]) + } else if s.chainParser.GetChainType() == bchain.ChainEthereumType { + if len(vins[i].Addresses) > 0 { + processAddress(vins[i].Addresses[0]) } } } + for i := range vouts { + processVout(vouts[i]) + } + for i := range tokenTransfers { + processAddress(tokenTransfers[i].From) + processAddress(tokenTransfers[i].To) + } + for i := range internalTransfers { + processAddress(internalTransfers[i].From) + processAddress(internalTransfers[i].To) + } return subscribed } @@ -1114,13 +1180,13 @@ func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[stri } s.sendOnNewTx(atx) for stringAddressDescriptor := range subscribed { - s.sendOnNewTxAddr(stringAddressDescriptor, atx) + s.sendOnNewTxAddr(stringAddressDescriptor, atx, false) } } // OnNewTx is a callback that broadcasts info about a tx affecting subscribed address func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { - subscribed := s.getNewTxSubscriptions(tx) + subscribed := s.getNewTxSubscriptions(tx.Vin, tx.Vout, tx.TokenTransfers, nil) if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 { go s.onNewTxAsync(tx, subscribed) } diff --git a/server/ws_types.go b/server/ws_types.go index 3f17f6cea7..39ec11ff75 100644 --- a/server/ws_types.go +++ b/server/ws_types.go @@ -147,7 +147,8 @@ type WsSendTransactionReq struct { // WsSubscribeAddressesReq is used to subscribe to updates on a list of addresses. type WsSubscribeAddressesReq struct { - Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."` + Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."` + NewBlockTxs bool `json:"newBlockTxs,omitempty"` } // WsSubscribeFiatRatesReq subscribes to updates of fiat rates for a specific currency or set of tokens. diff --git a/static/test-websocket.html b/static/test-websocket.html index de9b588176..af3fae8632 100644 --- a/static/test-websocket.html +++ b/static/test-websocket.html @@ -385,8 +385,10 @@ function subscribeAddresses() { const method = 'subscribeAddresses'; var addresses = paramAsArray('subscribeAddressesName'); + var newBlockTxs = document.getElementById('newBlockTxs').checked const params = { addresses, + newBlockTxs, }; if (subscribeAddressesId) { delete subscriptions[subscribeAddressesId]; @@ -1264,6 +1266,16 @@

Blockbook Websocket Test Page

id="subscribeAddressesName" value="0xba98d6a5ac827632e3457de7512d211e4ff7e8bd,0x73d0385f4d8e00c5e6504c6030f47bf6212736a8" /> +
+ + +
From 7c1818977963d9ce668bbf5b3f39ec845acbd558 Mon Sep 17 00:00:00 2001 From: kaladinlight <35275952+kaladinlight@users.noreply.github.com> Date: Mon, 6 Oct 2025 10:20:02 -0600 Subject: [PATCH 2/2] copilot feedback --- docs/api.md | 4 ++-- server/websocket.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/api.md b/docs/api.md index 7aedc41d40..0f29729a2d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1042,8 +1042,8 @@ Example for subscribing to an address (or multiple addresses) including new bloc "id":"1", "method":"subscribeAddresses", "params":{ - "addresses":["mnYYiDCb2JZXnqEeXta1nkt5oCVe2RVhJj", "tb1qp0we5epypgj4acd2c4au58045ruud2pd6heuee"] - "newBlockTxs" true, + "addresses":["mnYYiDCb2JZXnqEeXta1nkt5oCVe2RVhJj", "tb1qp0we5epypgj4acd2c4au58045ruud2pd6heuee"], + "newBlockTxs": true, } } ``` diff --git a/server/websocket.go b/server/websocket.go index 9444dea073..ce0664d659 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1151,7 +1151,10 @@ func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts subscribed[sad] = struct{}{} } } else if s.chainParser.GetChainType() == bchain.ChainBitcoinType { - processVout(vouts[vins[i].Vout]) + vout := int(vins[i].Vout) + if vout >= 0 && vout < len(vouts) { + processVout(vouts[vins[i].Vout]) + } } else if s.chainParser.GetChainType() == bchain.ChainEthereumType { if len(vins[i].Addresses) > 0 { processAddress(vins[i].Addresses[0])