Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit f8b22e2

Browse files
committed
Implement eth/65 (EIP-2464)
1 parent 1b23af3 commit f8b22e2

File tree

12 files changed

+222
-43
lines changed

12 files changed

+222
-43
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ethcore/client-traits/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ pub trait BlockChainClient:
287287
/// Get transaction with given hash.
288288
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;
289289

290+
/// Get pool transaction with a given hash.
291+
fn pooled_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>>;
292+
290293
/// Get uncle with given id.
291294
fn uncle(&self, id: UncleId) -> Option<encoded::Header>;
292295

ethcore/src/client/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,6 +1889,10 @@ impl BlockChainClient for Client {
18891889
self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address))
18901890
}
18911891

1892+
fn pooled_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>> {
1893+
self.importer.miner.transaction(&hash)
1894+
}
1895+
18921896
fn uncle(&self, id: UncleId) -> Option<encoded::Header> {
18931897
let index = id.position;
18941898
self.block_body(id.block).and_then(|body| body.view().uncle_rlp_at(index))

ethcore/src/test_helpers/test_client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,9 @@ impl BlockChainClient for TestBlockChainClient {
741741
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
742742
None // Simple default.
743743
}
744+
fn pooled_transaction(&self, _hash: H256) -> Option<Arc<VerifiedTransaction>> {
745+
None
746+
}
744747

745748
fn uncle(&self, _id: UncleId) -> Option<encoded::Header> {
746749
None // Simple default.

ethcore/sync/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ rlp = "0.4.5"
3434
snapshot = { path = "../snapshot" }
3535
trace-time = "0.1"
3636
triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" }
37+
transaction-pool = "2"
3738

3839
[dev-dependencies]
3940
env_logger = "0.5"

ethcore/sync/src/chain/handler.rs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,11 @@ use crate::{
2626
sync_packet::{
2727
PacketInfo,
2828
SyncPacket::{
29-
self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket,
30-
PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket,
31-
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
29+
self, *,
3230
}
3331
},
3432
BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester,
35-
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
33+
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
3634
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
3735
}
3836
};
@@ -53,6 +51,7 @@ use common_types::{
5351
verification::Unverified,
5452
snapshot::{ManifestData, RestorationStatus},
5553
};
54+
use transaction_pool::VerifiedTransaction;
5655

5756

5857
/// The Chain Sync Handler: handles responses from peers
@@ -70,6 +69,8 @@ impl SyncHandler {
7069
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
7170
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
7271
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
72+
NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp),
73+
PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp),
7374
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
7475
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
7576
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
@@ -590,9 +591,11 @@ impl SyncHandler {
590591
difficulty,
591592
latest_hash,
592593
genesis,
594+
unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None },
593595
asking: PeerAsking::Nothing,
594596
asking_blocks: Vec::new(),
595597
asking_hash: None,
598+
asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None },
596599
asking_private_state: None,
597600
ask_time: Instant::now(),
598601
last_sent_transactions: Default::default(),
@@ -651,7 +654,7 @@ impl SyncHandler {
651654

652655
if false
653656
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0))
654-
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0))
657+
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0))
655658
{
656659
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
657660
return Err(DownloaderImportError::Invalid);
@@ -698,6 +701,61 @@ impl SyncHandler {
698701
Ok(())
699702
}
700703

704+
/// Called when peer requests a set of pooled transactions
705+
pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
706+
for item in tx_rlp {
707+
let hash = item.as_val::<H256>().map_err(|_| DownloaderImportError::Invalid)?;
708+
709+
if io.chain().pooled_transaction(hash).is_none() {
710+
let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction {
711+
announcer: peer_id,
712+
next_fetch: Instant::now(),
713+
tries: 0,
714+
});
715+
716+
// Only reset the budget if we hear from multiple sources
717+
if unfetched.announcer != peer_id {
718+
unfetched.next_fetch = Instant::now();
719+
unfetched.tries = 0;
720+
}
721+
}
722+
}
723+
724+
Ok(())
725+
}
726+
727+
/// Called when peer sends us a list of pooled transactions
728+
pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
729+
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) {
730+
Some(peer) => peer,
731+
None => {
732+
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
733+
return Ok(());
734+
}
735+
};
736+
737+
// TODO: actually check against asked hashes
738+
let item_count = tx_rlp.item_count()?;
739+
if let Some(p) = &peer.asking_pooled_transactions {
740+
if item_count > p.len() {
741+
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id);
742+
return Err(DownloaderImportError::Invalid);
743+
}
744+
} else {
745+
trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id);
746+
return Err(DownloaderImportError::Invalid);
747+
}
748+
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count);
749+
let mut transactions = Vec::with_capacity(item_count);
750+
for i in 0 .. item_count {
751+
let rlp = tx_rlp.at(i)?;
752+
let tx = rlp.as_raw().to_vec();
753+
transactions.push(tx);
754+
}
755+
io.chain().queue_transactions(transactions, peer_id);
756+
Ok(())
757+
}
758+
701759
/// Called when peer sends us signed private transaction packet
702760
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
703761
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {

ethcore/sync/src/chain/mod.rs

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ malloc_size_of_is_0!(PeerInfo);
149149

150150
pub type PacketDecodeError = DecoderError;
151151

152+
/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
153+
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11);
152154
/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
153155
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
154156
/// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
@@ -200,6 +202,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10);
200202
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
201203
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
202204
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
205+
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10);
203206
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
204207
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked.
205208
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
@@ -301,6 +304,7 @@ pub enum PeerAsking {
301304
BlockHeaders,
302305
BlockBodies,
303306
BlockReceipts,
307+
PooledTransactions,
304308
SnapshotManifest,
305309
SnapshotData,
306310
PrivateState,
@@ -335,6 +339,8 @@ pub struct PeerInfo {
335339
network_id: u64,
336340
/// Peer best block hash
337341
latest_hash: H256,
342+
/// Unpropagated tx pool hashes
343+
unsent_pooled_hashes: Option<H256FastSet>,
338344
/// Peer total difficulty if known
339345
difficulty: Option<U256>,
340346
/// Type of data currently being requested by us from a peer.
@@ -343,6 +349,8 @@ pub struct PeerInfo {
343349
asking_blocks: Vec<H256>,
344350
/// Holds requested header hash if currently requesting block header by hash
345351
asking_hash: Option<H256>,
352+
/// Holds requested transaction IDs
353+
asking_pooled_transactions: Option<Vec<H256>>,
346354
/// Holds requested private state hash
347355
asking_private_state: Option<H256>,
348356
/// Holds requested snapshot chunk hash if any.
@@ -641,6 +649,13 @@ enum PeerState {
641649
SameBlock
642650
}
643651

652+
#[derive(Clone, MallocSizeOf)]
653+
struct UnfetchedTransaction {
654+
announcer: PeerId,
655+
next_fetch: Instant,
656+
tries: usize,
657+
}
658+
644659
/// Blockchain sync handler.
645660
/// See module documentation for more details.
646661
#[derive(MallocSizeOf)]
@@ -676,6 +691,8 @@ pub struct ChainSync {
676691
sync_start_time: Option<Instant>,
677692
/// Transactions propagation statistics
678693
transactions_stats: TransactionsStats,
694+
/// Unfetched transactions
695+
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
679696
/// Enable ancient block downloading
680697
download_old_blocks: bool,
681698
/// Shared private tx service.
@@ -717,6 +734,7 @@ impl ChainSync {
717734
snapshot: Snapshot::new(),
718735
sync_start_time: None,
719736
transactions_stats: TransactionsStats::default(),
737+
unfetched_pooled_transactions: Default::default(),
720738
private_tx_handler,
721739
warp_sync: config.warp_sync,
722740
status_sinks: Vec::new()
@@ -730,7 +748,7 @@ impl ChainSync {
730748
let last_imported_number = self.new_blocks.last_imported_block_number();
731749
SyncStatus {
732750
state: self.state.clone(),
733-
protocol_version: ETH_PROTOCOL_VERSION_64.0,
751+
protocol_version: ETH_PROTOCOL_VERSION_65.0,
734752
network_id: self.network_id,
735753
start_block_number: self.starting_block,
736754
last_imported_block_number: Some(last_imported_number),
@@ -764,8 +782,17 @@ impl ChainSync {
764782

765783
/// Updates the set of transactions recently sent to this peer to avoid spamming.
766784
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
767-
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
768-
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
785+
for (id, peer) in &mut self.peers {
786+
let hashes = txs.iter().map(|tx| tx.hash());
787+
if *id == peer_id {
788+
peer.last_sent_transactions.extend(hashes);
789+
} else if let Some(s) = &mut peer.unsent_pooled_hashes {
790+
s.extend(hashes);
791+
}
792+
}
793+
794+
for tx in txs {
795+
self.unfetched_pooled_transactions.remove(&tx.hash());
769796
}
770797
}
771798

@@ -1099,6 +1126,36 @@ impl ChainSync {
10991126
}
11001127
}
11011128

1129+
// get some peers to give us transaction pool
1130+
if !self.unfetched_pooled_transactions.is_empty() {
1131+
if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions {
1132+
let now = Instant::now();
1133+
1134+
let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
1135+
let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1136+
while new_asking_pooled_transactions.len() <= 256 {
1137+
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1138+
if item.next_fetch < now {
1139+
new_asking_pooled_transactions.insert(hash);
1140+
item.tries += 1;
1141+
if item.tries < 5 {
1142+
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1143+
new_unfetched_pooled_transactions.insert(hash, item);
1144+
}
1145+
}
1146+
}
1147+
}
1148+
1149+
let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
1150+
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);
1151+
1152+
self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions);
1153+
self.unfetched_pooled_transactions = new_unfetched_pooled_transactions;
1154+
1155+
return;
1156+
}
1157+
}
1158+
11021159
// Only ask for old blocks if the peer has an equal or higher difficulty
11031160
let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty);
11041161

@@ -1290,6 +1347,7 @@ impl ChainSync {
12901347
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
12911348
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
12921349
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
1350+
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT,
12931351
PeerAsking::Nothing => false,
12941352
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
12951353
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
@@ -1618,10 +1676,12 @@ pub mod tests {
16181676
genesis: H256::zero(),
16191677
network_id: 0,
16201678
latest_hash: peer_latest_hash,
1679+
unsent_pooled_hashes: Some(Default::default()),
16211680
difficulty: None,
16221681
asking: PeerAsking::Nothing,
16231682
asking_blocks: Vec::new(),
16241683
asking_hash: None,
1684+
asking_pooled_transactions: Some(Vec::new()),
16251685
asking_private_state: None,
16261686
ask_time: Instant::now(),
16271687
last_sent_transactions: Default::default(),

0 commit comments

Comments
 (0)