Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 3374ed3

Browse files
committed
sync_peer tx request selection rewrite
1 parent 77c60f8 commit 3374ed3

File tree

1 file changed

+27
-15
lines changed

1 file changed

+27
-15
lines changed

ethcore/sync/src/chain/mod.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ pub struct ChainSync {
723723
sync_start_time: Option<Instant>,
724724
/// Transactions propagation statistics
725725
transactions_stats: TransactionsStats,
726-
/// Unfetched transactions
726+
/// Transactions whose hash has been announced, but that we have not fetched
727727
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
728728
/// Enable ancient block downloading
729729
download_old_blocks: bool,
@@ -1176,33 +1176,45 @@ impl ChainSync {
11761176
}
11771177
}
11781178

1179-
// get some peers to give us transaction pool
1179+
// get the peer to give us at least some of announced but unfetched transactions
11801180
if !self.unfetched_pooled_transactions.is_empty() {
1181-
if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions {
1181+
if let Some(s) = &mut self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions {
11821182
let now = Instant::now();
11831183

11841184
let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
1185-
let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1186-
while new_asking_pooled_transactions.len() <= 256 {
1187-
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1188-
if item.next_fetch < now {
1189-
new_asking_pooled_transactions.insert(hash);
1190-
item.tries += 1;
1191-
if item.tries < 5 {
1192-
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1193-
new_unfetched_pooled_transactions.insert(hash, item);
1194-
}
1185+
let mut remaining_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
1186+
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
1187+
if new_asking_pooled_transactions.len() >= 256 {
1188+
// can't request any more transactions
1189+
break;
1190+
}
1191+
1192+
// if enough time has passed since last attempt...
1193+
if item.next_fetch < now {
1194+
// ...queue this hash for requesting
1195+
new_asking_pooled_transactions.insert(hash);
1196+
item.tries += 1;
1197+
1198+
// if we just started asking for it, queue it to be asked later on again
1199+
if item.tries < 5 {
1200+
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
1201+
remaining_unfetched_pooled_transactions.insert(hash, item);
1202+
} else {
1203+
// ...otherwise we assume this transaction does not exist and remove its hash from request queue
1204+
remaining_unfetched_pooled_transactions.remove(&hash);
11951205
}
11961206
}
11971207
}
11981208

11991209
let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
12001210
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);
12011211

1202-
self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions);
1203-
self.unfetched_pooled_transactions = new_unfetched_pooled_transactions;
1212+
self.peers.get_mut(&peer_id).expect("this is always an active peer; qed").asking_pooled_transactions = Some(new_asking_pooled_transactions);
1213+
self.unfetched_pooled_transactions = remaining_unfetched_pooled_transactions;
12041214

12051215
return;
1216+
} else {
1217+
trace!(target: "sync", "Skipping transaction fetch for peer {} as they don't support eth/65", peer_id);
12061218
}
12071219
}
12081220

0 commit comments

Comments
 (0)